`
m635674608
  • 浏览: 4928952 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

spark rdd 转换过程

 
阅读更多



 

从一个简单的例子,来看rdd的演化,和stage是如何生成的(过程灰常之复杂和抽象,请参考附件的图来理解) 

Java代码  收藏代码
  1. object BaiWordCount2 {  
  2.   def main(args: Array[String]) {  
  3.     .....  
  4.     // Create the context  
  5.     val ssc = new SparkContext(args(0), "BaiWordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))  
  6.     val lines = ssc.textFile(args(1))//基于hadoopRdd,创建了一个MapRdd[String]   
  7.     val words = lines.flatMap(_.split(" "))  
  8.     val wordCounts = words.map(x =>{ println("x:"+ x);(x, 1) } )//这回返回的是一个元组了  
  9.     val red = wordCounts.reduceByKey( (a,b)=>{a + b} )  
  10.     red.saveAsTextFile("/root/Desktop/out")   
  11.   }  
  12. }  




我们首先总结出每个操作生成的rdd,这是通过源码追踪得到的: 
textFile:生成一个HadoopRDD 和 一个MapPedRDD[String] 
flatMap:前面生成的MapPedRDD[String],调用flatMap函数,生成了FlatMaPPedRDD[String] 
map:FlatMaPPedRDD[String],调用map函数,创建了一个创建了一个MapPartitionsRDD,再通过隐式转换,生成了PairRDD[String,Int](因为map操作,产生了一对值key和value) 
reduceByKey:生成三个Rdd,首先根据之前的PairRDD,生成一个MapPartitionsRDD(这个RDD起到类似map-reduce里面,combine()的作用),再生成一个shuffledRdd(这个rdd是分割stage的重要依据之一),这之后再生成一个MapPartitionsRDD[String](这个rdd起到hadoop里reducer的作用) 
saveAsTextFile先生成一个MapPedRDD,然后调用runJob函数,将之前生成的rdd链表,提交到spark集群上,spark根局rdd的类型,划分成一个或多个stage(只有shuffledRdd这类的rdd,才会成为stage和stage之间的边界),然后将各个stage,按照依赖的先后顺序,将stage先后提交集群进行计算 
下边通过textFile来详细说明rdd链表的生成过程和主要数据结构,主要注意deps和几种dependency: 
从rdd生成的方式来说可以分成四类:通过外部数据生成rdd,通过transformations函数生成,缓存操作,actions操作 
textFile函数无疑属于第一种,通过外部数据,生成rdd,输入的文件路径,可以是hdfs://开头的hdfs数据,也可以本地文件路径,例如"/root/Desktop/word.text" 
textFile函数调用hadoopFile 函数,生成一个hadoopRdd[K,V],默认情况下,泛型参数K和V,对应HadoopRDD的构造函数里的keyClass和valueClass。 
也就是一个Rdd[LongWritable,Text],通过外部数据生成rdd的第一个rdd的特点是,deps是一个空的list,原因是它是从外部文件生成的,没有父rdd。 
生成了Rdd[LongWritable,Text]后,还要调用transformations函数map:map(pair => pair._2.toString),来生成一个MappedRDD 
MappedRDD(this, sc.clean(f)),这里this,就是之前生成的HadoopRDD,MappedRDD的构造函数,会调用父类的构造函数RDD[U](prev), 
这个this(也就是hadoopRdd),会被赋值给prev,然后调用RDD.scala中,下面的构造函数 

Java代码  收藏代码
  1. def this(@transient oneParent: RDD[_]) =  
  2.   this(oneParent.context , List(new OneToOneDependency(oneParent)))  


这个函数的作用,是把父RDD的SparkContext(oneParent.context),和一个列表List(new OneToOneDependency(oneParent))),传入了另一个RDD的构造函数, 

Java代码  收藏代码
  1. RDD[T: ClassManifest](  
  2.    @transient private var sc: SparkContext,  
  3.    @transient private var deps: Seq[Dependency[_]]  
  4.  )  


这样我们可以看到,在所有有父子关系的RDD,共享的是同一个SparkContext。而子RDD的deps变量,也被赋值为一个List,里面包含一个OneToOneDependency实例,表明父RDD和子RDD之间的关系 
其实大多数的父子关系,包含的都是OneToOneDependency.比较例外的几个,比如join,这个很明显,他的数据不是来自同一个父RDD。而shuffledRdd的Dependency是ShuffledDependency 

父Rdd会在子rdd的构造函数中被传入,然后放入子rdd实例的deps里面,被记录下来。这样,当我们得到一个Rdd之后,就可以向后回溯它的祖先,再结合传入的函数变量f,完整的得到它的构造过程。 
flatMap,map,reduceByKey,saveAsTextFile则按顺序创建各自的rdd,然后在deps中记录父rdd,同时根据rdd的类型,生成各自的不同类型的dependency。 
在saveAsTextFile函数把整个计算任务提交到集群之前,所有的函数进行的操作,仅仅就是生成rdd链表而已。saveAsTextFile是action类型的操作,action的共同特点是,会调用RunJob一类的函数,调用Dagscheduler.runJob,将最后一个rdd(在我们这个例子里,就是saveAsTextFile生成的那个MappedRdd),提交到集群上。集群会以这个rdd为参数之一,生成一个stage,名叫finalStage(故名思意,这是最终的一个stage)。然后调用submitStage,将刚刚生成的finalStage提交到集群上。这个stage是否会被马上执行呢?不一定,因为程序会调用getMissingParentStages,进行寻找,是否有需要先进行提交的stage---这个过程可以这样类比,一个查询操作,在提交之后,要先检查是否有子查询,如果有,先执行子查询,然后在执行父查询,这里的原因很简单,父查询依赖于子查询的数据。同理,在stage执行的过程中,也要先查询,它是否需要其他stage的数据(其实之后一种数据,就是通过shuffle传过来的数据),如果有,那么这些stage,就是它的missingParentStage,它要等他的missingParentStage执行成功,然后通过shuffle机制把数据传给它,才能开始执行。这个过程的执行过程如下:从最后一个rdd起,查看它的dependency的类型,如果是 
shuffledDependency,则创建一个ShuffleMapStage,否则,就向前遍历,依次递归,知道最前面的rdd为止 

Java代码  收藏代码
  1. private def getMissingParentStages(stage: Stage): List[Stage] = {  
  2.    .....  
  3.    def visit(rdd: RDD[_]) {  
  4.      if (!visited(rdd)) {/  
  5.        visited += rdd  
  6.        if (getCacheLocs(rdd).contains(Nil)) {  
  7.          for (dep <- rdd.dependencies) {  
  8.            dep match {  
  9.              case shufDep: ShuffleDependency[_,_] =>  
  10.               val mapStage = getShuffleMapStage(shufDep, stage.jobId)  
  11.              if (!mapStage.isAvailable) {  
  12.                  missing += mapStage  
  13.                }  
  14.              case narrowDep: NarrowDependency[_] =>  
  15.                visit(narrowDep.rdd)  
  16.            }  
  17.          }  
  18.        }  
  19.      }  
  20.    }  
  21.    visit(stage.rdd)  
  22.    missing.toList  
  23.  }  


当getMissingParentStage(stage)的结果为空的时候,表明这个stage没有missingParentStage,或者它的missingParentStage已经都执行完了,则当前这个stage才能被成功的提交到集群去执行,否则,它就要等待,并重复调用getMissingParentStage,直到它的结果为空,才可以被提交。 

 

 

http://baishuo491.iteye.com/blog/2019510

  • 大小: 142 KB
分享到:
评论

相关推荐

    Spark机器学习视频第4课.SparkRDD原理剖析

    课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    包括spara rdd api,dataframe action操作、查询操作、join操作,dataframe rdd dataset 相互转换以及spark sql。

    Spark学习--RDD编码

    当Spark对数据操作和转换时,会自动将RDD中的数据分发到集群,并将操作并行化执行。 Spark中的RDD是一个不可变的分布式对象集合。每个RDD都倍分为多个分区,这些分区运行在集群中的不同节点。RDD可以包含Python、...

    Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

    今天小编就为大家分享一篇Java和scala实现 Spark RDD转换成DataFrame的两种方法小结,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

    Spark RDD弹性分布式数据集

    RDD的处理过程 转换算子 行动算子 RDD(Resilient Distributed Datasets弹性分布式数据集)是一个容错的、并行的数据结构,可以简单的把RDD理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际...

    spark: RDD与DataFrame之间的相互转换方法

    今天小编就为大家分享一篇spark: RDD与DataFrame之间的相互转换方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

    第二章 Spark RDD以及编程接口

    对于所有的Spark程序而言,要进行任何操作,首先要创建一个Spark上下文,在创建上下文的过程中,程序会向集群申请资源以及构建相应的运行环境 需要传入四个变量 1. Spark程序运行的集群地址,如”spark://localho

    Spark机器学习视频第10课.最终获取用户的收藏以及订单转换率

    课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...

    浅谈Spark RDD API中的Map和Reduce

    因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。 如何创建RDD? RDD可以从普通数组创建...

    【SparkCore篇02】RDD转换算子1

    1.map():每次处理一条数据 2.mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才 1. coalesc

    Spark机器学习第1课.Spark介绍

    课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...

    Spark机器学习视频第2课.Spark2集群安装

    课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...

    将string类型的数据类型转换为spark rdd时报错的解决方法

    今天小编就为大家分享一篇关于将string类型的数据类型转换为spark rdd时报错的解决方法,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

    spark-scala-examples:该项目以Scala语言提供了Apache Spark SQL,RDD,DataFrame和Dataset示例

    目录(Scala中的Spark示例)Spark RDD示例火花蓄能器介绍将Spark RDD转换为DataFrame | 数据集 Spark SQL教程Spark创建带有示例的DataFrame Spark DataFrame withColumn 重命名Spark DataFrame上的列的方法Spark –...

    spark rdd转dataframe 写入mysql的实例讲解

    spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称...

    GeoMesa Spark.docx

    geomesa目前支持spark版本2.2.x、2.3.x或2.4.x。geomesa spark允许使用存储在...该库允许创建spark RDD和数据帧,将spark RDD和数据帧写入geomesa accumulo和其他地理工具数据存储,并使用kryo对简单功能进行序列化。

    【SparkCore篇05】RDD缓存和checkpoint1

    (1)创建一个RDD (2)将RDD转换为携带当前时间戳不做缓存 (3)多次打印结果 (4)将RDD转换为携带当前时间戳并做缓存 (5)多次打印做了缓存的结果,

    Spark-Core学习知识笔记整理

    2.4Spark常见转换操作 18 2.5Spark常见行动操作 20 2.6RDD持久化操作 21 2.7注意事项 23 2.7并行度调优 24 2.8分区方式 25 3Examle:PageRank 27 第四章 Spark编程进阶 29 1共享变量 29 1.1累加器 30 1.2广播变量 31 ...

    Spark从入门到精通

    6、大量全网唯一的知识点:基于排序的wordcount,Spark二次排序,Spark分组取topn,DataFrame与RDD的两种转换方式,Spark SQL的内置函数、开窗函数、UDF、UDAF,Spark Streaming的Kafka Direct API、...

Global site tag (gtag.js) - Google Analytics