• dazfuller

Why leave bad data to chance?

Something that we often see as Spark jobs are moved into production is that handling of bad data is either ignored, or a lot of effort goes into validating the data and, specifically for this post, the data types. The largest culprit for driving this behaviour is the "inferSchema" option when reading data types which do not contain or enforce their own schema (CSV, JSON, etc...). For this post I'm going to focus on CSV, as it's still one of the most widely used data formats for exchanging data (no, Excel is not a valid choice!).


Having the ability to infer a schema isn't a bad thing, I use it a lot when exploring data sets, and it can be a great way to short-cut how you define a schema. In some data sources you're better off leaving it to the data source reader to work things out for you because the data could be wildly inconsistent each time you read it. So it definitely has a place in the toolbox. But... If you're reading the same kind of data, time and again, as part of a production pipeline, you're going to want to make sure that data is consistent. And if it's not consistent? Well, then you have choices about what you do. We could only read in valid records, or we could capture the bad data and do something else with it.


So lets have a look at a really simple data read process and see what we can break. I'm doing this in PySpark so most people can follow along, but the same is true in Scala, Java, or .NET.

source_data: str = [
  "id,dt,value",
  "1,2021-01-19T00:10:02,17.3",
  "2,2021-02-04T13:12:52,19.2",
  "3,2021-01-30T10:59:06,21.3"
]

df = spark.read.csv(sc.parallelize(source_data),
                    header=True,
                    inferSchema=True)
df.printSchema()

df.show()

Here we're creating a simple list of comma-separated values, converting them to an RDD, and then reading that data in specifying that there is a header record, and that we want the schema inferred. This gives the following output.


root
 |-- id: integer (nullable = true)
 |-- dt: timestamp (nullable = true)
 |-- value: double (nullable = true)

+---+-------------------+-----+
| id|                 dt|value|
+---+-------------------+-----+
|  1|2021-01-19 00:10:02| 17.3|
|  2|2021-02-04 13:12:52| 19.2|
|  3|2021-01-30 10:59:06| 21.3|
+---+-------------------+-----+

So it's read all of the data, given the correct columns, and worked out that "dt" is a timestamp, and "value" is a double, great.


This is only 4 records including the header, but by inferring the schema we've asked the data source to do 2 things for us. The first is that we've asked it to scan over the data to work out what it's schema is, based on the configuration we've provided (or left as defaults). The second thing we've asked it to do is to then re-read the data again, and extract it according to the schema we asked it to infer. 4 records is pretty quick, but if this was a few gigabytes of data every read, that's a lot of extra processing that needs to happen. Extra processing means more time spent doing compute, slower performance, and potentially more CO2 emissions.


Another piece of this is, what happens if we receive some corrupt data? Well, lets have a look at what schema inferrence does in this case.

source_data: str = [
  "id,dt,value",
  "1,2021-01-19T00:10:02,17.3",
  "2,2021-02-04T13:12:52,19.2",
  "3,2021-01-30T10:59:06,21.3",
  "4,2021-31-12T14:01:02,13.2"
]

df = spark.read.csv(sc.parallelize(source_data),
                    header=True,
                    inferSchema=True)
df.printSchema()

df.show()

Here we've added a new record for the 12th day of the 31st month, which is clearly rubbish, so what happens?

root
 |-- id: integer (nullable = true)
 |-- dt: string (nullable = true)
 |-- value: double (nullable = true)

+---+-------------------+-----+
| id|                 dt|value|
+---+-------------------+-----+
|  1|2021-01-19T00:10:02| 17.3|
|  2|2021-02-04T13:12:52| 19.2|
|  3|2021-01-30T10:59:06| 21.3|
|  4|2021-31-12T14:01:02| 13.2|
+---+-------------------+-----+

Well, that's not really great. We have our fourth record in our output, with an invalid timestamp. Looking at the schema we can see it's because the "dt" column is now a string, which is what the data source will default to if it can't work out a clearly defined data type.


At this point you will often see people start doing things such as iterating over the columns to work out column types. Or creating new columns, casting the possibly suspect data into the target data type. If the value is null then the cast failed and it's an invalid record, unless of course the original value was also null, then what? Alternatively, people will sometimes just continue to work with the data as a string and add a lot more functions in later to work with it. Or, people don't check at all and the pipeline fails, requiring someone to investigate the issue, fix the source data, or update the pipeline to handle this new scenario.


So, in this case where we've added in the corrupt data we've asked Spark to scan our data twice, once to get the schema, and again to get the data, and it's wrong. So what's the better option here? Well, moving into production systems the better option is to provide the schema when we read the data.

from pyspark.sql.types import *

schema = StructType([
  StructField("id", IntegerType(), nullable=False),
  StructField("dt", TimestampType(), nullable=False),
  StructField("value", DoubleType(), nullable=False)
])

source_data: str = [
  "id,dt,value",
  "1,2021-01-19T00:10:02,17.3",
  "2,2021-02-04T13:12:52,19.2",
  "3,2021-01-30T10:59:06,21.3",
  "4,2021-31-12T14:01:02,bob"
]

df = spark.read.csv(sc.parallelize(source_data),
                    header=True,
                    schema=schema)
df.printSchema()

df.show(truncate=False)

This is exactly the same as before, except we've now told Spark what we expect the schema to be, we're also still saying that the data contains a record header. The output from this looks as follows.

root
 |-- id: integer (nullable = false)
 |-- dt: timestamp (nullable = false)
 |-- value: double (nullable = false)

+---+-------------------+-----+
|id |dt                 |value|
+---+-------------------+-----+
|1  |2021-01-19 00:10:02|17.3 |
|2  |2021-02-04 13:12:52|19.2 |
|3  |2021-01-30 10:59:06|21.3 |
|4  |1970-01-01 00:00:00|0.0  |
+---+-------------------+-----+

We still have 4 data records, but now we have a "dt" value of the 1st Jan 1970, and a "value" of 0. The reason for this is that the data source defaults to "PERMISSIVE" mode, in this mode the reader will read the record up to the first error it finds, it then puts the contents of the record into a new column (if one is defined). Any data which has been successfully read is added to the row, and all other data is set to null or default values which is what we're seeing here. Importantly though, we've now only scanned the data once, for data of any good size that is a significant amount of processing time saved.


So, can we see the corrupt record data? Yes, by changing the options and schema to specify the corrupt column.

from pyspark.sql.types import *

schema = StructType([
  StructField("id", IntegerType(), nullable=False),
  StructField("dt", TimestampType(), nullable=False),
  StructField("value", DoubleType(), nullable=False),
  StructField("corrupt_data", StringType(), nullable=True)
])

source_data: str = [
  "id,dt,value",
  "1,2021-01-19T00:10:02,17.3",
  "2,2021-02-04T13:12:52,19.2",
  "3,2021-01-30T10:59:06,21.3",
  "4,2021-31-12T14:01:02,bob"
]

df = spark.read.csv(sc.parallelize(source_data),
                    header=True,
                    schema=schema,
                    columnNameOfCorruptRecord="corrupt_data")
df.printSchema()

df.show(truncate=False)

As you can see we've added a new nullable string field to the schema, this is required for the corrupt record column. We then add the name of the column to the read operation.

root
 |-- id: integer (nullable = false)
 |-- dt: timestamp (nullable = false)
 |-- value: double (nullable = false)
 |-- corrupt_data: string (nullable = true)
 
+---+-------------------+-----+-------------------------+
|id |dt                 |value|corrupt_data             |
+---+-------------------+-----+-------------------------+
|1  |2021-01-19 00:10:02|17.3 |null                     |
|2  |2021-02-04 13:12:52|19.2 |null                     |
|3  |2021-01-30 10:59:06|21.3 |null                     |
|4  |1970-01-01 00:00:00|0.0  |4,2021-31-12T14:01:02,bob|
+---+-------------------+-----+-------------------------+

We can now easily see which records are corrupt and do something else with them, whether that's error reporting, saving it out somewhere else, or processing of a different kind. But what if we don't want to use the values at all? Well, I mentioned that this was using "PERMISSIVE" mode, but there are 2 other modes as well including DROPMALFORMED which will ignore any record which is corrupt, that looks a little more like this.

from pyspark.sql.types import *

schema = StructType([
  StructField("id", IntegerType(), nullable=False),
  StructField("dt", TimestampType(), nullable=False),
  StructField("value", DoubleType(), nullable=False)
])

source_data: str = [
  "id,dt,value",
  "1,2021-01-19T00:10:02,17.3",
  "2,2021-02-04T13:12:52,19.2",
  "3,2021-01-30T10:59:06,21.3",
  "4,2021-31-12T14:01:02,bob"
]

df = spark.read.csv(sc.parallelize(source_data),
                    header=True,
                    schema=schema,
                    mode="DROPMALFORMED")
df.printSchema()

df.show(truncate=False)

As you can see we've now removed the corrupt column from our schema and no longer reference it in the reader. But we have introduced the "mode" option and set it to "DROPMALFORMED", which will now tell the reader to ignore any record which is corrupt.

root
 |-- id: integer (nullable = false)
 |-- dt: timestamp (nullable = false)
 |-- value: double (nullable = false)

+---+-------------------+-----+
|id |dt                 |value|
+---+-------------------+-----+
|1  |2021-01-19 00:10:02|17.3 |
|2  |2021-02-04 13:12:52|19.2 |
|3  |2021-01-30 10:59:06|21.3 |
+---+-------------------+-----+

The final model which I'm not showing here is "FAILFAST", this is the setting you want to use if you want the pipeline to fail as quickly as possible if a corrupt record exists. It's not often that this option is used, but it can be critical in processes where the integrity of the data is paramount and even a single corrupt record means that the data supply chain needs to be investigated.


In all of these final cases, once we have performed the read operation, we have a data frame with columns of the correct name and with the correct data types, improving the quality of our data at the point we read it. We also have options available to us to determine how exactly we handle scenarios where the source data might be unreliable at times.


In addition, by not inferring the schema every time we read the data we are no longer scanning the the data twice, saving valuable time and compute. Performance is not about making our code work faster, but making it do less work to get the result we want. By making the cluster do less work we save time and save compute cycles making our code more efficient and greener in the process.

30 views0 comments

Recent Posts

See All