`
m635674608
  • 浏览: 4928826 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

spark学习笔记:Spark Streaming

 
阅读更多

Spark的streaming机制简单来说,就是将连续的时间序列切割成不同的离散时间段。针对某个时间段,将该时间段内的所有输入数据组成一个 RDD,接下来的工作就如同一个传统的sprark应用一样,对这个RDD进行各种变换,直到最终输出数据。可以认为,Spark Streaming就是在时间维度上,为每个时间段都创建了同一个spark应用,这样表面上看起来就像是流式的工作方式。

其中每个方框都是一个RDD;一个横排的所有RDD组成一个DStream;一个纵列的所有RDD是在某一个时刻的流式作业。DStream是在Spark Streaming编程中的一个基本数据单位(执行变换、输出等操作),就像是Spark编程中的RDD一样。

 

 

 

API的使用示例

在进行Streaming编程前,需要依赖spark- streaming_*.jar包,这个jar包已经包含在spark- assembly-*.jar里了,所以如果之前已经加入了这个包的依赖,不需要额外的工作。另外,如果流式的数据来源是其他系统,如kafka、 flume等,需要加入这些jar包依赖,各个jar包名如下所示,可以通过各方途径得到。

Kafka        spark-streaming-kafka_2.10

Flume       spark-streaming-flume_2.10

Kinesis      spark-streaming-kinesis-asl_2.10[Amazon Software License]

Twitter     spark-streaming-twitter_2.10

ZeroMQ   spark-streaming-zeromq_2.10

MQTT       spark-streaming-mqtt_2.10

一个基本的流式应用如下所示。

importorg.apache.spark._

importorg.apache.spark.streaming._

importorg.apache.spark.streaming.StreamingContext._

 

// Create a localStreamingContext with two working thread and batch interval of 1 second.

// The masterrequires 2 cores to prevent from a starvation scenario.

 

val conf = newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = newStreamingContext(conf, Seconds(1))

val lines =ssc.socketTextStream("localhost", 9999)

val words =lines.flatMap(_.split(" "))

val pairs =words.map(word => (word, 1))

val wordCounts =pairs.reduceByKey(_ + _)

wordCounts.print()

ssc.start()

ssc.awaitTermination()

 

         可以看出,除了最后的ssc.start()和ssc.awaitTermination(),其余部分和传统spark编程向乎一模一样。

StreamingContext

就像SparkContext是spark应用的入口一样,StreamingContext是流式应用的入口。实际上,StreamingContext本身已经包含了SparkContext。初始化StreamingContext的方式:

newStreamingContext(conf, Seconds(1))

newStreamingContext(new SparkContext(…), Seconds(1))

         conf是一个SparkConf对象,这个对象的初始化已经在上文中介绍。其中Seconds(1)代表输入的数据每一秒钟打一个batch包。一个 batch包内可以有多个block,一个block对应一个task。这个将在以后解释。这个batch间隔可以理解成是流式中每个应用的运行周期。

Receiver

Receiver可分为以下几类:

原生类型(fileStream、akka的actorStream、用于测试的由RDD队列组成的queueStream),原生类型可以直接使用,不需要依赖其他包。

高级类型(Twitte、Flume、Kafak、Kinesis),这种类型需要依赖相应jar包。

用户自定义类型,不讨论。

 

对 于每种输入数据的DStream,都有一个Receiver对象与之相关联,对于每个Receiver对象,又有一个Source与之相对应。每 一个Receiver对象代表一个数据接收端实例(即只有一个executor使用一个core来接收数据,并发度为1),如果要提高并发度,可以通过创 建多个Receiver对象来实现,方法如下。

val numStreams = 5

val kafkaStreams =(1 to numStreams).map { i => KafkaUtils.createStream(...) }

val unifiedStream =streamingContext.union(kafkaStreams)

unifiedStream.print()

         其中,streamingContext.union()把多个Receiver的数据合并。

给应用分配的cores数量必须大于Receiver数目,这样才能保证每个Receiver占用一个core的同时,至少还有另一个cores来处理数据。无论master是local的还是集群的,都应该这样配置,否则数据将得不到线程资源来处理。

DStream的变换

查看相应api文档,很多和RDD的变换很像。其中着重介绍三种特殊的transform(func)、updateStateByKey(func)和window Operation。

1.      transform

transform(func)需要传入函数类型是RDDto RDD,这个函数可以由用户自己定义,目的是实现API所没有提供的复杂变换。

2.      updateStateByKey

updateStateByKey(func)常用于有状态的应用,它的基本功能是用当前的RDD数据,更新之前记录下的状态(状态也是个RDD?),并将这个新的状态记录下来,用于下次一更新操作。一个示例如下:

defupdateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {

val newCount = … //将newValues累加到runningCount上,形成新的newCount

Some(newCount)

}

val runningCounts =pairs.updateStateByKey[Int](updateFunction _)

         以上代码实现了将每次的word记数累加到状态上,并记录下来。

3.      Window Operation

Window Operation简单来说,就是在某个时间段,计算之前一定数量时间段内的所有RDD。如下图所示。

 

         上图中,从originalDStream到windowed DStream的变换,就是window Operation。这个变换分别在时间点3、5…计算之前3个时间段内的3个RDD,产生一个新的RDD。一个window Operation的示例如下所示。

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int)=> (a + b), Seconds(30), Seconds(10))

         reduceByKeyAndWindow函数第一个参数是变换函数,第二个参数代表每次计算截取该时间间隔内的所有RDD,第三个参数代表计算周期。即,每隔10秒钟,计算前30秒内的每个key值对应数据之和。

         所有的windowOperation操作见API文档。

DStream输出操作

DStream 的输出操作包括:print()、saveAsTextFiles(prefix,[suffix])、 saveAsObjectFiles(prefix, [suffix])、saveAsHadoopFiles(prefix,[suffix])、foreachRDD(func),具体说明见API文 档。

其中,除foreachRDD(func)之外的所以操作都是直接将数据存成文件,而foreachRDD(func)的作用是将数据网络传输到外部系统。其中func是RDD to Unit类型的函数。一种错误的使用方法是:

dstream.foreachRDD { rdd =>

valconnection = createNewConnection()  //executed at the driver

rdd.foreach{ record =>

connection.send(record) // executed at the worker

}

}

按 道理,rdd =>{}这个函数应该在每个worker上调用createNewConnection()来创建网络连接,而实际上这个操作是在driver端完 成的,因此有问题。解决办法是将createNewConnection()调用挪到record =>{}操作中来实现,这又会导致每条record创建一个连接,效率太低,解决办法是使用RDD的foreachPartition{}函数,这 个函数将参数内的函数操作放到每个worker上执行,如下所示。

dstream.foreachRDD { rdd =>

rdd.foreachPartition{ partitionOfRecords =>

// ConnectionPool is a static, lazily initialized pool ofconnections

val connection = ConnectionPool.getConnection()

     partitionOfRecords.foreach(record=> connection.send(record))

ConnectionPool.returnConnection(connection)  // return to the pool for future reuse

}

}

         其中ConnectionPool使用连接池,这样在流式应用的每个时间段内,不用创建连接,而且直接从连接池中取用,进一步提高了效率。

 

start()和awaitTermination()

streamingContext 类的start()函数开始这个流式应用的运行,开始运行后,start()函数返回。调用 awaitTermination(),driver将阻塞在这里,直到流式应用意外退出。另外,通过调用stop()函数可以优雅退出流式应用,通过将 传入的stopSparkContext参数设置为false,可以只停止StreamingContext而不停止SparkContext(目前不知道这样做的目的)。流式应用退出后,不可以通过调用start()函数再次启动。

 

持久化操作

基于窗口的DStream操作、以及有状态的变换(如上所述的updateStateByKey(func)),系统自动地把数据持久化到内存中,即隐含地调用了persist()操作。

eConteCheckpointing

Spark Streaming支持对两种数据做checkpoint:

1.      元数据,包括流式应用的配置、流式没崩溃之前定义的各种操作、未完成所有操作的batch。元数据被存储到容忍失败的存储系统上,如HDFS。这种ckeckpoint主要针对driver失败后的修复。

2.      流 式数据,也是存储到容忍失败的存储系统上,如HDFS。这种ckeckpoint主要针对window operation、有状态的操作。无论是driver失败了,还是worker失败了,这种checkpoint都够快速恢复,而不需要将很长的历史数 据都重新计算一遍(以便得到当前的状态)。

 

如果要使用checkpoint机制,有几个工作需要完成。

1.      修改之前的代码,如下所示。

// Function tocreate and setup a new StreamingContext

deffunctionToCreateContext(): StreamingContext = {

val ssc = new StreamingContext(...)   // new context

val lines = ssc.socketTextStream(...) //create DStreams

lines.checkpoint(…)                             //设置DStream做checkpoint的周期。

...

ssc.checkpoint(checkpointDirectory)   // set checkpoint directory

ssc

}

 

// GetStreamingContext from checkpoint data or create a new one

val context =StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additionalsetup on context that needs to be done,

// irrespective ofwhether it is being started or restarted

context. ...

 

// Start thecontext

context.start()

context.awaitTermination()

         其中比较关键的操作是:使用StreamingContext.getOrCreate来创建StreamingContext对象,传入的第一个参数是 checkpoint的存放目录,第二参数是生成StreamingContext对象的用户自定义函数。如果checkpoint的存放目录存在,则从 这个目录中生成StreamingContext对象;如果不存在,才会调用第二个函数来生成新的StreamingContext对象。

         在functionToCreateContext函数中,除了生成一个新的StreamingContext操作,还需要完成各种操作,然后调用 ssc.checkpoint(checkpointDirectory)来初始化checkpoint功能,最后再返回 StreamingContext对象。

         这样,在StreamingContext.getOrCreate之后,就可以直接调用start()函数来启动(或者是从中断点继续运行)流式应用了。如果有其他在启动或继续运行都要做的工作,可以在start()调用前执行。

 

2.      设置流式数据checkpoint的周期

         对于一个需要做checkpoint的DStream结构,可以通过调用DStream.checkpoint(checkpointInterval) 来设置ckeckpoint的周期,经验上一般将这个checkpoint周期设置成batch周期的5至10倍。

 

3.      确保driver可以自动重启

重启driver的方法已经在应用提交中提及。

 

4.      使用write ahead logs功能

这 是一个可选功能,建议加上。在输入RDD的Receiver中开启这个功能将使得输入数据写入之前配置的checkpoint目录。这样有状态的 数据可以从上一个checkpoint开始计算。开启的方法是把 spark.streaming.receiver.writeAheadLogs.enable这个property设置为true。

另 外,由于输入RDD的默认StorageLevel是MEMORY_AND_DISK_2,即数据会在两台worker上做 replication。实际上,Spark Streaming模式下,任何从网络输入数据的Receiver(如kafka、flume、socket)都会在两台机器上做数据备份。如果开启了 write ahead logs的功能,建议把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在创建RDD时由参数传入。

 

补充一点,使用以上的checkpoint机制,确实可以保证数据0丢失。但是一个前提条件是,数据发送端必须要有缓存功能,这样才能保证在spark应用重启期间,数据发送端不会因为spark streaming服务不可用而把数据丢弃。

性能调优

对spark应用做性能调优主要包括以下几个方面

1.      提高Receiver的并发度,具体的方法在上文中已经提及。

2.      调整Receiver的RDD数据分区时间隔

batch interval是接收一个batch数据的时间;而blockinterval则是接收一个block数据的时间。一个block的数据对应一个 task。所以一个receiver接收一个batch数据后,执行简单RDD操作需要的task数是:(batch interval/block interval)。所以减少block interval可以提高task数,即提高了并发度。

block interval通过spark.streaming.blockInterval这个property进行配置。最小值是50ms。通过ReceiverInputDStream.Repartition()也可以配置固定数目的分区。

3.      调整数据处理的并发度

reduceByKey 类型的操作,结果RDD的分区数可以通过参数传入。否则其结果RDD分区数由spark.default.parallelism来决定。这个property的默认值是父RDD的分区数。

4.      调整数据序列化方式

5.      调整batch interval

Spark实现流式计算的方式,其实相当于把数据分割成很小的时间段,在每个小时间段内做Spark批量计算。所以,这个时间段的大小直接决定了流式系统的性能。如果设置的太大,时效性不好,如果设置的太小,很可能计算数率赶不上数据流入的速度。

一 般决定合适的batch interval的方式是:先用较大的batch interval和较低的数据量运行流式应用,从web UI上观察数据平均end-to-end时延,如果平稳且较低,则可以逐步减小batch interval或增大数据量,直至end-to-end时延平稳、小于batch interval并在一个等级上。这个时候一般是合适的batch interval。

6.      内存调优

DStream(非网络input stream)的默认 StorageLevel是MEMORY_ONLY_SER,RDD的默认 StorageLevel是MEMORY_ONLY  。(待续)

Spark Streaming的数据可靠性

上 文说过的ckeckpoint机制、write ahead log机制、Receiver缓存机器、可靠的Receiver(即数据接收并备份成功后会发送ackownage),可以保证无论是worker失效还 是driver失效,都是数据0丢失。原因是:如果没有Receiver服务的worker失效了,RDD数据可以依赖血统来重新计算;如果 Receiver所在worker失败了,由于Reciever是可靠的,并有write ahead log机制,则收到的数据可以保证不丢;如果driver失败了,同理。

http://www.chinahadoop.cn/group/3/thread/1751

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics