job: action分割
task: shuffle分割
application: 多个job
join\cogroup: 确保Rdd1.partitioner = Rdd2.partitioner = join.partitioner
换句话就是 rdd1的key和rdd2的key,还有join时用的key,三者是相等的。
语法概要
官方示例代码库:
http://spark.apache.org/examples.html
此外还有安装目录的examples
目录。
生成rdd
1 | // (1) 从数组: |
RDD操作
RDD分为:
- 普通RDD;
- PairRDD。
两者能用的函数大不相同,每次操作RDD前需要复核一下到底是普通RDD还是Pair RDD。
方法分为:
- transform: 变换结构;
- action: 真正有输出,有动作(特例是forEachPartition这种类似遍历,反函数式编程的)。
普通RDD-transform
1 | // scala用箭头=>: |
也可以传递函数:
1 | // scala借用object单例: |
其他函数汇总:(因为不是pair rdd,因此基本都无shuffle)
无shuffle,1对1
1 | map(func): |
无shuffle,1对多、多对多
1 | flatMap(func) : 每个item可以返回一个seq; |
集合运算
1 | sample(withReplacement, fraction, seed) |
此外还有一个forEachPartition
,返回值为空,是个遍历的action。
有shuffle的变换一般都可以定义[numTasks]
,也就是可以定义并行度。
闭包
闭包是指 executor
要在RDD
上进行计算时必须对执行节点可见的那些变量和方法。闭包被序列化并被发送到每个 executor
。
闭包的变量副本(序列化后)发给每个 executor
.
Pair RDD-transform
加上了很多shuffle操作的函数(算子)
1 | val pairs = lines.map(s => (s, 1)) |
其他函数汇总:
1 | groupByKey([numTasks]): 返回(K, Iterable<V>),一般用reduceByKey代替这个算子 |
Action汇总:
可以注意到凡是最终需要输出、反函数式编程的(遍历)就是action,最后汇聚到driver单点处理。
1 | reduce(func): 注意和map相反,reduce是action。数据最后聚合成单点。 |
自定义UDAF的核心: combineByKey
大部分shuffle算子都是调用combineByKey
实现的,可以说combineByKey
就是shuffle的核心。
combineByKey定义:
1 | def combineByKey[C]( |
示例
1 | val pairs = sc.parallelize(List(("prova", 1), ("ciao", 2), |
combineByKey比aggregateByKey更加通用,区别是它的第一个参数创建初始聚合器都是函数,而aggregateByKey第一个参数是一个初始值。
缓存
1 | cache() |
在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据.
广播变量
immutable
先用action从sc(driver)上广播出去,然后用.value
访问。
1 | scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) |
累加器
数值型。
可变。
1 | scala> val accum = sc.longAccumulator("My Accumulator") |
自定义累加器需要实现的3个方法:(类似于combineByKey
)
1 | reset : 清零。防止重算。 |
示例:
1 | class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] { |