Daniel Doubrovkine

aka dB., CTO at artsy.net, fun at playplay.io, NYC

I recently started working with Apache Spark, Hadoop, HDFS and Hive. I wrote some jobs in Scala: one that generates website sitemaps and another that calculates the importance of an artwork among the artist’s other artworks. Today I wrote an inverted index that explodes a dictionary of art genome data.

The input is millions of rows containing a (gene name -> gene value) dictionary, stored in HDFS as map<string,double>, and the output is many more millions of rows of JSON such as { artist_id: "id", name: "gene name", value: "gene value" } exported to S3. This index is ultimately imported back into MongoDB and used on Artsy for fast lookup of artworks or artists that have certain genes within a requested range.
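To make the "explode" step concrete, here is a minimal plain-Scala sketch of turning one input row into one JSON document per gene. The `Genome` case class, the `GenomeIndex` object, and the hand-rolled `quote` helper are hypothetical names for illustration, not the actual job; the sketch also emits the value as a JSON number, which is an assumption about the output format.

```scala
object GenomeIndex {
  // Hypothetical row shape: one artist plus its (gene name -> gene value) dictionary.
  final case class Genome(artistId: String, genes: Map[String, Double])

  // Minimal JSON string quoting: escapes backslashes and double quotes only.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Explode one input row into one JSON document per gene, sorted by gene name.
  def explode(row: Genome): Seq[String] =
    row.genes.toSeq.sortBy(_._1).map { case (name, value) =>
      s"""{"artist_id":${quote(row.artistId)},"name":${quote(name)},"value":$value}"""
    }
}
```

On an RDD of such rows, the same function could presumably be applied with `flatMap(GenomeIndex.explode)`, which is what makes the output so much larger than the input: every row fans out into as many documents as it has genes.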

To write a single output file to send to S3, our Spark code calls RDD[String].collect(), which returns every row to the driver. This works well for small data sets: we can save a .jsondump file to the local file system and send it to S3.

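import java.io.{BufferedWriter, File, FileWriter}
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.PutObjectRequest
import org.apache.spark.rdd.RDD

def save(
  results: RDD[String],
  buildName: String // temp file prefix (assumed parameter)
): Unit = {
  // create a temporary file
  val file = File.createTempFile(buildName, "jsondump")

  // stream data into the file; collect() pulls every row into driver memory
  val bw = new BufferedWriter(new FileWriter(file))
  results.collect().foreach { f =>
    bw.write(f)
    bw.newLine()
  }
  bw.close()

  // send the file to S3
  val request = new PutObjectRequest("bucket", "file.jsondump", file)
  new AmazonS3Client().putObject(request)
}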

For larger datasets this code fails with java.lang.OutOfMemoryError: GC overhead limit exceeded, because collect() materializes the whole RDD in driver memory. An obvious solution would be to partition the data and send the pieces to S3, but that would also require changing the import code that consumes the data. Fortunately, Spark can address S3 as a file system through Hadoop’s s3a:// connector, so its built-in functions can write the unpartitioned data directly.

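def save(
  results: RDD[String]
): Unit = {
  // shuffle everything into one partition and write it straight to S3
  results.repartition(1).saveAsTextFile("s3a://bucket/file.jsondump.tmp")

  // promote the single part file to its final key, then clean up
  val s3 = new AmazonS3Client()
  s3.copyObject(
    "bucket", "file.jsondump.tmp/part-00000",
    "bucket", "file.jsondump"
  )
  s3.deleteObject("bucket", "file.jsondump.tmp/part-00000")
  s3.deleteObject("bucket", "file.jsondump.tmp/_SUCCESS")
}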

In the code above, repartition doesn’t bring the results into driver memory the way collect does. It only prompts a shuffle of the data over the network into a single partition. This can be slow due to network overhead, but it doesn’t exhaust driver memory. We then use the native saveAsTextFile to stream this data to S3 and clean up after ourselves.
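To see why the cleanup targets part-00000 and _SUCCESS, here is a small local-mode sketch that writes a single-partition RDD to a temporary directory instead of S3 and lists what Spark leaves behind. The `SinglePartDemo` object, the `local[2]` master, and the sample rows are all assumptions for illustration; it needs Spark on the classpath.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object SinglePartDemo {
  // Writes a small RDD as one partition and returns the visible file names in the output directory.
  def run(): Seq[String] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("single-part-demo")
    val sc = new SparkContext(conf)
    try {
      // saveAsTextFile requires that the target directory does not exist yet.
      val out = Files.createTempDirectory("jsondump").resolve("file.jsondump.tmp")

      // Start with 8 partitions, then shuffle everything into a single one.
      val rows = sc.parallelize(1 to 1000, numSlices = 8).map(i => s"""{"value":$i}""")
      rows.repartition(1).saveAsTextFile(out.toString)

      // Skip the hidden .crc checksum files that the local file system adds.
      out.toFile.list().filterNot(_.startsWith(".")).sorted.toSeq
    } finally {
      sc.stop()
    }
  }
}
```

Calling `SinglePartDemo.run()` should list one data file plus the success marker, which mirrors the two keys removed from the bucket once the part file has been copied to its final name.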

For s3a:// to work you need to configure credentials globally. If you just want to experiment, you can also set them in code.

results.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "...")
results.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "...")
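If you'd rather not hardcode keys, one possible variation is to read them from the environment and set them on the SparkConf before the context is created. Spark copies properties prefixed with `spark.hadoop.` into the Hadoop configuration. The `S3aCredentials` helper and the choice of the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` variable names are assumptions in this sketch, not part of the original setup.

```scala
import org.apache.spark.SparkConf

object S3aCredentials {
  // Hypothetical helper: copies S3 credentials from a lookup function
  // (environment variables by default) into spark.hadoop.* properties.
  // Leaves the conf untouched when either variable is missing.
  def configure(conf: SparkConf, env: String => Option[String] = sys.env.get): SparkConf = {
    for {
      access <- env("AWS_ACCESS_KEY_ID")
      secret <- env("AWS_SECRET_ACCESS_KEY")
    } {
      conf.set("spark.hadoop.fs.s3a.access.key", access)
      conf.set("spark.hadoop.fs.s3a.secret.key", secret)
    }
    conf
  }
}
```

A job would call something like `S3aCredentials.configure(new SparkConf())` and pass the result to its SparkContext, which keeps the secrets out of the source code.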

A few useful notes and links.