Hadoop is the standard tool for distributed computing across really large data sets and is the reason why you see "Big Data" on advertisements as you walk through the airport. It has become an operating system for Big Data, providing a rich ecosystem of tools and techniques that allow you to use a large cluster of relatively cheap commodity hardware to do computing at supercomputer scale. Two ideas from Google in 2003 and 2004 made Hadoop possible: a framework for distributed storage (The Google File System), which is implemented as HDFS in Hadoop, and a framework for distributed computing (MapReduce).
These two ideas have been the prime drivers for the advent of scalable analytics, large-scale machine learning, and other big data appliances for the last ten years! However, in technology terms, ten years is an incredibly long time, and there are some well-known limitations, with MapReduce in particular. Notably, programming MapReduce is difficult. You have to chain Map and Reduce tasks together in multiple steps for most analytics. This has resulted in specialized systems for performing SQL-like computations or machine learning. Worse, MapReduce requires data to be serialized to disk between each step, which means that the I/O cost of a MapReduce job is high, making interactive analysis and iterative algorithms very expensive; and almost all optimization and machine learning is iterative.
To address these problems, Hadoop has been moving to a more general resource management framework for computation, YARN (Yet Another Resource Negotiator). YARN implements the next generation of MapReduce, but also allows applications to leverage distributed resources without having to compute with MapReduce. By generalizing the management of the cluster, research has moved toward generalizations of distributed computation, expanding the ideas first imagined in MapReduce.
Spark is the first fast, general purpose distributed computing paradigm resulting from this shift and is gaining popularity rapidly. Spark extends the MapReduce model to support more types of computations using a functional programming paradigm, and it can cover a wide range of workflows that previously were implemented as specialized systems built on top of Hadoop. Spark uses in-memory caching to improve performance and, therefore, is fast enough to allow for interactive analysis (as though you were sitting at the Python interpreter, interacting with the cluster). Caching also improves the performance of iterative algorithms, which makes it great for data theoretic tasks, especially machine learning.
In this post we will first discuss how to set up Spark to start easily performing analytics, either simply on your local machine or in a cluster on EC2. We then will explore Spark at an introductory level, moving towards an understanding of what Spark is and how it works (hopefully motivating further exploration). In the last two sections we will start to interact with Spark on the command line and then demo how to write a Spark application in Python and submit it to the cluster as a Spark job.
Spark is pretty simple to set up and get running on your machine. All you really need to do is download one of the pre-built packages, and so long as you have Java 6+ and Python 2.6+, you can simply run the Spark binary on Windows, Mac OS X, and Linux. Ensure that the java program is on your PATH or that the JAVA_HOME environment variable is set. Similarly, python must also be on your PATH.
Assuming you already have Java and Python:
At this point, you'll have to figure out how to proceed depending on your operating system. Windows users, please feel free to share setup tips in the comments section.
Generally, my suggestion is to do as follows (on a POSIX OS):
Unzip Spark
~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz
Move the unzipped directory to a working application directory (C:\Program Files, for example, on Windows, or /opt/ on Linux). Where you move it to doesn't really matter, so long as you have permissions and can run the binaries there. I typically install Hadoop and related tools in /srv/ on my Ubuntu boxes, and will use that directory here for illustration.
~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0
Symlink the version of Spark to a spark directory. This will allow you to simply download new/older versions of Spark and modify the link to manage Spark versions without having to change your path or environment variables.
~$ ln -s /srv/spark-1.2.0 /srv/spark
Edit your Bash profile to add Spark to your PATH and to set the SPARK_HOME environment variable. These helpers will assist you on the command line. On Ubuntu, simply edit the ~/.bash_profile or ~/.profile file and add the following:
export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH
After you source your profile (or simply restart your terminal), you should now be able to run a pyspark interpreter locally. Execute the pyspark command, and you should see a result as follows:
~$ pyspark
Python 2.7.8 (default, Dec 2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
[… snip …]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/
Using Python version 2.7.8 (default, Dec 2 2014 12:45:58)
SparkContext available as sc.
>>>
At this point Spark is installed and ready to use on your local machine in "standalone mode." You can develop applications here and submit Spark jobs that will run in a multi-process/multi-threaded mode, or you can configure this machine as a client to a cluster (though this is not recommended, as the driver plays an important role in Spark jobs and should be on the same network as the rest of the cluster). Probably the most you will do with Spark on your local machine beyond development is to use the spark-ec2 scripts to configure an EC2 Spark cluster on Amazon's cloud.
When Googling around for helpful Spark tips, I discovered a couple of posts that mentioned how to configure PySpark with IPython notebook. IPython notebook is an essential tool for data scientists to present their scientific and theoretical work in an interactive fashion, integrating both text and Python code. For many data scientists, IPython notebook is their first introduction to Python, and it is used widely, so I thought it would be worth including it in this post.
Most of the instructions here are adapted from an IPython notebook: Setting up IPython with PySpark. However, we will focus on connecting your IPython shell to PySpark in standalone mode on your local computer rather than on an EC2 cluster. If you would like to work with PySpark/IPython on a cluster, feel free to check out those instructions and if you do, please comment on how it went!
Create an IPython notebook profile for our Spark configuration.
~$ ipython profile create spark
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'
Keep note of where the profile has been created, and replace the appropriate paths in the following steps:
Create a file in $HOME/.ipython/profile_spark/startup/00-pyspark-setup.py and add the following:
import os
import sys
# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/srv/spark'

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
Start up an IPython notebook with the profile we just created.
~$ ipython notebook --profile spark
In your notebook, you should see the variables we just created.
print SPARK_HOME
At the top of your IPython notebook, make sure you add the Spark context.
from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')
Test the Spark context by doing a simple computation using IPython.
def isprime(n):
    """
    check if integer n is a prime
    """
    # make sure n is a positive integer
    n = abs(int(n))

    # 0 and 1 are not primes
    if n < 2:
        return False

    # 2 is the only even prime number
    if n == 2:
        return True

    # all other even numbers are not primes
    if not n & 1:
        return False

    # range starts with 3 and only needs to go up the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False

    return True
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))
# Compute the number of primes in the RDD
print nums.filter(isprime).count()
If you get a number without errors, then your context is working correctly!
Editor's Note: The above configures an IPython context for directly invoking IPython notebook with PySpark. However, you can also launch a notebook using PySpark directly as follows:
$ IPYTHON_OPTS="notebook --pylab inline" pyspark
Either methodology works similarly depending on your use case for PySpark and IPython. The former allows you to more easily connect to a cluster with IPython notebook, and thus, it is the method I prefer.
In my time teaching distributed computing with Hadoop, I've discovered that a lot can be taught locally on a pseudo-distributed node or in single-node mode. However, in order to really understand what's happening, a cluster is necessary. There is often a disconnect between learning these skills and the actual computing requirements when data just gets too large. If you have a little bit of money to spend learning how to use Spark in detail, I would recommend setting up a quick cluster for experimentation. Note that a cluster of 5 slaves (and 1 master) used for approximately 10 hours per week will cost you about $45.18 per month.
A full discussion can be found in the Spark documentation: Running Spark on EC2. Be sure to read this documentation thoroughly, as you'll end up spending money on an EC2 cluster if you start these steps! I've highlighted a few key points here:
Export your key pairs to your environment. Either issue these commands in your shell, or add them to your profile.
export AWS_ACCESS_KEY_ID=myaccesskeyid
export AWS_SECRET_ACCESS_KEY=mysecretaccesskey
Note that different utilities use different environment names, so make sure to use these for the Spark scripts.
Launch a cluster as follows:
~$ cd $SPARK_HOME/ec2
ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>
SSH into a cluster to run Spark jobs.
ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>
Destroy a cluster as follows.
ec2$ ./spark-ec2 destroy <cluster-name>
These scripts will automatically create a local HDFS cluster for you to add data to, and there is a copy-dir command that will allow you to sync code and data to the cluster. However, your best bet is to simply use S3 for data storage and create RDDs that load data using the s3:// URI.
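As a quick, hedged illustration (the bucket name and path below are made up, and depending on your Hadoop build you may need the s3n:// scheme rather than s3://), loading data from S3 into an RDD looks like this once your AWS credentials are configured:
# Load a dataset directly from S3 into an RDD (hypothetical bucket and path).
# Older Hadoop builds expect the "s3n://" scheme; newer ones accept "s3a://" or "s3://".
lines = sc.textFile("s3n://my-example-bucket/shakespeare/*.txt")
print lines.count()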
Now that we have Spark set up, let's have a bit of a discussion about what Spark is. Spark is a general purpose cluster computing framework that provides efficient in-memory computations for large data sets by distributing computation across multiple computers. If you're familiar with Hadoop, then you know that any distributed computing framework needs to solve two problems: how to distribute data and how to distribute computation. Hadoop uses HDFS to solve the distributed data problem and MapReduce as the programming paradigm that provides effective distributed computation. Similarly, Spark has a functional programming API in multiple languages that provides more operators than map and reduce, and does this via a distributed data framework called resilient distributed datasets, or RDDs.
RDDs are essentially a programming abstraction that represents a read-only collection of objects that are partitioned across machines. RDDs can be rebuilt from a lineage (and are therefore fault tolerant), are accessed via parallel operations, can be read from and written to distributed storage like HDFS or S3, and most importantly, can be cached in the memory of worker nodes for immediate reuse. Because RDDs can be cached in memory, Spark is extremely effective at iterative applications, where the data is being reused throughout the course of an algorithm. Most machine learning and optimization algorithms are iterative, making Spark an extremely effective tool for data science. Additionally, because Spark is so fast, it can be accessed in an interactive fashion via a command line prompt similar to the Python REPL.
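As a minimal sketch of why caching matters (assuming the sc context provided by the PySpark shell and Python 2, as used throughout this post), the RDD below is materialized once and then reused by several actions; without the call to cache(), each action would recompute the data from its lineage:
# Mark an RDD for in-memory caching, then reuse it across multiple actions.
data = sc.parallelize(xrange(1000000)).map(lambda x: x * x)
data.cache()                            # nothing is computed yet (lazy)

print data.count()                      # first action: computes and caches the RDD
print data.reduce(lambda a, b: a + b)   # later actions reuse the cached partitions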
The Spark library itself contains a lot of the application elements that have found their way into most Big Data applications including support for SQL-like querying of big data, machine learning and graph algorithms, and even support for live streaming data.
The core components are:
- Spark Core: the general execution engine and the RDD API upon which everything else is built.
- Spark SQL: SQL-like querying of structured big data.
- MLlib: a library of scalable machine learning algorithms.
- GraphX: graph processing and graph-parallel computation.
- Spark Streaming: processing of live data streams.
Because these components meet many Big Data requirements as well as the algorithmic and computational requirements of many data science tasks, Spark has been growing rapidly in popularity. Not only that, but Spark provides APIs in Scala, Java, and Python, meeting the needs of many different groups and allowing more data scientists to easily adopt Spark as their Big Data solution.
Programming Spark applications is similar to other data flow languages that had previously been implemented on Hadoop. Code is written in a driver program which is lazily evaluated, and upon an action, the driver code is distributed across the cluster to be executed by workers on their partitions of the RDD. Results are then sent back to the driver for aggregation or compilation. Essentially the driver program creates one or more RDDs, applies operations to transform the RDD, then invokes some action on the transformed RDD.
These steps are outlined as follows (a minimal sketch follows the list):
1. Define one or more RDDs, either by loading data from distributed storage (HDFS, S3, a local file) or by parallelizing a collection held by the driver.
2. Transform the RDDs by passing closures (functions) to operators such as map, filter, or flatMap; transformations are lazy and only describe new RDDs.
3. Invoke actions such as count, collect, or saveAsTextFile, which trigger the actual computation and return or persist results.
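Here is that cycle in miniature, using toy data in the PySpark shell (sc is the SparkContext the shell provides):
# 1. Define an RDD by parallelizing an in-memory collection
rdd = sc.parallelize([1, 2, 3, 4, 5])

# 2. Transform it lazily; no work happens yet
squares = rdd.map(lambda x: x * x)
evens = squares.filter(lambda x: x % 2 == 0)

# 3. Invoke an action, which triggers the computation
print evens.collect()   # [4, 16]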
When Spark runs a closure on a worker, any variables used in the closure are copied to that node, but are maintained within the local scope of that closure. Spark provides two types of shared variables that can be interacted with by all workers in a restricted fashion. Broadcast variables are distributed to all workers, but are read-only. Broadcast variables can be used as lookup tables or stopword lists. Accumulators are variables that workers can "add" to using associative operations and are typically used as counters.
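A short sketch of both shared variable types follows; the stopword list, the sample lines, and the blank-line counter are made up purely for illustration:
# Broadcast variable: a read-only lookup distributed to every worker
stopwords = sc.broadcast(set(["a", "an", "the", "of"]))

# Accumulator: workers can only add to it; the driver reads the total
blank_lines = sc.accumulator(0)

def strip_stopwords(line):
    if not line.strip():
        blank_lines.add(1)          # incremented on the workers
    return [w for w in line.split() if w not in stopwords.value]

lines = sc.parallelize(["the quick brown fox", "", "jumps over the lazy dog"])
print lines.flatMap(strip_stopwords).collect()
print "blank lines:", blank_lines.value   # readable on the driver after the action runs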
Spark applications are essentially the manipulation of RDDs through transformations and actions. Future posts will go into this in greater detail, but this understanding should be enough to execute the example programs below.
A brief note on the execution of Spark. Essentially, Spark applications are run as independent sets of processes, coordinated by a SparkContext in a driver program. The context will connect to some cluster manager (e.g. YARN), which allocates system resources. Each worker in the cluster is managed by an executor, which is in turn managed by the SparkContext. The executor manages computation as well as storage and caching on each machine.
What is important to note is that application code is sent from the driver to the executors, and the driver specifies the tasks that the executors run on their partitions of the data. The executors communicate back and forth with the driver for data sharing or for interaction. Drivers are key participants in Spark jobs, and therefore, they should be on the same network as the cluster. This is different from Hadoop code, where you might submit a job from anywhere to the JobTracker, which then handles the execution on the cluster.
The easiest way to start working with Spark is via the interactive command prompt. To open the PySpark terminal, simply type pyspark on the command line.
~$ pyspark
[… snip …]
>>>
PySpark will automatically create a SparkContext for you to work with, using the local Spark configuration. It is exposed to the terminal via the sc variable. Let's create our first RDD.
>>> text = sc.textFile("shakespeare.txt")
>>> print text
shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
The textFile method loads the complete works of Shakespeare into an RDD named text. If you inspect the RDD, you can see that it is a MappedRDD and that the path to the file is a relative path from the current working directory (pass in a correct path to the shakespeare.txt file on your system). Let's start to transform this RDD in order to compute the "hello world" of distributed computing: "word count."
>>> from operator import add
>>> def tokenize(text):
... return text.split()
...
>>> words = text.flatMap(tokenize)
>>> print words
PythonRDD[2] at RDD at PythonRDD.scala:43
We first imported the operator add, which is a named function that can be used as a closure for addition. We'll use this function later. The first thing we have to do is split our text into words. We created a function called tokenize whose argument is some piece of text and which returns a list of the tokens (words) in that text by simply splitting on whitespace. We then created a new RDD called words by transforming the text RDD through the application of the flatMap operator, and passed it the closure tokenize. As you can see, words is a PythonRDD, but the execution should have happened instantaneously. Clearly, we haven't split the entire Shakespeare data set into a list of words yet.
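If you want to convince yourself that the transformation really is lazy, invoke a small action on the words RDD; only then will Spark kick off enough work to produce a result (the actual tokens and counts depend on your copy of shakespeare.txt, so the output is omitted here):
>>> words.take(5)    # an action: computes just enough to return five tokens
>>> words.count()    # another action: tokenizes the whole file to count words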
If you've done the Hadoop "word count" using MapReduce, you'll know that the next steps are to map each word to a key-value pair, where the key is the word and the value is a 1, and then use a reducer to sum the 1s for each key.
First, let's apply our map.
>>> wc = words.map(lambda x: (x,1))
>>> print wc.toDebugString()
(2) PythonRDD[3] at RDD at PythonRDD.scala:43
| shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
| shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2
Instead of using a named function, we will use an anonymous function (with the lambda keyword in Python). This line of code will map the lambda to each element of words. Therefore, each x is a word, and the word will be transformed into a tuple (word, 1) by the anonymous closure. In order to inspect the lineage so far, we can use the toDebugString method to see how our PipelinedRDD is being transformed. We can then apply the reduceByKey transformation to get our word counts and then write those word counts to disk with an action.
>>> counts = wc.reduceByKey(add)
>>> counts.saveAsTextFile("wc")
Once we finally invoke the action saveAsTextFile, the distributed job kicks off and you should see a lot of INFO statements as the job runs "across the cluster" (or simply as multiple processes on your local machine). If you exit the interpreter, you should see a directory called "wc" in your current working directory.
$ ls wc/
_SUCCESS part-00000 part-00001
Each part file represents a partition of the final RDD that was computed by various processes on your computer and saved to disk. If you use the head command on one of the part files, you should see tuples of word count pairs.
$ head wc/part-00000
(u'fawn', 14)
(u'Fame.', 1)
(u'Fame,', 2)
(u'kinghenryviii@7731', 1)
(u'othello@36737', 1)
(u'loveslabourslost@51678', 1)
(u'1kinghenryiv@54228', 1)
(u'troilusandcressida@83747', 1)
(u'fleeces', 1)
(u'midsummersnightsdream@71681', 1)
Note that none of the keys are sorted as they would be in Hadoop (due to a necessary shuffle and sort phase between the Map and Reduce tasks). However, you are guaranteed that each key appears only once across all of the part files because you used the reduceByKey operator to produce the counts RDD. If you want, you could use the sortByKey operator to ensure that all the keys are sorted before writing them to disk.
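A minimal sketch of that final sort, continuing the same interpreter session: sortByKey is itself a transformation, so the ordering only materializes when the save action runs (the output directory name here is arbitrary):
>>> sorted_counts = counts.sortByKey()
>>> sorted_counts.saveAsTextFile("wc-sorted")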
Writing Spark applications is similar to working with Spark in the interactive console. The API is the same. First, you need to get access to the SparkContext, which was automatically loaded for you by the pyspark application.
A basic template for writing a Spark application in Python is as follows:
## Spark Application - execute with spark-submit

## Imports
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "My Spark Application"

## Closure Functions

## Main functionality
def main(sc):
    pass

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)
This template gives you a sense of what is needed in a Spark application: imports for various Python libraries, module constants, an identifying application name for debugging and for the Spark UI, closures or other custom operation functions, and finally, some main analytical methodology that is run as the driver. In the __main__ block, we create the SparkContext and execute main with the context as configured. This will allow us to easily import driver code into the pyspark context without execution. Note that here a Spark configuration is hard coded into the SparkConf via the setMaster method, but typically you would just allow this value to be configured from the command line, so you will often see this line commented out.
To close or exit the program, use sc.stop() or sys.exit(0).
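Assuming you have saved the template above as app.py (the filename is arbitrary), you would submit it to Spark like this; if you comment out the setMaster call, the master can be supplied on the command line instead:
~$ spark-submit --master local[*] app.py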
In order to demonstrate a common use of Spark, let's take a look at a use case where we read in a CSV file of data and compute some aggregate statistic. In this case, we're looking at the on-time flight data set from the U.S. Department of Transportation, recording all U.S. domestic flight departure and arrival times, along with their departure and arrival delays, for the month of April, 2014. I typically use this data set because one month is manageable for exploration, but computing on the entire data set requires a cluster. The entire app is as follows:
## Spark Application - execute with spark-submit
## Imports
import csv
import matplotlib.pyplot as plt
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext
## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
          'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields)
## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
    row[0] = datetime.strptime(row[0], DATE_FMT).date()
    row[5] = datetime.strptime(row[5], TIME_FMT).time()
    row[6] = float(row[6])
    row[7] = datetime.strptime(row[7], TIME_FMT).time()
    row[8] = float(row[8])
    row[9] = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])
def split(line):
    """
    Operator function for splitting a line with the csv module.
    """
    reader = csv.reader(StringIO(line))
    return reader.next()
def plot(delays):
    """
    Show a bar chart of the total delay per airline.
    """
    airlines = [d[0] for d in delays]
    minutes = [d[1] for d