tft每日頭條

 > 生活

 > spark含有的數據分析算法

spark含有的數據分析算法

生活 更新时间:2025-01-08 17:01:19
aggregateByKey(initValue)(seqQp, combineOp [,numberTask])

aggregateByKey(initValue)(seqQp, combineOp [,numberTask]): 是一個shuffle類的算子,與reduceByKey類似,隻是它在每個分區seqOp後要應用一次初始值。

代碼示例:

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object AggregateByKeyOp { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName(AggregateByKeyOp.getClass().getSimpleName) val sc = new SparkContext(conf) sc.setLogLevel("ERROR") val rdd1: RDD[(String, Int)] = sc.parallelize(Array( ("劉備", 1), ("曹操", 2), ("劉備", 3), ("曹操", 4), ("劉備", 100), ("曹操", 200), ("劉備", 300), ("曹操", 400) ), 2) val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex((index, iter) => { iter.toList.map(x => "【分區号為:" index ", 值為:" x "】").iterator }) rdd2.foreach(println) // 【分區号為:0, 值為:(劉備,1)】 // 【分區号為:0, 值為:(曹操,2)】 // 【分區号為:0, 值為:(劉備,3)】 // 【分區号為:0, 值為:(曹操,4)】 // 【分區号為:1, 值為:(劉備,100)】 // 【分區号為:1, 值為:(曹操,200)】 // 【分區号為:1, 值為:(劉備,300)】 // 【分區号為:1, 值為:(曹操,400)】 val rdd3 = rdd1.aggregateByKey(1000)(_ _, _ _) rdd3.mapPartitionsWithIndex((index, iter) => { iter.toList.map(x => "【分區号為:" index ", 值為:" x "】").iterator }).foreach(println) // 【分區号為:0, 值為:(曹操,2606)】 // 【分區号為:1, 值為:(劉備,2404)】 rdd3.foreach(println) // (曹操,2606) // (劉備,2404) sc.stop() } }

圖解aggregateByKey

spark含有的數據分析算法(九Spark之圖解aggregateByKey算子)1

,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关生活资讯推荐

热门生活资讯推荐

网友关注

Copyright 2023-2025 - www.tftnews.com All Rights Reserved