aggregateByKey(initValue)(seqQp, combineOp [,numberTask]): 是一個shuffle類的算子,與reduceByKey類似,隻是它在每個分區seqOp後要應用一次初始值。
代碼示例:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object AggregateByKeyOp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName(AggregateByKeyOp.getClass().getSimpleName)
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val rdd1: RDD[(String, Int)] = sc.parallelize(Array(
("劉備", 1), ("曹操", 2), ("劉備", 3), ("曹操", 4), ("劉備", 100), ("曹操", 200), ("劉備", 300), ("曹操", 400)
), 2)
val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex((index, iter) => {
iter.toList.map(x => "【分區号為:" index ", 值為:" x "】").iterator
})
rdd2.foreach(println)
// 【分區号為:0, 值為:(劉備,1)】
// 【分區号為:0, 值為:(曹操,2)】
// 【分區号為:0, 值為:(劉備,3)】
// 【分區号為:0, 值為:(曹操,4)】
// 【分區号為:1, 值為:(劉備,100)】
// 【分區号為:1, 值為:(曹操,200)】
// 【分區号為:1, 值為:(劉備,300)】
// 【分區号為:1, 值為:(曹操,400)】
val rdd3 = rdd1.aggregateByKey(1000)(_ _, _ _)
rdd3.mapPartitionsWithIndex((index, iter) => {
iter.toList.map(x => "【分區号為:" index ", 值為:" x "】").iterator
}).foreach(println)
// 【分區号為:0, 值為:(曹操,2606)】
// 【分區号為:1, 值為:(劉備,2404)】
rdd3.foreach(println)
// (曹操,2606)
// (劉備,2404)
sc.stop()
}
}
,
更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!