Spark API

Spark RDD API使用说明(一)

1、aggregate

1.1  函数声明

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U)=> U): U

1.2函数说明

aggregate函数通过两个函数来操作RDD。第一个reduce函数(seqOp)对每个partition聚合,然后将初始值(zeroValue)和所有partitions的结果进行(combOp)操作,生成最终结果。应用两个reduce函数十分方便,比如:第一个用于求各个partition的最大值,第二个用于汇总每个partition的和。

1.3  实例

scala> def seqOp(a:Int , b:Int) : Int = {
     | math.max(a, b)
     | }
seqOp: (a: Int, b: Int)Int

scala> def combOp(a:Int, b:Int) : Int = {
     | a + b
     | }
combOp: (a: Int, b: Int)Int

scala> val v = sz.parallelize(List(1,2,3,4,5,6), 3)
scala> v.aggregate(4)(seqOp, combOp)
结果:18
计算过程:
	首先分为三个区:(1,2) ,(3,4) ,(5,6)
	然后 调用max(4,1,2),max(4,3,4),max(4,5,6)。计算三个区的结果分别为:4,4,6
	在对三个区和初始值4进行聚合:4+4=8;8+4=12;12+6=18
注意:初始值zeroValue使用了两次:1在各个partition使用;2,对partition合并的适合使用

2、aggregateByKey

2.1函数声明       

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicitarg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicitarg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicitarg0: ClassTag[U]): RDD[(K, U)]

2.2 函数说明

         和aggregate类似,区别是:1,只对具有相同key的值进行聚合;2,初始值只出现在第一个reduce函数(seqOp)

2.3 实例        

val pariRDD = sc.parallelize(List( ("cat", 2), ("cat", 5),("mouse", 4), ("cat", 12), ("dog", 12),("mouse", 2)), 2)
 //定义函数,显示分区情况
def func(index:Int, iter:Iterator[(String, Int)]):Iterator[String] = {
           iter.toList.map(x => "[partID:" +index + ", val:" + x + "]").iterator
}
//查看分区情况
pairRDD.mapPartitionsWithIndex(func).collect
//结果:Array[String]= Array([partID:0, val:(cat,2)], [partID:0, val:(cat,5)], [partID:0,val:(mouse,4)], [partID:1, val:(cat,12)], [partID:1, val:(dog,12)], [partID:1,val:(mouse,2)])
pairRDD.aggregateByKey((0))(math.max(_,_),_+_).collect
//结果:Array[(String,Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_,_), _ + _).collect
//结果:Array((dog,100), (cat,200), (mouse,200))

3、cartesian

3.1函数声明

def cartesian[U:ClassTag](other: RDD[U]): RDD[(T, U)]

3.2 函数说明

         计算两个RDD的笛卡尔积,然后返回一个新的RDD(注意:应用此函数内存消耗很快)

3.3 实例

valx = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res0: Array[(Int, Int)] = Array((1,6),(1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7),(3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10),(5,9), (5,10))

4、coalesce,repartition

4.1函数声明

         def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T]

def repartition( numPartitions : Int ): RDD [T]

4.2 函数说明

         将partition合并成指定数目的partition

4.3实例

val y = sc.parallelize(1 to 10, 10)
val z = y.coalesce(2, false)
z.partitions.length
结果: Int = 2

更多相关文章
  • 随着越来越多的O2O服务走进人们的生活,涉及到衣食住行这几个全民刚需的服务也越来越被用户依赖.移动互联网下,生鲜蔬果作为人们生活的必需品得到了互联网行业的重视.生鲜O2O领域开始获得越来越多资本的关注,大量生鲜O2O平台也如雨后春笋般兴起.巨头入局生鲜020,中小企业何去何从?O2O日益火爆不仅创业 ...
  • 本文主要是讨论在最近项目中遇到的一个下拉刷新控件,这个控件的效果如下图:   在这里会用两篇博文的篇幅来解析这个控件,第一篇解析控件的框架,第二篇解析动画.源代码可以在下面的链接下载: TestPullToRefresh.zip    1.这个控件由以下几个文件组成:GMPullToAction.C ...
  • 在互联网已经成功"挟持"我们的现在,假如未来某天早晨起床后发现,网络瘫痪,服务器宕机,我们早已习惯了的"秩序"轰然倒塌,那会是何种场景?   人始终有种不满足的心态,希望身边的一切都是完美的,但现实却总不能如愿.就像服务器,谁也不敢说能够达到100%的可靠性. ...
  • 美国官员告诉美国全国广播公司(NBC)说,俄罗斯对五角大楼联合参谋部的非机密电子邮件系统发起了“精密的网络攻击”.该非保密电子邮件系统已被关闭近两周.该官员说,俄罗斯在7月25日左右发起了“复杂的网络入侵”.这影响了约4000名在联合参谋部工作的军人和平民.   消息人士向NBC表示,网络攻击似乎依 ...
  • [DllImport("user32.dll", EntryPoint = "FindWindow")]         public static extern int FindWindow(             string lpClassName, ...
  • 文/李书航昨天腾讯在全球移动互联网大会上推出了腾讯操作系统TOS,这是一个手机的ROM,以及多种针对不同种类智能硬件所开发的固件的集合.根据腾讯的介绍,未来将会进一步把所有可以称为智能硬件的设备,都装上TOS,并且实现和QQ.微信以及其他腾讯服务之间的更紧密的连接.而现在所披露的手表,游戏机和虚拟现 ...
一周排行
  • 原创文章,转载请注明:转载自Keegan小钢 写于5 去年10月底换到了新公司,做移动研发组的负责人,刚开始接手android项目时,发现该项目真的是一团糟.首先是其架构,是按功能模块进行划分的,本 ...
  • H3C OSPF动态路由网络的建立   1.组网描述       OSPF路由协议是企业级网络中最常用到的动态路由协议,现就以下面一个模拟组网来介绍一下华为路由器运行OSPF的配置和状态,该网络由4台路由器构成,中心 ...
  • Ubuntu问题解决汇总   1.Ubuntu支持安装多媒体播放插件(新系统安装后必备)   ubuntu-restricted-extras package allows users to install abil ...
  • 连续看了DeepID和FaceNet后,看了更早期的一篇论文,即FB的DeepFace.这篇论文早于DeepID和FaceNet,但其所使用的方法在后面的论文中都有体现,可谓是早期的奠基之作.因而特写博文以记之. D ...
  • 点击打开链接 Full Tank? Time Limit: 3000/1000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others) Total Sub ...
  • 这段时间确实是很忙,至从进了工场以后,工作压力陡增.进之前没有系统接受项目培训,直接进的项目组,软件架构自己也不怎么熟悉(只给看数据模型,也看不出啥啊).再加上用的技术自己以前不是很熟悉(几乎就是不了解),创业团队的 ...
  • CentOS下配置VNC Server,重启服务,配置仍然生效的方法: 本文前提:系统已安装好gnome桌面,如果没有请执行下面的命令安装即可. yum groupinstall "Desktop" ...
  • 分数加减法 Time Limit: 1MS   Memory Limit: 65536K Total Submissions: 12903   Accepted: 4 Description 编写一个C程序,实现两个 ...
  • Array.prototype.del=function(n) { //n表示第几项,从0开始算起.//prototype为对象原型,注意这里为对象增加自定义方法的方法. if(n<0) //如果n<0, ...
  • Windows Presentation Foundation (WPF) 样式设置和模板化是指一套功能(样式.模板.触发器和演示图板),应用程序.文档或用户界面 (UI) 的设计人员使用这些功能可以创建更好的视觉效 ...