Two weeks ago I had zero experience with Spark, Hive, or Hadoop. Two weeks later I was able to reimplement Artsy sitemaps using Spark and even gave a “Getting Started” workshop to my team (with some help from @izakp). I’ve also made some pull requests to Hive-JSON-Serde and am starting to really understand what’s what in this fairly complex, yet amazing ecosystem.
This post will get you started with Hadoop, HDFS, Hive and Spark, fast.
What is Spark?
Apache Spark is a fast and general-purpose engine for large-scale data processing. You can write code in Scala or Python and it will automagically parallelize itself on top of Hadoop. It basically runs map/reduce.
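To give a feel for it, here’s a rough sketch of a map/reduce word count in the Spark shell; the input path /tmp/words.txt is just a placeholder for illustration and isn’t used anywhere else in this post.
val lines = sc.textFile("/tmp/words.txt")   // read a (hypothetical) text file into an RDD
val counts = lines
  .flatMap(line => line.split("\\s+"))      // map: split each line into words
  .map(word => (word, 1))                   // map: pair each word with a count of 1
  .reduceByKey(_ + _)                       // reduce: sum the counts per word
counts.take(10).foreach(println)            // print a small sample of the result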
What is Hadoop and HDFS?
The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It’s really a common library called Hadoop Common and a framework called Hadoop MapReduce that sit on top of a distributed file system called HDFS. The Hadoop Distributed File System, or HDFS, is a way to distribute file system data across a bunch of workers.
What is Yarn?
Job scheduling and cluster resource management are handled by a system called Yarn, which distributes the work across the cluster.
What is Hive?
Hadoop alone doesn’t know much about data structure; it deals in plain text files. Most humans work with SQL, so the Apache Hive data warehouse software facilitates reading, writing, and managing large datasets residing in distributed HDFS storage using SQL. It lets you create and query a SQL schema on top of text files, which can be in various formats, including the usual CSV or JSON.
Running Locally
A good place to start is to run a few things locally.
Hadoop
On OSX run brew install hadoop, then configure it (this post was helpful). I turned the configuration into a script in my dotfiles. Once installed you can run hstart. Once everything is working, you can navigate to the HDFS NameNode web UI on http://localhost:50070 or the YARN ResourceManager on http://localhost:8088 and run a small test.
$> wget https://www.dropbox.com/s/cyuah7lc31g0x3h/hadoop-mapreduce-examples-2.6.0.jar?dl=1 -O hadoop-mapreduce-examples-2.6.0.jar
$> hadoop jar hadoop-mapreduce-examples-2.6.0.jar pi 10 100
Number of Maps = 10
Samples per Map = 100
...
Job Finished in 3.203 seconds
Estimated value of Pi is 3.14800000000000000000
Hive
Once you’ve installed Hadoop, install Hive. On OSX run brew install hive, then configure it. I turned the configuration into a script in my dotfiles as well. The biggest difficulty is that you need to initialize a metastore, where Hive stores its configuration information, with schematool -initSchema -dbType derby (or another dbType, such as mysql or postgres). In the case of Derby, the metastore_db is created in whichever directory you run the command from, so it needs to be pinned to a fixed location via hive-site.xml.
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby:;databaseName=/usr/local/Cellar/hive/metastore_db;create=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
</configuration>
Once installed you can run hive and get a hive> prompt. Before we do that, let’s get some data into HDFS.
$> cat /tmp/physicists.csv
Albert,Einstein
Marie,Curie
$> hadoop fs -mkdir /user/data
$> hadoop fs -put /tmp/physicists.csv /user/data
$> hadoop fs -ls /user/data
-rw-r--r-- 1 dblock supergroup 39 2017-04-03 12:51 /user/data/physicists.csv
Load data into Hive. If you have existing data files you can just use those and add a schema on top of them with CREATE EXTERNAL TABLE.
$> hive
hive> CREATE EXTERNAL TABLE physicists(first string, last string)
        ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
        WITH SERDEPROPERTIES (
          "separatorChar" = ",",
          "quoteChar" = "'",
          "escapeChar" = "\\"
        )
        STORED AS TEXTFILE
        LOCATION '/user/data';
OK
Time taken: 0.602 seconds
hive> SELECT * FROM physicists;
OK
Albert Einstein
Marie Curie
Time taken: 1.732 seconds, Fetched: 2 row(s)
Spark
Neither Hadoop nor Hive is a prerequisite for running Spark. On OSX, install it with brew install apache-spark. From your installation in /usr/local/Cellar/apache-spark/X.Y.Z run ./bin/run-example SparkPi 10. You should see Pi is roughly 3.1413551413551413.
You can run a Spark shell with spark-shell. Let’s play with a sample dataset of country GDPs (Update: unfortunately lost forever) curated for us by @ByzantineFault during a recent @ArtsyOpenSource Spark workshop.
$> wget https://gist.githubusercontent.com/izakp/2244e9b256fab99cf8bbf6215d9c02c6/raw/a92422f29412fcac34e4847cf0d5409e1d057e29/gdp.json -O /tmp/gdp.json
$> spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
scala> val gdpDF = spark.read.json("/tmp/gdp.json")
gdpDF: org.apache.spark.sql.DataFrame = [Country Code: string, Country Name: string ... 2 more fields]
scala> gdpDF.printSchema()
root
|-- Country Code: string (nullable = true)
|-- Country Name: string (nullable = true)
|-- Value: double (nullable = true)
|-- Year: long (nullable = true)
We can turn this data into a proper resilient distributed dataset (RDD).
scala> val gdpRDD = gdpDF.rdd
gdpRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[12] at rdd at <console>:27
scala> gdpRDD.count()
res3: Long = 10355
scala> val pairRDD = gdpRDD.map( row => (row.get(1), row.getAs[Double](2)) )
pairRDD: org.apache.spark.rdd.RDD[(Any, Double)] = MapPartitionsRDD[13] at map at <console>:29
scala> pairRDD.foreach(println)
(Arab World,3.6229613725645E10)
(Arab World,4.31351681965945E10)
(Arab World,5.47808799760272E10)
(Arab World,1.04523001271412E11)
(Arab World,1.15752821947206E11)
(Arab World,1.44057806919486E11)
(Arab World,1.65984443446144E11)
(Arab World,1.83528095316993E11)
(Arab World,2.47491798430331E11)
...
scala> pairRDD.groupByKey().foreach(println)
(Australia,CompactBuffer(1.85743084331952E10, 1.96516967185575E10, 1.98868854294994E10, 2.15007279650577E10, 2.37585395900997E10, 2.5930115354463E10, 2.72550117594355E10, 3.03830216149625E10, 3.264755291746E10, 3.66121626161944E10, 4.12520998992048E10, 4.51271138985329E10, 5.19369158878505E10, 6.37001921844971E10, 8.87899778924097E10, 9.70981838044517E10, 1.0483297617547E11, 1.10115852259693E11, 1.18238213399504E11, 1.34607520163581E11, 1.49679108635098E11, 1.7655752931615E11, 1.93684210526316E11, 1.76929340196537E11, 1.93232204310813E11, 1.80215540385058E11, 1.82032736429771E11, 1.89113287453679E11, 2.35787252619325E11, 2.9947479846918E11, 3.10944978838015E11, 3.25641629385449E11, 3.25313389217873E11, 3.12028527262507E11, 3.22874437910758E11, 3.68022720522721E11, 4.0140971168437E11, 4.35636249804351E11, 3.9932510343892E11, 3.8869219200401E11, 4.14987125541669E11, 3.784882472899E11, 3.94250732524069E11, 4.66451368666278E11, 6.12871674491393E11, 6.93338595699895E11, 7.47205750224618E11, 8.53441155688153E11, 1.05503165229816E12, 9.26283274398423E11, 1.14126776018815E12, 1.38806635609196E12, 1.53442590576266E12, 1.56037247312521E12, 1.45377021067204E12))
...
Let’s calculate the total GDP for each country during the years surveyed.
scala> pairRDD.reduceByKey((a,b) => a + b).foreach(println)
(Australia,2.0298746299037598E13)
(Small states,2.778625996176125E12)
(Brazil,3.212770450746525E13)
...
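As a follow-up sketch (not output from the original workshop), we could sort those totals to see which countries had the largest cumulative GDP over the period:
val totals = pairRDD.reduceByKey((a, b) => a + b)                // total GDP per country
totals.sortBy(_._2, ascending = false).take(5).foreach(println)  // five largest totals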
For more advanced examples, check out the Spark programming guide.
We can also get our data from our previous Hive installation (Spark comes with its own “standalone” hive, too) by linking hive-site.xml into Spark’s libexec/conf/hive-site.xml, as in my dotfiles.
$> ln -s /usr/local/Cellar/hive/2.1.0/libexec/conf/hive-site.xml /usr/local/Cellar/apache-spark/2.1.0/libexec/conf/hive-site.xml
$> spark-shell
scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@26838837
scala> val rdd = hc.sql("SELECT * FROM physicists")
scala> rdd.foreach(row => println(row))
[Albert,Einstein]
[Marie,Curie]
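If Spark picks up hive-site.xml from its conf directory, the spark-shell’s built-in SparkSession (the spark variable) should come up with Hive support as well, so something along these lines ought to work too, without constructing a HiveContext by hand:
scala> spark.sql("SELECT * FROM physicists").show()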
Next Steps
The next step should be to write a Spark job in Scala. If you want to share one, I’ll add it to this tutorial.
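In the meantime, here’s a rough skeleton of what such a job could look like, reusing the GDP example from above; the object name, app name and input path are placeholders, and this is a sketch rather than a tested, packaged job.
import org.apache.spark.sql.SparkSession

// a minimal standalone Spark job sketch; app name and input path are placeholders
object GdpTotals {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("gdp-totals").getOrCreate()

    val gdp = spark.read.json("/tmp/gdp.json")  // same sample data as in the shell session
    val totals = gdp.rdd
      .map(row => (row.getAs[String]("Country Name"), row.getAs[Double]("Value")))
      .reduceByKey(_ + _)                       // total GDP per country

    totals.take(10).foreach(println)
    spark.stop()
  }
}
You would package something like this (e.g. with sbt) and hand it to spark-submit, which takes care of shipping it to the cluster.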