job: action分割 task: shuffle分割 application: 多个job
join\cogroup: 确保Rdd1.partitioner = Rdd2.partitioner = join.partitioner 换句话就是 rdd1的key和rdd2的key,还有join时用的key,三者是相等的。
语法概要 官方示例代码库:http://spark.apache.org/examples.html 此外还有安装目录的examples
目录。
生成rdd 1 2 3 4 5 6 data = [1 , 2 , 3 , 4 , 5 ] distData = sc.parallelize(data) scala> val distFile = sc.textFile("data.txt" ) distFile: org.apache.spark.rdd.RDD [String ]
RDD操作 RDD分为:
普通RDD;
PairRDD。 两者能用的函数大不相同,每次操作RDD前需要复核一下到底是普通RDD还是Pair RDD。
方法分为:
transform: 变换结构;
action: 真正有输出,有动作(特例是forEachPartition这种类似遍历,反函数式编程的)。
1 2 3 4 5 6 val lineLengths = lines.map(s => s.length)val totalLength = lineLengths.reduce((a, b) => a + b)lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
也可以传递函数:
1 2 3 4 5 object MyFunctions { def func1 (s: String ): String = { ... } } myRdd.map(MyFunctions .func1)
其他函数汇总:(因为不是pair rdd,因此基本都无shuffle)
无shuffle,1对1 1 2 map(func): filter(func):无shuffle
无shuffle,1对多、多对多 1 2 3 flatMap(func) : 每个item可以返回一个seq; mapPartitions(func) : 输入迭代器,返回迭代器 mapPartitionsWithIndex(func): 输入迭代器,返回迭代器
集合运算 1 2 3 4 5 6 7 sample(withReplacement, fraction, seed) union(otherDataset) intersection(otherDataset) distinct([numTasks])): 如果是pairRDD,有shuffle,可以定义并行度。 coalesce(numPartitions) repartition(numPartitions) repartitionAndSortWithinPartitions(partitioner)
此外还有一个forEachPartition
,返回值为空,是个遍历的action。 有shuffle的变换一般都可以定义[numTasks]
,也就是可以定义并行度。
闭包 闭包是指 executor
要在RDD
上进行计算时必须对执行节点可见的那些变量和方法。闭包被序列化并被发送到每个 executor
。
闭包的变量副本(序列化后)发给每个 executor
.
加上了很多shuffle操作的函数(算子)
1 2 val pairs = lines.map(s => (s, 1 ))val counts = pairs.reduceByKey((a, b) => a + b)
其他函数汇总:
1 2 3 4 5 6 7 8 groupByKey([numTasks]): 返回(K , Iterable <V >),一般用reduceByKey代替这个算子 reduceByKey(func, [numTasks]) aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) sortByKey([ascending], [numTasks]) join(otherDataset, [numTasks]): (K , V ) 和 (K , W ) => (K , (V , W )) cogroup(otherDataset, [numTasks]) cartesian(otherDataset) partitionBy(partitioner): 一般比repartition靠谱,因为下一步能用到key,而不是随机划分
Action汇总: 可以注意到凡是最终需要输出、反函数式编程的(遍历)就是action,最后汇聚到driver单点处理。
1 2 3 4 5 6 7 8 9 10 11 reduce(func): 注意和map相反,reduce是action。数据最后聚合成单点。 collect() count() first() take(n) takeSample(withReplacement, num, [seed]) takeOrdered(n, [ordering]) saveAsTextFile(path) saveAsSequenceFile(path) countByKey() : 因为整合了reduce的功能 foreach(func) 以及foreachPartition等。
自定义UDAF的核心: combineByKey 大部分shuffle算子都是调用combineByKey
实现的,可以说combineByKey
就是shuffle的核心。 combineByKey定义:
1 2 3 4 5 6 def combineByKey [C ]( createCombiner: V => C , mergeValue: (C , V ) => C , mergeCombiners: (C , C ) => C ): RDD [(K , C )] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null ) }
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 val pairs = sc.parallelize(List (("prova" , 1 ), ("ciao" , 2 ), ("prova" , 2 ), ("ciao" , 4 ), ("prova" , 3 ), ("ciao" , 6 ))) pairs.aggregateByKey(List [Any ]())( (aggr, value) => aggr ::: (value :: Nil ), (aggr1, aggr2) => aggr1 ::: aggr2 ).collect().toMap pairs.combineByKey( (value) => List (value), (aggr: List [Any ], value) => aggr ::: (value :: Nil ), (aggr1: List [Any ], aggr2: List [Any ]) => aggr1 ::: aggr2 ).collect().toMap
combineByKey比aggregateByKey更加通用,区别是它的第一个参数创建初始聚合器都是函数,而aggregateByKey第一个参数是一个初始值。
缓存 1 2 cache() persist([LEVEL])
在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据.
广播变量 immutable
先用action从sc(driver)上广播出去,然后用.value
访问。
1 2 3 4 5 scala> val broadcastVar = sc.broadcast(Array (1 , 2 , 3 )) broadcastVar: org.apache.spark.broadcast.Broadcast [Array [Int ]] = Broadcast (0 ) scala> broadcastVar.value res0: Array [Int ] = Array (1 , 2 , 3 )
累加器 数值型。 可变。
1 2 3 4 5 6 7 8 9 scala> val accum = sc.longAccumulator("My Accumulator" ) accum: org.apache.spark.util.LongAccumulator = LongAccumulator (id: 0 , name: Some (My Accumulator ), value: 0 ) scala> sc.parallelize(Array (1 , 2 , 3 , 4 )).foreach(x => accum.add(x)) ... 10 /09 /29 18 :41 :08 INFO SparkContext : Tasks finished in 0.317106 sscala> accum.value res2: Long = 10
自定义累加器需要实现的3个方法:(类似于combineByKey
)
1 2 3 reset : 清零。防止重算。 add: 累加 merge: 合并累加器。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class VectorAccumulatorV2 extends AccumulatorV2 [MyVector , MyVector ] { private val myVector: MyVector = MyVector .createZeroVector def reset (): Unit = { myVector.reset() } def add (v: MyVector ): Unit = { myVector.add(v) } ... } val myVectorAcc = new VectorAccumulatorV2 sc.register(myVectorAcc, "MyVectorAcc1" )