Spark API

Spark RDD API使用说明(一)

1、aggregate

1.1  函数声明

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U)=> U): U

1.2函数说明

aggregate函数通过两个函数来操作RDD。第一个reduce函数(seqOp)对每个partition聚合,然后将初始值(zeroValue)和所有partitions的结果进行(combOp)操作,生成最终结果。应用两个reduce函数十分方便,比如:第一个用于求各个partition的最大值,第二个用于汇总每个partition的和。

1.3  实例

scala> def seqOp(a:Int , b:Int) : Int = {
     | math.max(a, b)
     | }
seqOp: (a: Int, b: Int)Int

scala> def combOp(a:Int, b:Int) : Int = {
     | a + b
     | }
combOp: (a: Int, b: Int)Int

scala> val v = sz.parallelize(List(1,2,3,4,5,6), 3)
scala> v.aggregate(4)(seqOp, combOp)
结果:18
计算过程:
	首先分为三个区:(1,2) ,(3,4) ,(5,6)
	然后 调用max(4,1,2),max(4,3,4),max(4,5,6)。计算三个区的结果分别为:4,4,6
	在对三个区和初始值4进行聚合:4+4=8;8+4=12;12+6=18
注意:初始值zeroValue使用了两次:1在各个partition使用;2,对partition合并的适合使用

2、aggregateByKey

2.1函数声明       

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicitarg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicitarg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicitarg0: ClassTag[U]): RDD[(K, U)]

2.2 函数说明

         和aggregate类似,区别是:1,只对具有相同key的值进行聚合;2,初始值只出现在第一个reduce函数(seqOp)

2.3 实例        

val pariRDD = sc.parallelize(List( ("cat", 2), ("cat", 5),("mouse", 4), ("cat", 12), ("dog", 12),("mouse", 2)), 2)
 //定义函数,显示分区情况
def func(index:Int, iter:Iterator[(String, Int)]):Iterator[String] = {
           iter.toList.map(x => "[partID:" +index + ", val:" + x + "]").iterator
}
//查看分区情况
pairRDD.mapPartitionsWithIndex(func).collect
//结果:Array[String]= Array([partID:0, val:(cat,2)], [partID:0, val:(cat,5)], [partID:0,val:(mouse,4)], [partID:1, val:(cat,12)], [partID:1, val:(dog,12)], [partID:1,val:(mouse,2)])
pairRDD.aggregateByKey((0))(math.max(_,_),_+_).collect
//结果:Array[(String,Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_,_), _ + _).collect
//结果:Array((dog,100), (cat,200), (mouse,200))

3、cartesian

3.1函数声明

def cartesian[U:ClassTag](other: RDD[U]): RDD[(T, U)]

3.2 函数说明

         计算两个RDD的笛卡尔积,然后返回一个新的RDD(注意:应用此函数内存消耗很快)

3.3 实例

valx = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res0: Array[(Int, Int)] = Array((1,6),(1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7),(3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10),(5,9), (5,10))

4、coalesce,repartition

4.1函数声明

         def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T]

def repartition( numPartitions : Int ): RDD [T]

4.2 函数说明

         将partition合并成指定数目的partition

4.3实例

val y = sc.parallelize(1 to 10, 10)
val z = y.coalesce(2, false)
z.partitions.length
结果: Int = 2

更多相关文章
  • 随着越来越多的O2O服务走进人们的生活,涉及到衣食住行这几个全民刚需的服务也越来越被用户依赖.移动互联网下,生鲜蔬果作为人们生活的必需品得到了互联网行业的重视.生鲜O2O领域开始获得越来越多资本的关注,大量生鲜O2O平台也如雨后春笋般兴起.巨头入局生鲜020,中小企业何去何从?O2O日益火爆不仅创业 ...
  • 本文主要是讨论在最近项目中遇到的一个下拉刷新控件,这个控件的效果如下图:   在这里会用两篇博文的篇幅来解析这个控件,第一篇解析控件的框架,第二篇解析动画.源代码可以在下面的链接下载: TestPullToRefresh.zip    1.这个控件由以下几个文件组成:GMPullToAction.C ...
  • 在互联网已经成功"挟持"我们的现在,假如未来某天早晨起床后发现,网络瘫痪,服务器宕机,我们早已习惯了的"秩序"轰然倒塌,那会是何种场景?   人始终有种不满足的心态,希望身边的一切都是完美的,但现实却总不能如愿.就像服务器,谁也不敢说能够达到100%的可靠性. ...
  • 美国官员告诉美国全国广播公司(NBC)说,俄罗斯对五角大楼联合参谋部的非机密电子邮件系统发起了“精密的网络攻击”.该非保密电子邮件系统已被关闭近两周.该官员说,俄罗斯在7月25日左右发起了“复杂的网络入侵”.这影响了约4000名在联合参谋部工作的军人和平民.   消息人士向NBC表示,网络攻击似乎依 ...
  • [DllImport("user32.dll", EntryPoint = "FindWindow")]         public static extern int FindWindow(             string lpClassName, ...
  • 文/李书航昨天腾讯在全球移动互联网大会上推出了腾讯操作系统TOS,这是一个手机的ROM,以及多种针对不同种类智能硬件所开发的固件的集合.根据腾讯的介绍,未来将会进一步把所有可以称为智能硬件的设备,都装上TOS,并且实现和QQ.微信以及其他腾讯服务之间的更紧密的连接.而现在所披露的手表,游戏机和虚拟现 ...
一周排行
  • 1,最小化安全系统,删除不必要的软件,关闭不必要的服务.# ntsysv以下仅列出需要启动的服务,未列出的服务一律推荐关闭,必要运行的服务再逐个打开.atdcrondirqbalancemicrocode_ctlne ...
  • 还记得在DOS下分区利器DiskMan么,它那快捷的分区操作,方便的调整分区大小,强大的分区表备份恢复修复功能等等让多少用户解决了头疼的硬盘分区问题,可是旧版本有个很明显的弊端,就是只能在纯dos环境下运行.时隔多年 ...
  • switch 语句中包含了C语言中32个关键字中的三个关键字:switch, case , break,这个语句正常的语法如下: switch(expression) {     case value1:       ...
  • 名称空间支持是一项c++特性,是用来解决在编写大型程序中不同文件(厂商)中相同变量名问题.​ 例如:有两个已经封装好的产品(类)中同时包含一个名为wanda()的函数,为了能够准确调用其中一个wanda()函数,我们 ...
  • MYSQL数据库命名与其设计规范   你是否对获得MYSQL数据库命名与其设计规范 的实际操作感到十分头疼?如果是这样子的话,以下的文章将会给你相应的解决方案,以下的文章主要是介绍获得MYSQL数据库命名与其设计规范 ...
  •      最近用一个网上的下载类,MyDownload里面有多线程下载的部分,一开始开三个线程,偶尔会出现崩溃.下面是下载线程 UINT CHttpGet::ThreadDownLoad(void* pParam) ...
  • 荣耀在低端机,中端机,以及最近发布的2000元荣耀7都取得了不错的成绩.其他方面,平板.手环.移动电源.智能路由发展也迅猛,似乎是小米做过的产品荣耀都要应对一下,而且都做的比较的成功.辉煌的背后成功在哪里呢? 华为荣 ...
  •     sizeWithFont:constrainedToSize: Returns the size of the string if it were rendered and constrained to th ...
  • 今天虎嗅关于酷派手机的文章,谈到运营商集采对手机品牌的恶劣影响.作者强调,加入运营商集采就像吸毒,一旦上瘾,欲罢不能.前阵子,也有文章指出联想手机过度依赖运营商,给自己种下了恶果,主要体现在:厂商议价能力下降,损害在 ...
  • 1. 数据库表锁定原理 1.1 目前的C/S,B/S结构都是多用户访问数据库,每个时间点会有成千上万个user来访问DB,其中也会同时存取同一份数据,会造成数据的不一致性或者读脏数据. 1.2 事务的ACID原则 1 ...