Yesterday I did a short piece of work to map IP addresses to their respective origin countries for one of our customers.
There are a bunch of useful libraries online that will help you determine whether an IP address falls within a particular network. The algebra is fairly simple, as you can convert an IP address into a base-10 representation fairly easily.
So we need to know 2 things that will help us:
All IP addresses can be represented as decimal numbers
All IP networks, including per-country allocations, can be represented using CIDR notation, which describes the range of numbers, or hosts, that can exist within a network
Take 192.168.0.1. Using a simple algebraic formula this would be (192*256^3) + (168*256^2) + (0*256^1) + (1*256^0) = 3232235521.
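A minimal Scala sketch of that conversion (ipToLong is just an illustrative helper, not from any library):

// Convert a dotted-quad IPv4 address to its decimal (Long) value
def ipToLong(ip: String): Long =
  ip.split('.').foldLeft(0L)((acc, octet) => acc * 256 + octet.toLong)

ipToLong("192.168.0.1") // 3232235521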
We can also take CIDR notation, which describes a network's IP range; this entails expanding our algebraic formula to calculate a range of IPs. Using the same methodology as above, consider 192.168.0.0/27, which is a network of hosts. This results in 2^(32-27) hosts; as IP addresses are 32-bit, we use the 32 in our formula. That gives us 2^5, or 32, hosts. If we extrapolate this, our first IP would be 192.168.0.0 and our last 192.168.0.31. Converting both of these back to decimal (long) values gives us 3232235520 - 3232235551, and we can easily check that 192.168.0.1, or 3232235521, falls into that range. This makes checking membership much easier and quicker than doing a string compare.
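And a companion sketch for the range calculation, building on ipToLong above (cidrRange is again just an illustrative helper):

// Derive the first and last decimal address covered by a CIDR block
def cidrRange(cidr: String): (Long, Long) = {
  val Array(base, prefix) = cidr.split('/')
  val start = ipToLong(base)           // network address as a Long
  val size = 1L << (32 - prefix.toInt) // 2^(32 - prefix) hosts
  (start, start + size - 1)
}

val (lo, hi) = cidrRange("192.168.0.0/27") // (3232235520, 3232235551)
val ip = ipToLong("192.168.0.1")           // 3232235521
lo <= ip && ip <= hi                       // true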
Thankfully there are libraries that wrap up this functionality for us. In Python you could use the netaddr library, but this post is in Scala, so I'm going to use the risksense library, which you can view here: https://github.com/risksense/ipaddr
Firstly we'll add the library to our Databricks cluster: click on the running cluster, select the Libraries tab, then add a library from Maven Central by clicking the search magnifying glass, typing in com.risksense, and selecting the only available version for Scala 2.12, 1.0.2. Make sure you use a cluster that has Scala 2.12 installed; I am using Databricks 7.4 at the time of writing.
You may need to restart the cluster to be able to use the library.
You can now download an IP address database from the internet. I used this one: https://datahub.io/core/geoip2-ipv4
I created a new database called ips in a Databricks notebook using the following:
%sql
CREATE DATABASE ips
Then go through the add new table wizard (you can read more here https://docs.databricks.com/data/tables.html) and import the file into the table.
I changed the names of the columns in the file to be more user friendly. You should see something like this.
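If you'd rather skip the wizard, a rough code equivalent looks something like the following. The file path and the source column names here are assumptions based on my download; adjust them to match yours.

// Assumed upload path and source column names; rename to friendlier ones
val raw = spark.read
  .option("header", "true")
  .csv("/FileStore/tables/geoip2_ipv4.csv")

raw
  .withColumnRenamed("network", "Network")
  .withColumnRenamed("country_name", "CountryName")
  .write
  .saveAsTable("ips.ipregions")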
This particular database has ~173K rows. This implementation was quick and dirty, and I hope to replace risksense with something simpler and quicker, but in the short term this approach works; it helps to reorder the contents of the file so that the networks you hit most often appear first, rather than iterating through all 173K rows hundreds of times on each task.
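As a sketch of what "simpler and quicker" might look like, you could precompute each network's decimal range once (using the ipToLong and cidrRange helpers sketched earlier), sort by range start, and binary-search per address instead of scanning every network. buildRanges and lookupCountry are hypothetical names, not part of risksense:

// Convert each CIDR to a (start, end, country) triple and sort by start
def buildRanges(networks: Map[String, String]): Array[(Long, Long, String)] =
  networks.map { case (cidr, country) =>
    val (lo, hi) = cidrRange(cidr)
    (lo, hi, country)
  }.toArray.sortBy(_._1)

// Binary-search for the last range starting at or before the address
def lookupCountry(ranges: Array[(Long, Long, String)], ip: String): String = {
  val n = ipToLong(ip)
  var low = 0; var high = ranges.length - 1; var idx = -1
  while (low <= high) {
    val mid = (low + high) >>> 1
    if (ranges(mid)._1 <= n) { idx = mid; low = mid + 1 } else high = mid - 1
  }
  if (idx >= 0 && n <= ranges(idx)._2) ranges(idx)._3 else "Unknown"
}

That said, the rest of this post sticks with the straightforward risksense scan.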
To test this out we'll add a single cell containing the following:
// com.risksense:ipaddr_2.12:1.0.2
import com.risksense.ipaddr._

// Read the lookup table once and cache it
val ipRegions = spark.read.table("ips.ipregions").cache()
ipRegions.count()

// Collect (Network -> CountryName) to the driver and broadcast it to every
// node; hacky, but workable for now
val ips = ipRegions.select("Network", "CountryName").rdd
  .map(row => (row.getString(0), row.getString(1)))
  .collectAsMap()
val broadcastedNetworks = sc.broadcast(ips)
What this does is read in the table, cache it, and keep only the Network and CountryName columns; it then collects those to the driver node as a Scala Map and broadcasts the map so that each of the nodes in the cluster has a copy. Compressed, this is fairly small, so it won't generate a lot of network traffic. We can then use it in a User Defined Function (UDF).
// Scan the broadcast networks for one containing the address; return its
// country name, or "Unknown" if nothing matches
val getUdfNetworkRegion = (ipAddress: String) => {
  broadcastedNetworks.value
    .find { case (network, _) => IpNetwork(network).contains(ipAddress) }
    .map { case (_, country) => country }
    .getOrElse("Unknown")
}

val getRegionFromIp = spark.udf.register("GetRegionFromIp", getUdfNetworkRegion)
So firstly we create a function called getUdfNetworkRegion, which takes an IP address and returns a country-of-origin name, or Unknown if we can't find the IP in our database.
Then we register the UDF.
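Because the UDF is registered by name, it can also be called from SQL. A quick sanity check (192.168.0.1 is a private address, so it should come back as Unknown):

spark.sql("SELECT GetRegionFromIp('192.168.0.1') AS Country").show()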
Now we should be able to test this.
import spark.implicits._
val ipSamples = Seq("85.236.136.12", "46.36.200.103", "103.30.248.22", "1.0.0.34", "156.0.201.1").toDF("IPAddresses")
val df = ipSamples.select($"IPAddresses", getRegionFromIp($"IPAddresses").alias("Country"))
display(df)
This takes about 13 seconds to run on a small two-node cluster and yields a country for each of the sample addresses.
For now I'm just returning countries, but I'm thinking of extending this to use cities from another database, and also latitude/longitude, so that I can get a better representation on a map in Power BI. Watch this space.