Bringing Azure Databricks to Security Data
Back in October 2020 I had the amazing opportunity to work with Richard on a small project to speed up the processing of CEF (Common Event Format) data sets. The Common Event Format is a way of structuring data that can be sent over syslog, and it is supported by most security devices and systems. Each message consists of two parts: the header, which has a fixed format and is pipe-separated, and the extensions, which are a collection of key/value pairs.
So before diving into the how, let's look at the why. Organisations can collect large volumes of CEF data from various sources and depend on different tools to consume that data. But what happens when that data gets really big, or you want to bring data analytics and data science to your security data? At that point you want tools which are built for the job, and the most obvious of these is Spark. By bringing all of the data together you can start to provide a more holistic view of your security landscape, joining lots of different sources together and even looking back over longer periods of time, with Spark providing the scale you need. Even better if you can use a platform service such as Databricks, where you no longer need to worry about how to configure and scale your Spark solution.
Richard recently wrote a post on mapping IP addresses to their country of origin. That is an obvious next step here: you can not only bring all of this data together, but also enrich it with additional data sets, giving you more ways of viewing security incidents, or create curated views of the data to build dashboards from.
Going into this solution we had some key requirements, all intended to make the end result as easy as possible for end users: users who might not know Scala (we'll assume that they probably don't), might not know Python (though this is more likely), but probably have some SQL skills (most people do). The requirements were:
Should be usable from Scala, Python, and Spark-SQL
Needs to handle data which does not entirely conform to the CEF standard
Should handle the schema for known CEF extensions
There was already a solution for ingesting this data, but it meant that the data had to be read as text and then split into columns by Spark afterwards. This could get tricky, as the standard itself is not a particularly clean format even when implemented correctly. The following is an example CEF record.
CEF:0|FireEye|CMS|188.8.131.52420|DM|domain-match|1|rt=Feb 09 2015 00:27:43 UTC dvc=10.201.78.190 cn3Label=cncPort cn3=53 cn2Label=sid cn2=80494706 shost=dev001srv02.example.com proto=udp cs5Label=cncHost cs5=mfdclk001.org dvchost=DEVFEYE1 spt=61395 dvc=10.100.25.16 smac=00:00:0c:07:ac:00 cn1Label=vlan cn1=0 externalId=851777 cs4Label=link cs4=https://DEVCMS01.example.com/event_stream/events_for_bot?ev_id\\=851777 dmac=00:1d:a2:af:32:a1 cs1Label=sname cs1=Trojan.Generic.DNS
Inside the header section, values can contain pipe characters as long as they are escaped. In the extensions section, values can contain equals characters "=" as long as they are escaped, and the value strings are not delimited either. All of this means that there is a lot of regular expression work to extract the necessary values, including lookarounds to handle character escaping.
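To give a feel for the kind of regular expression work involved, here is a minimal sketch in Python. It is not the parsing code from the project (which is written in Scala); the function name, the header field names and the sample record are all made up for illustration, and it only handles the escape sequences \|, \= and \\.

```python
import re

# Hypothetical names for the seven fixed header fields.
HEADER_FIELDS = (
    "cef_version", "device_vendor", "device_product", "device_version",
    "signature_id", "name", "severity",
)

# Split on a pipe only when it is not preceded by a backslash (lookbehind).
HEADER_SPLIT = re.compile(r"(?<!\\)\|")

# A key is a run of word characters directly followed by "=". The value is
# matched lazily and stops where the lookahead sees whitespace plus the next
# "key=", or the end of the record. An escaped "\=" never looks like a new
# key, because the character before the "=" is a backslash.
EXTENSION_PAIR = re.compile(r"(\w+)=(.*?)(?=\s+\w+=|\s*$)", re.DOTALL)

# Escape sequences handled by this sketch: \|  \=  and  \\
ESCAPED = re.compile(r"\\([|=\\])")


def unescape(value: str) -> str:
    """Replace each handled escape sequence with its literal character."""
    return ESCAPED.sub(lambda m: m.group(1), value)


def parse_cef(record: str) -> dict:
    """Split one CEF record into its header fields and extension pairs."""
    # Seven header fields, then everything else is the extensions section.
    parts = HEADER_SPLIT.split(record.strip(), maxsplit=7)
    if len(parts) < 7 or not parts[0].startswith("CEF:"):
        raise ValueError("not a CEF record")
    header = dict(zip(HEADER_FIELDS, map(unescape, parts[:7])))
    header["cef_version"] = header["cef_version"][len("CEF:"):]
    extensions = {}
    if len(parts) == 8:
        for match in EXTENSION_PAIR.finditer(parts[7]):
            # If a key is repeated, the last occurrence wins in this sketch.
            extensions[match.group(1)] = unescape(match.group(2))
    return {"header": header, "extensions": extensions}


# Made-up record with an escaped pipe in the header and an escaped "=" in a value.
record = r"CEF:0|Acme|Fire\|Wall|1.0|100|Blocked request|5|src=10.0.0.1 msg=User said a\=b twice spt=1234"
parsed = parse_cef(record)
```

Even this small version has gaps: the simple lookbehind would misread an escaped backslash that sits directly before a real pipe, which is the sort of edge case that makes the format awkward to handle with string splitting alone.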
To make this into a solution which would work across languages, and not rely on a data engineering team first ingesting all of the data into tables for others to consume later, we took the decision to create our own Spark data source. Creating a data source is not a straightforward affair: the documentation is lacking, and the implementation is changing with the advent of the V2 Data Source APIs. We took the further decision to target the V2 Data Source APIs, even though they are still under development, because they provide a number of useful features which we then didn't have to code ourselves. It did mean a lot of scanning through the Spark code base to see how they worked (thank goodness for open source projects and GitHub). The only drawback was that it limited us to supporting Scala 2.12 and Spark 3.0.1, which means the data source needs Databricks Runtime 7.3 LTS or above. That said, this had the benefit of requiring workloads to move to a Long Term Support runtime.
We already had parsing code which we could bring in, but rather than relying on the Spark APIs we were now able to operate at a much lower level, as we were processing the data before it was distributed out into a Spark DataFrame. We were also able to unit test the code as it was being built, to ensure it handled both the CEF specification and deviations from it (because not everyone is perfect). I'll not go into the full details of the implementation, but if you want to see the code, or even contribute, you can check it out on GitHub.
The final result of the work is a JAR file which can be added to the Spark environment (again, this becomes a lot easier using a platform like Databricks). From this you can then use the data source from Scala (because it's implemented in Scala), from Python (because it registers itself as a data source), and from Spark-SQL. The data used here is taken from open-legacy on GitHub.
The parser analyses the records and applies data type conversions where it knows how to. For example, where it sees a value of rt=Feb 09 2015 00:27:43 UTC it knows to convert this to a timestamp value of 2015-02-09 00:27:43Z. We had to make a few assumptions here, as the CEF standard allows date and time values without time zone or year specifiers. In those cases we assume UTC for the time zone and 1970 for the year.
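As a rough illustration of those defaults, the following Python sketch converts a CEF-style date string into a timestamp. It is an assumption-laden example rather than the data source's actual conversion code: the function name is made up, and it only accepts a trailing UTC, GMT or Z zone (or none at all), rejecting anything else.

```python
import re
from datetime import datetime, timezone

# English month abbreviations mapped to month numbers, avoiding any
# dependency on the machine's locale.
MONTHS = {name: number for number, name in enumerate(
    ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"), start=1)}

# "MMM dd [yyyy] HH:mm:ss [zone]" where the year and zone are optional.
CEF_TIME = re.compile(
    r"^(?P<month>[A-Za-z]{3}) (?P<day>\d{1,2})(?: (?P<year>\d{4}))? "
    r"(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})"
    r"(?: (?P<zone>\S+))?$"
)


def parse_cef_time(value: str) -> datetime:
    """Parse a CEF date string, assuming UTC and 1970 where not specified."""
    match = CEF_TIME.match(value.strip())
    if match is None:
        raise ValueError(f"unsupported timestamp: {value!r}")
    month = MONTHS.get(match["month"].capitalize())
    zone = match["zone"]
    # Only UTC-equivalent zones are handled in this sketch.
    if month is None or zone not in (None, "UTC", "GMT", "Z"):
        raise ValueError(f"unsupported timestamp: {value!r}")
    return datetime(
        year=int(match["year"] or 1970),  # missing year: assume 1970
        month=month,
        day=int(match["day"]),
        hour=int(match["hour"]),
        minute=int(match["minute"]),
        second=int(match["second"]),
        tzinfo=timezone.utc,              # missing zone: assume UTC
    )


with_year = parse_cef_time("Feb 09 2015 00:27:43 UTC")
without_year = parse_cef_time("Feb 09 00:27:43")
```

Here with_year is 9 February 2015 at 00:27:43 UTC, while without_year falls back to the same day and time in 1970.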
Continuing on from Richard's post about IP locations, we can then bring that data in and join it to the CEF data. In this use case, that tells us where in the world the target of an outbound request is located.
In our testing we found that moving from reading the data as text and post-processing it to using the custom data source led, in most scenarios, to a 10x performance increase. Obviously we were really happy with this. It meant we could get data to the users faster, drive out insights more rapidly and, of course, save money, as the cluster did not need to be on for so long to process the data files.
Most recently we have introduced the ability to write data back out in CEF format, allowing integration with existing tool sets, though of course the data set being saved has to conform to the CEF standard itself. We also added a feature which validates the options passed to the data source and, when an option is not recognised, returns an error saying what it thinks you meant to write.
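One way to build that kind of "did you mean" validation is to compare each unrecognised option against the known names and suggest the closest one. The Python sketch below does this with the standard library's difflib. The option names are hypothetical and not taken from the data source, and the project itself may well use a different matching technique.

```python
from difflib import get_close_matches

# Hypothetical option names, used purely for illustration.
KNOWN_OPTIONS = ("maxRecords", "pivotFields", "nullValue")


def validate_options(options: dict) -> list[str]:
    """Return one error message per unrecognised option name."""
    # Compare case-insensitively, but report the canonical spelling.
    lookup = {name.lower(): name for name in KNOWN_OPTIONS}
    errors = []
    for key in options:
        if key.lower() in lookup:
            continue
        # Pick the single closest known name, if any is similar enough.
        close = get_close_matches(key.lower(), list(lookup), n=1, cutoff=0.6)
        if close:
            errors.append(
                f"Unknown option '{key}'. Did you mean '{lookup[close[0]]}'?"
            )
        else:
            errors.append(f"Unknown option '{key}'.")
    return errors


errors = validate_options({"maxRecord": 10, "nullValue": "-"})
```

With these inputs, errors holds a single message suggesting maxRecords for the misspelt maxRecord, and the valid nullValue option passes without comment.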
Feel free to check out the source and help out by contributing if there's a feature you want, or if you spot a bug.