We've all seen it at some point, I'm sure. Data was created in something like a spreadsheet and, over the years, it's been added to by introducing new columns rather than new rows. From a human point of view it can make a sort of sense: scroll right for the latest data instead of scrolling through loads of rows to get there. But for data engineering, when people want to see year-on-year summaries it's horrible, especially when this is going to stay an active feed of data and so will just keep growing!
This kind of view of data is a pivoted view, and it's typically what people love to build in Excel using Pivot Tables. But we have the opposite problem: we need to get the data out of this pivoted view and into something more usable, from which we can create aggregations, facts and dimensions, and which is just easier to query. We need to un-pivot the data, and we're going to use Spark in Azure Databricks to do it.
You might want to look at the stack function and see if it works for you first. But in our case here we're going to use a combination of maps and explode. You'd normally do this in 1 or 2 steps, but to show what's happening I'm going to go through it step-by-step.
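If you do want to try the stack route, here's a hedged sketch of what that might look like. It assumes the same DataFrame and column names used later in this post (top_5_countries, with pop_<year>, density_<year> and median_age_<year> columns); the result name top_5_countries_stacked is just illustrative.

# Sketch only: un-pivoting with stack via selectExpr, assuming columns named
# pop_<year>, density_<year> and median_age_<year> as used below
top_5_countries_stacked = top_5_countries.selectExpr(
    "name",
    """stack(3,
        2020, pop_2020, density_2020, median_age_2020,
        2019, pop_2019, density_2019, median_age_2019,
        2018, pop_2018, density_2018, median_age_2018
    ) as (year, population, density_pkm2, median_age)"""
)

It's concise, but every column has to be spelled out in the expression, which is why the map-and-explode approach below can be more convenient when the year columns are generated from a list.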
First up we're going to create a map, mapping each year to the 3 column values for each row. In Scala we can use the native map feature, but in PySpark we have to use the Spark create_map function, along with the chain function from the itertools library.
# Imports needed for this snippet
from itertools import chain
from pyspark.sql.functions import array, col, create_map, lit

years = [2020, 2019, 2018]

# Map each year to an array of that year's population, density and median age columns
kv_map = create_map(
    list(
        chain(
            *[(lit(y), array(col(f"pop_{y}"), col(f"density_{y}"), col(f"median_age_{y}"))) for y in years]
        )
    )
)
This is a bit of a lengthier process than in Scala, but it works just as well. Inside the list comprehension we're creating a sequence of (key, value) tuples, but to use this with the create_map function we need to flatten it, which is why we're using chain. chain returns a chain object, so we wrap it in list() to get a list out of it before it's passed to create_map.
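If the chain step looks a little opaque, here's a tiny standalone illustration, using plain Python values rather than Spark columns, of the flattening it's doing:

from itertools import chain

# Each tuple is a (key, value) pair, just like (lit(y), array(...)) above
pairs = [("2020", "values_2020"), ("2019", "values_2019")]

flattened = list(chain(*pairs))
# flattened == ["2020", "values_2020", "2019", "values_2019"]
# i.e. an alternating key/value list, which is the shape create_map expects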
Once done we can then use kv_map directly in a withColumn call, as it's now a Column object.
top_5_countries_collated = top_5_countries.withColumn("collated", kv_map)
For each row we now have a mapping of the year to the values for that year in the order of population, density, and median age. We can now explode this data out quite simply using the explode function.
top_5_countries_exploded = top_5_countries_collated.selectExpr("name", "explode(collated)")
Here I'm using selectExpr in case I want to include something like a wildcard in the future, but you could use the explode function directly. This now gives us the following.
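To give a sense of the shape (rather than the actual values, which depend on your data), the schema of the exploded DataFrame should look roughly like this:

top_5_countries_exploded.printSchema()
# Roughly (exact types depend on the source columns):
# root
#  |-- name: string
#  |-- key: integer          <- the year from lit(y)
#  |-- value: array          <- [population, density, median_age] for that year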
Now we have a record for each country and year, with a "value" column containing the population, density, and median age values. From this point it's a case of some simple extraction and formatting.
# element_at also needs importing from pyspark.sql.functions
from pyspark.sql.functions import element_at

top_5_countries_unpivoted = top_5_countries_exploded.select(
    col("name"),
    col("key").alias("year"),
    element_at("value", 1).alias("population"),
    element_at("value", 2).alias("density_pkm2"),
    element_at("value", 3).alias("median_age"),
)
And that's it, our data is now un-pivoted and useful.
From here we can now filter based on year, or use features such as windowing to provide additional data such as population change by year.
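Filtering is the simple case, something like this sketch using the same DataFrame:

top_5_countries_unpivoted.filter(col("year") == 2019)

The year-on-year population change is a little more involved and uses a window, as shown below.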
# Window, lag and coalesce imports (assuming they're not already in scope)
from pyspark.sql import Window
from pyspark.sql.functions import coalesce, lag

# Partition by country and order by year so lag() looks at the previous year
window = Window.partitionBy("name").orderBy("year")

display(
    top_5_countries_unpivoted
    # Previous year's population; coalesce falls back to the current value
    # for the first year, so the change comes out as zero rather than null
    .withColumn("prev_population", coalesce(lag("population", 1).over(window), "population"))
    .select(
        col("name"),
        col("year"),
        col("population"),
        (col("population") - col("prev_population")).alias("population_change"),
    )
)
Getting data into good shape is a key part of Data Engineering, and we often get data in all sorts of shapes and states of quality. Having snippets like this in your toolbox means you can more rapidly get data transformed, cleaned, and into a useful state so that the business can start driving more value from it.