Patrick Wendell, a founding committer of Spark, gave this talk about Apache Spark at Strata London 2015.
These slides provide an introduction to Spark and delve into future developments, including DataFrames, the Data Source API, the Catalyst logical optimizer, and Project Tungsten.
2. About Me
Founding committer of Spark at Berkeley AMPLab
Co-founder of Databricks, now managing the Spark roadmap and community
Release manager for Spark 1.3 and 1.4
3. About Databricks
Founded by Apache Spark creators
Largest contributor to the Spark project, committed to keeping Spark 100% open source
End-to-End hosted platform, Databricks Cloud
4. Show of Hands
A. New to Spark
B. Played around with Spark API in training or examples
C. Have written a full application on Spark (POC or production)
6. What Is Spark?
A fast and general execution engine for big data processing
Fast to write code
High-level APIs in Python, Java, and Scala
Multi-paradigm (streaming, batch, and interactive)
Fast to run code
Low overhead scheduling
Optimized engine
Can exploit in-memory when available
10. Contributors per Month to Spark
[Chart: contributors per month to Spark, 2011 to 2015, y-axis from 0 to 100]
Most active project at Apache
More than 500 known production deployments
11. What’s New In Spark?
12. Some Spark 1.4 and 1.5 Initiatives
MLlib
Pipelines API for machine learning
Dozens of new algorithms/utils
Streaming
Metric viz and monitoring
Deeper Kafka integrations
Spark SQL
Support all Hive metastore versions
HQL/SQL coverage (window functions, etc.)
DataFrames
Storage integrations
Math and stats functions
Code generation
Core Runtime
Managed memory
Cache-aware data structures
13. From MapReduce to Spark
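import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Enclosing class added so the nested classes compile; its name is illustrative.
public class WordCount {
  // Mapper: emit (word, 1) for every token of the input line
  public static class WordCountMapClass extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }

  // Reducer: sum the counts collected for each word
  public static class WordCountReduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
}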
// Spark (Scala): the same word count; `spark` here is a SparkContext
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
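To make the data flow of the Spark version concrete, here is a minimal single-machine sketch in plain Python that mirrors its three steps (flatMap, map, reduceByKey). It does not use Spark; the helper name `word_count` and the sample lines are illustrative assumptions, not part of the talk.

```python
from collections import defaultdict
from itertools import chain


def word_count(lines):
    """Mirror flatMap -> map -> reduceByKey on a local list of strings."""
    # flatMap: split every line into words and flatten the result
    words = chain.from_iterable(line.split(" ") for line in lines)
    # map: pair each word with the count 1
    pairs = ((word, 1) for word in words)
    # reduceByKey: sum the counts that share the same key
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


sample = ["to be or not to be", "to spark"]  # hypothetical input
result = word_count(sample)
print(result)
```

In Spark the same three steps run in parallel across partitions, and reduceByKey shuffles pairs so that equal keys meet on the same node; the local loop above only shows the logic.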
14. RDD API
Most data is structured (JSON, CSV, Avro, Parquet, Hive …)
• Programming RDDs inevitably ends up with a lot of tuples (_1, _2, …)
Functional transformations (e.g. map/reduce) are not as intuitive
Memory management with arbitrary Java objects is doable, but challenging
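The tuple problem can be illustrated without Spark. The sketch below, in plain Python, runs the same query twice: once with positional access (comparable to `_1`, `_2` on Scala tuples) and once with named fields. The records and the `Person` type are hypothetical.

```python
from collections import namedtuple

# Hypothetical records: (name, age, city)
rows = [("ann", 34, "London"), ("bob", 19, "Paris"), ("cat", 52, "London")]

# Positional style: the reader has to remember that index 1 is the age
# and index 2 is the city.
londoners_by_index = [r[0] for r in rows if r[2] == "London" and r[1] > 30]

# Named style: the same query with a schema attached to each record.
Person = namedtuple("Person", ["name", "age", "city"])
people = [Person(*r) for r in rows]
londoners_by_name = [p.name for p in people if p.city == "London" and p.age > 30]

print(londoners_by_index, londoners_by_name)
```

Both queries return the same rows; the second one stays readable as the number of fields grows, which is the motivation for attaching a schema to the data.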
16. DataFrames in Spark
Distributed collection of data grouped into named columns (i.e. RDD with schema)
Domain-specific functions designed for common tasks
• Metadata
• Sampling
• Project, filter, aggregation, join, …
• UDFs
Available in Python, Scala, Java, and R
18. From DataFrames Spring…
1. A DataFrame model for expressive and concise programs
2. A pluggable Data Source API for reading and writing DataFrames while minimizing IO
3. The Catalyst logical optimizer for speeding up DataFrame operations
4. Project Tungsten: optimized physical execution throughout Spark
19. Data Sources: Input & Output
Spark’s Data Source API can read and write DataFrames using a variety of formats.
[Figure: logos of supported formats (labels: Built-In, Packages), including JSON, JDBC, and more…]
20. Read Less Data
The fastest way to process big data is to never read it.
DataFrames can help you read less data automatically:
• Columnar formats can skip fields (e.g., Parquet)
• Using partitioning (e.g., /year=2014/month=02/…) [1]
• Skipping data using statistics (e.g., min, max) [2]
• Pushing predicates into storage systems (e.g., JDBC)
[1] Only supported for Parquet and Hive; more support coming in Spark 1.4.
[2] Turned off by default in Spark 1.3.
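Two of these techniques, partition pruning and skipping by statistics, can be sketched in a few lines of plain Python. This is only an illustration of the idea, not how Spark implements it: the file catalog, the file names, the `amount` column, and the function `files_to_read` are all hypothetical.

```python
# Hypothetical catalog of data files: each entry carries its partition
# values and per-file min/max statistics for an "amount" column.
files = [
    {"path": "/year=2013/month=12/part-0", "year": 2013, "month": 12, "min": 5, "max": 40},
    {"path": "/year=2014/month=02/part-0", "year": 2014, "month": 2, "min": 1, "max": 9},
    {"path": "/year=2014/month=02/part-1", "year": 2014, "month": 2, "min": 50, "max": 90},
    {"path": "/year=2014/month=03/part-0", "year": 2014, "month": 3, "min": 60, "max": 70},
]


def files_to_read(files, year, month, min_amount):
    """Return paths that may hold rows for the given year and month
    with amount >= min_amount."""
    selected = []
    for f in files:
        # Partition pruning: skip whole directories by partition value.
        if f["year"] != year or f["month"] != month:
            continue
        # Statistics skipping: no row can match if max is below the bound.
        if f["max"] < min_amount:
            continue
        selected.append(f["path"])
    return selected


to_read = files_to_read(files, year=2014, month=2, min_amount=10)
print(to_read)
```

Only one of the four files survives both checks, so three files are never opened. Neither check looks at any row data, which is why these optimizations are cheap.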
21. Plan Optimization & Execution
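[Diagram: a SQL AST or DataFrame is turned into an Unresolved Logical Plan. Analysis, using the Catalog, produces a Logical Plan; Logical Optimization produces an Optimized Logical Plan; Physical Planning produces candidate Physical Plans; a Cost Model picks the Selected Physical Plan; Code Generation turns it into RDDs.]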
DataFrames and SQL share the same optimization/execution pipeline
22. Optimization happens as late as possible, therefore Spark SQL can optimize even across functions.
23.
def add_demographics(events):
    u = sqlCtx.table("users")                       # Load Hive table
    return (events
            .join(u, events.user_id == u.user_id)   # Join on user_id
            .withColumn("city", zipToCity(u.zip)))  # Run udf to add city column

events = add_demographics(sqlCtx.load("/data/events", "json"))
training_data = events.where(events.city == "New York").select(events.timestamp).collect()

Logical Plan: filter on top of a join of the events file and the users table. The join is expensive, so only join relevant users.
Physical Plan: join of scan (events) with a filter over scan (users).
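The rewrite from the logical plan to the physical plan can be sketched as a toy optimizer rule in plain Python. This is a deliberately simplified model and not Catalyst's implementation: the `Scan`, `Join`, and `Filter` classes and the `push_down` function are hypothetical, and each filter is simply tagged with the one table its predicate depends on.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Join:
    left: "Plan"
    right: "Plan"


@dataclass(frozen=True)
class Filter:
    table: str      # table whose columns the predicate reads
    condition: str  # kept as text; this sketch never evaluates it
    child: "Plan"


Plan = Union[Scan, Join, Filter]


def tables(plan):
    """Collect the table names that a plan reads."""
    if isinstance(plan, Scan):
        return {plan.table}
    if isinstance(plan, Join):
        return tables(plan.left) | tables(plan.right)
    return tables(plan.child)


def push_down(plan):
    """Move a Filter below a Join when it only needs one join input."""
    if isinstance(plan, Filter) and isinstance(plan.child, Join):
        join = plan.child
        if plan.table in tables(join.left):
            pushed = Filter(plan.table, plan.condition, join.left)
            return Join(push_down(pushed), push_down(join.right))
        if plan.table in tables(join.right):
            pushed = Filter(plan.table, plan.condition, join.right)
            return Join(push_down(join.left), push_down(pushed))
    if isinstance(plan, Join):
        return Join(push_down(plan.left), push_down(plan.right))
    if isinstance(plan, Filter):
        return Filter(plan.table, plan.condition, push_down(plan.child))
    return plan


# The plan is only a description until an action such as collect() runs,
# so the filter added outside add_demographics can still be moved.
logical = Filter("users", 'city == "New York"', Join(Scan("events"), Scan("users")))
physical = push_down(logical)
print(physical)
```

Because the city column is derived only from the users table, the filter can run before the join, so fewer user rows take part in the expensive step.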
24.
def add_demographics(events):
    u = sqlCtx.table("users")                       # Load partitioned Hive table
    return (events
            .join(u, events.user_id == u.user_id)   # Join on user_id
            .withColumn("city", zipToCity(u.zip)))  # Run udf to add city column

events = add_demographics(sqlCtx.load("/data/events", "parquet"))
training_data = events.where(events.city == "New York").select(events.timestamp).collect()

Logical Plan: filter on top of a join of the events file and the users table.
Physical Plan: join of scan (events) with a filter over scan (users).
Physical Plan with Predicate Pushdown and Column Pruning: join of optimized scan (events) with optimized scan (users).
25. Physical Execution: Unified Across Languages
[Bar chart: time to aggregate 10 million int pairs (secs), axis from 0 to 10, for RDD Scala, RDD Python, DataFrame Scala, DataFrame Python, and DataFrame SQL]
26. Physical Execution: Fully Managed Memory
Spark’s core API uses raw Java objects and Java GC for aggregations and joins
DataFrames will use a custom binary format and off-heap managed memory
Both faster computationally and “GC-free”
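As a rough analogy in plain Python (not Tungsten's actual format), the sketch below stores (key, value) pairs in one flat buffer of 64-bit integers instead of a list of tuple objects, then aggregates directly over that buffer. The sample pairs and the function `sum_by_key` are hypothetical.

```python
from array import array

pairs = [(1, 10), (2, 20), (1, 30), (2, 40)]  # hypothetical (key, value) pairs

# Object layout: a list of tuple objects, each pointing at two int objects.
# Binary layout: one flat buffer of 64-bit ints, key and value interleaved.
buf = array("q")
for key, value in pairs:
    buf.append(key)
    buf.append(value)


def sum_by_key(buf):
    """Aggregate values per key by walking the flat buffer two slots at a time."""
    totals = {}
    for i in range(0, len(buf), 2):
        key, value = buf[i], buf[i + 1]
        totals[key] = totals.get(key, 0) + value
    return totals


totals = sum_by_key(buf)
print(totals, buf.itemsize * len(buf), "bytes of payload")
```

The flat buffer holds the records as raw bytes rather than as one object per record, which is the property that lets a runtime manage the memory itself instead of leaving it to the garbage collector.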
27. Physical Execution: CPU Efficient Data Structures
Keep data closer to CPU cache
29. Other Optimizations
Code Generation
Avoid interpretation overhead on records
Already used in Spark, but will expand
Vectorized record processing
Process in small batches to avoid function call overhead
In some cases can exploit GPUs
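The interpretation overhead that code generation avoids can be shown with a small plain-Python sketch. It evaluates the expression (a + b) * 2 in two ways: by walking an expression tree for every record, and by translating the tree once into a single function. The tuple encoding and the names `interpret`, `to_source`, and `generate` are hypothetical and unrelated to Spark's own generator.

```python
# Expression tree for: (a + b) * 2, encoded as nested tuples.
expr = ("mul", ("add", ("col", "a"), ("col", "b")), ("lit", 2))


def interpret(node, row):
    """Walk the tree for every record (the per-record interpretation cost)."""
    kind = node[0]
    if kind == "col":
        return row[node[1]]
    if kind == "lit":
        return node[1]
    left, right = interpret(node[1], row), interpret(node[2], row)
    return left + right if kind == "add" else left * right


def to_source(node):
    """Translate the tree into a Python expression string, once."""
    kind = node[0]
    if kind == "col":
        return f"row[{node[1]!r}]"
    if kind == "lit":
        return repr(node[1])
    op = "+" if kind == "add" else "*"
    return f"({to_source(node[1])} {op} {to_source(node[2])})"


def generate(node):
    """Compile the tree into one function that no longer inspects nodes."""
    return eval(f"lambda row: {to_source(node)}")


rows = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
compiled = generate(expr)
interpreted_out = [interpret(expr, r) for r in rows]
generated_out = [compiled(r) for r in rows]
print(interpreted_out, generated_out)
```

Both paths give the same results; the generated function pays the tree-walking cost once at compile time instead of once per record.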
31. Spark Tomorrow vs. Today
Today                            Tomorrow
RDD API                          DataFrame API
Hadoop InputFormat               Spark Data Source
Java objects for aggregations    Spark binary format
Java GC                          Spark managed memory
32. Learn More About Spark
Docs: http://spark.apache.org/docs/latest/
Site: http://spark.apache.org/
Databricks Cloud: http://go.databricks.com/register-for-dbc