• dazfuller

Just one more column, what could go wrong?

Sometimes, when you go scanning through the documentation for Spark, you come across notes about certain functions. These tend to offer little hints and tips about using the function, or to give warnings about their usage.


One of these functions is the DataFrame withColumn function. As of the Spark 3.1.1 documentation, the note on it reads:

This method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select() with the multiple columns at once.

But this seems such a harmless little function, one that simply tells Spark that we'd like to add a new column based on some expression. So what exactly is it doing?


The introduction of a projection is similar to a select statement in SQL: it takes the columns and expressions requested and presents them as columns. This way any subsequent operations can refer to the new projected column.


For example, take the following code.

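# `spark` is the SparkSession provided by the notebook environment
from pyspark.sql.functions import element_at, lit, month, regexp_extract, split, year

df = (spark.read.table("staging.landreg_price_paid")
           .withColumn("deed_date_year", year("deed_date"))
           .withColumn("deed_date_month", month("deed_date"))
           .withColumn("postcode_outward", element_at(split("postcode", " "), 1))
           .withColumn("postcode_district", regexp_extract("postcode_outward", r"\w+\d+", 0)))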

for i in range(0, 11):
    df = df.withColumn(f"col_{i}", lit(0).cast("double"))

df.explain(extended=True)

This reads from an existing table, adds some new columns (where postcode_district depends on postcode_outward), and then adds another 11 columns (col_0 to col_10), just to simulate a process where we're adding extra columns later on. Finally it calls "explain" to show the various stages of the execution plan.


And the resulting execution plans? Well, they look like this.

Execution plan when using a lot of withColumn statements

If you can read the plans then you'll notice that by the time we've gotten to the Optimized and Physical plans Spark has managed to optimize out all of the Projections. This is great because it means that it's not doing all of that work when it's actually executed. But it's also not so great as Spark has had to spend time optimizing all of that out! Hopefully you can see how, if there were even more of these, we'd eventually get to a situation which would result in the overflow exception errors mentioned in the note.


So what can we do instead? Well, the note says that we can use a single select statement with all of the columns at once. We can do that by expanding out our additional columns and, with a little refactoring, getting it to look like this.

cols = [lit(0).cast("double").alias(f"col_{i}") for i in range(0, 11)]

df2 = (spark.read.table("staging.landreg_price_paid")
            .select(
              "*"
              , year("deed_date").alias("deed_date_year")
              , month("deed_date").alias("deed_date_month")
              , element_at(split("postcode", " "), 1).alias("postcode_outward")
              , regexp_extract(element_at(split("postcode", " "), 1), r"\w+\d+", 0).alias("postcode_district")
              , *cols
            ))

df2.explain(extended=True)

Here we're using a single select statement that selects all of the existing columns (the wildcard character), our derived columns (with a little repetition of the postcode_outward expression), and all of the additional columns from the list by using the unpack operator (or "splat", which I think is a much better name).
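If the splat is unfamiliar, here is a minimal pure-Python sketch of what it does to a list of arguments. The select function below is a hypothetical stand-in that only echoes what it receives; it is not Spark's DataFrame.select, and the column names are just placeholders.

```python
def select(*cols):
    """Hypothetical stand-in for a select call: returns the arguments it was given."""
    return list(cols)


extra = [f"col_{i}" for i in range(3)]

# With the splat, each list element arrives as its own positional argument.
unpacked = select("*", "deed_date_year", *extra)

# Without it, the whole list arrives as one nested argument.
nested = select("*", "deed_date_year", extra)

print(unpacked)  # ['*', 'deed_date_year', 'col_0', 'col_1', 'col_2']
print(nested)    # ['*', 'deed_date_year', ['col_0', 'col_1', 'col_2']]
```

So the splat is what lets a list built with a comprehension sit alongside the hand-written columns in the same call.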


So, does this make a difference to the execution plans? Well, let's have a look.

Execution plan using a single Select statement

Without even reading it this is looking a lot better. We're still arriving at an Optimized and Physical plan which are pretty much the same as before, but our Logical plans are much smaller, meaning that the optimizer has much less work to do. As you can see in the image they now only contain a single projection which is the expected output of the query. This means we're now much less likely to run into the stack overflow exception mentioned in the withColumn note.
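To make the difference in plan size concrete, here is a toy model in plain Python. It is only a sketch under simplifying assumptions: PlanNode, with_column, select and count_projections are hypothetical names made up for illustration, not Spark classes or APIs. The only behaviour it borrows from Spark is that each withColumn call wraps the existing plan in one more projection.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class PlanNode:
    """Toy logical-plan node (hypothetical; not Spark's real class)."""
    op: str
    columns: Tuple[str, ...]
    child: Optional["PlanNode"] = None


def with_column(plan: PlanNode, name: str) -> PlanNode:
    # Each call wraps the existing plan in one more projection.
    return PlanNode("Project", plan.columns + (name,), plan)


def select(plan: PlanNode, *names: str) -> PlanNode:
    # One projection, however many columns are added.
    return PlanNode("Project", plan.columns + tuple(names), plan)


def count_projections(plan: Optional[PlanNode]) -> int:
    # Walk down the chain of children, counting projection nodes.
    count = 0
    while plan is not None:
        if plan.op == "Project":
            count += 1
        plan = plan.child
    return count


base = PlanNode("Relation", ("deed_date", "postcode"))
new_cols = [f"col_{i}" for i in range(0, 11)]

looped = base
for name in new_cols:
    looped = with_column(looped, name)

single = select(base, *new_cols)

print(count_projections(looped))         # 11
print(count_projections(single))         # 1
print(looped.columns == single.columns)  # True
```

Both toy plans expose the same columns, but the looped one is eleven projections deep, and that nesting is what the optimizer has to collapse.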


But is there a performance impact to this as well? I ran both sets on a Databricks 8.2 Runtime, which provides Spark 3, several times and took the best time from each set. With all of the withColumn statements the best run time was 2.22 seconds, and using the single select statement this came down to 0.62 seconds. So we're removing potential issues and improving performance as we go.
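For anyone wanting to repeat a "best of several runs" comparison, a minimal sketch of one way to do it follows. The best_of helper is a hypothetical name, and this is an assumption about method rather than a record of how the numbers above were produced. Because Spark transformations are lazy, the callable you pass needs to trigger an action for the timing to mean anything.

```python
import time
from typing import Callable


def best_of(action: Callable[[], object], runs: int = 5) -> float:
    """Run `action` `runs` times and return the fastest wall-clock time in seconds."""
    best = float("inf")
    for _ in range(runs):
        start = time.perf_counter()
        action()
        best = min(best, time.perf_counter() - start)
    return best


# Stand-in workload so the sketch runs anywhere. With Spark you would pass
# something that triggers an action, e.g. lambda: df.count() (an assumption;
# the post does not say which action was timed).
fastest = best_of(lambda: sum(range(100_000)), runs=3)
print(f"best run: {fastest:.4f}s")
```

Taking the minimum rather than the mean reduces the influence of one-off delays such as cluster warm-up or caching on the first run.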


So should we always avoid the withColumn statement? Well not really as it's still an incredibly useful function. In the improved example above you can see I'm repeating myself with the code for postcode_outward, so we could improve this by using a single withColumn statement and then using that in the select. Or when we're reading in files it can be useful to add a withColumn statement to append the input file name. The key is to reduce our usage of withColumn to only those times when it's necessary, and if we're doing a lot of transformations - either directly or in a loop - then we should look at alternatives instead.
