Spark的streaming机制简单来说,就是将连续的时间序列切割成不同的离散时间段。针对某个时间段,将该时间段内的所有输入数据组成一个 RDD,接下来的工作就如同一个传统的sprark应用一样,对这个RDD进行各种变换,直到最终输出数据。可以认为,Spark Streaming就是在时间维度上,为每个时间段都创建了同一个spark应用,这样表面上看起来就像是流式的工作方式。
其中每个方框都是一个RDD;一个横排的所有RDD组成一个DStream;一个纵列的所有RDD是在某一个时刻的流式作业。DStream是在Spark Streaming编程中的一个基本数据单位(执行变换、输出等操作),就像是Spark编程中的RDD一样。
API的使用示例
在进行Streaming编程前,需要依赖spark- streaming_*.jar包,这个jar包已经包含在spark- assembly-*.jar里了,所以如果之前已经加入了这个包的依赖,不需要额外的工作。另外,如果流式的数据来源是其他系统,如kafka、 flume等,需要加入这些jar包依赖,各个jar包名如下所示,可以通过各方途径得到。
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10[Amazon Software License]
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10
一个基本的流式应用如下所示。
importorg.apache.spark._
importorg.apache.spark.streaming._
importorg.apache.spark.streaming.StreamingContext._
// Create a localStreamingContext with two working thread and batch interval of 1 second.
// The masterrequires 2 cores to prevent from a starvation scenario.
val conf = newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = newStreamingContext(conf, Seconds(1))
val lines =ssc.socketTextStream("localhost", 9999)
val words =lines.flatMap(_.split(" "))
val pairs =words.map(word => (word, 1))
val wordCounts =pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
可以看出,除了最后的ssc.start()和ssc.awaitTermination(),其余部分和传统spark编程向乎一模一样。
StreamingContext
就像SparkContext是spark应用的入口一样,StreamingContext是流式应用的入口。实际上,StreamingContext本身已经包含了SparkContext。初始化StreamingContext的方式:
newStreamingContext(conf, Seconds(1))
newStreamingContext(new SparkContext(…), Seconds(1))
conf是一个SparkConf对象,这个对象的初始化已经在上文中介绍。其中Seconds(1)代表输入的数据每一秒钟打一个batch包。一个 batch包内可以有多个block,一个block对应一个task。这个将在以后解释。这个batch间隔可以理解成是流式中每个应用的运行周期。
Receiver
Receiver可分为以下几类:
原生类型(fileStream、akka的actorStream、用于测试的由RDD队列组成的queueStream),原生类型可以直接使用,不需要依赖其他包。
高级类型(Twitte、Flume、Kafak、Kinesis),这种类型需要依赖相应jar包。
用户自定义类型,不讨论。
对 于每种输入数据的DStream,都有一个Receiver对象与之相关联,对于每个Receiver对象,又有一个Source与之相对应。每 一个Receiver对象代表一个数据接收端实例(即只有一个executor使用一个core来接收数据,并发度为1),如果要提高并发度,可以通过创 建多个Receiver对象来实现,方法如下。
val numStreams = 5
val kafkaStreams =(1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream =streamingContext.union(kafkaStreams)
unifiedStream.print()
其中,streamingContext.union()把多个Receiver的数据合并。
给应用分配的cores数量必须大于Receiver数目,这样才能保证每个Receiver占用一个core的同时,至少还有另一个cores来处理数据。无论master是local的还是集群的,都应该这样配置,否则数据将得不到线程资源来处理。
DStream的变换
查看相应api文档,很多和RDD的变换很像。其中着重介绍三种特殊的transform(func)、updateStateByKey(func)和window Operation。
1. transform
transform(func)需要传入函数类型是RDDto RDD,这个函数可以由用户自己定义,目的是实现API所没有提供的复杂变换。
2. updateStateByKey
updateStateByKey(func)常用于有状态的应用,它的基本功能是用当前的RDD数据,更新之前记录下的状态(状态也是个RDD?),并将这个新的状态记录下来,用于下次一更新操作。一个示例如下:
defupdateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = … //将newValues累加到runningCount上,形成新的newCount
Some(newCount)
}
val runningCounts =pairs.updateStateByKey[Int](updateFunction _)
以上代码实现了将每次的word记数累加到状态上,并记录下来。
3. Window Operation
Window Operation简单来说,就是在某个时间段,计算之前一定数量时间段内的所有RDD。如下图所示。
上图中,从originalDStream到windowed DStream的变换,就是window Operation。这个变换分别在时间点3、5…计算之前3个时间段内的3个RDD,产生一个新的RDD。一个window Operation的示例如下所示。
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int)=> (a + b), Seconds(30), Seconds(10))
reduceByKeyAndWindow函数第一个参数是变换函数,第二个参数代表每次计算截取该时间间隔内的所有RDD,第三个参数代表计算周期。即,每隔10秒钟,计算前30秒内的每个key值对应数据之和。
所有的windowOperation操作见API文档。
DStream输出操作
DStream 的输出操作包括:print()、saveAsTextFiles(prefix,[suffix])、 saveAsObjectFiles(prefix, [suffix])、saveAsHadoopFiles(prefix,[suffix])、foreachRDD(func),具体说明见API文 档。
其中,除foreachRDD(func)之外的所以操作都是直接将数据存成文件,而foreachRDD(func)的作用是将数据网络传输到外部系统。其中func是RDD to Unit类型的函数。一种错误的使用方法是:
dstream.foreachRDD { rdd =>
valconnection = createNewConnection() //executed at the driver
rdd.foreach{ record =>
connection.send(record) // executed at the worker
}
}
按 道理,rdd =>{}这个函数应该在每个worker上调用createNewConnection()来创建网络连接,而实际上这个操作是在driver端完 成的,因此有问题。解决办法是将createNewConnection()调用挪到record =>{}操作中来实现,这又会导致每条record创建一个连接,效率太低,解决办法是使用RDD的foreachPartition{}函数,这 个函数将参数内的函数操作放到每个worker上执行,如下所示。
dstream.foreachRDD { rdd =>
rdd.foreachPartition{ partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool ofconnections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record=> connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
其中ConnectionPool使用连接池,这样在流式应用的每个时间段内,不用创建连接,而且直接从连接池中取用,进一步提高了效率。
start()和awaitTermination()
streamingContext 类的start()函数开始这个流式应用的运行,开始运行后,start()函数返回。调用 awaitTermination(),driver将阻塞在这里,直到流式应用意外退出。另外,通过调用stop()函数可以优雅退出流式应用,通过将 传入的stopSparkContext参数设置为false,可以只停止StreamingContext而不停止SparkContext(目前不知道这样做的目的)。流式应用退出后,不可以通过调用start()函数再次启动。
持久化操作
基于窗口的DStream操作、以及有状态的变换(如上所述的updateStateByKey(func)),系统自动地把数据持久化到内存中,即隐含地调用了persist()操作。
eConteCheckpointing
Spark Streaming支持对两种数据做checkpoint:
1. 元数据,包括流式应用的配置、流式没崩溃之前定义的各种操作、未完成所有操作的batch。元数据被存储到容忍失败的存储系统上,如HDFS。这种ckeckpoint主要针对driver失败后的修复。
2. 流 式数据,也是存储到容忍失败的存储系统上,如HDFS。这种ckeckpoint主要针对window operation、有状态的操作。无论是driver失败了,还是worker失败了,这种checkpoint都够快速恢复,而不需要将很长的历史数 据都重新计算一遍(以便得到当前的状态)。
如果要使用checkpoint机制,有几个工作需要完成。
1. 修改之前的代码,如下所示。
// Function tocreate and setup a new StreamingContext
deffunctionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) //create DStreams
lines.checkpoint(…) //设置DStream做checkpoint的周期。
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// GetStreamingContext from checkpoint data or create a new one
val context =StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additionalsetup on context that needs to be done,
// irrespective ofwhether it is being started or restarted
context. ...
// Start thecontext
context.start()
context.awaitTermination()
其中比较关键的操作是:使用StreamingContext.getOrCreate来创建StreamingContext对象,传入的第一个参数是 checkpoint的存放目录,第二参数是生成StreamingContext对象的用户自定义函数。如果checkpoint的存放目录存在,则从 这个目录中生成StreamingContext对象;如果不存在,才会调用第二个函数来生成新的StreamingContext对象。
在functionToCreateContext函数中,除了生成一个新的StreamingContext操作,还需要完成各种操作,然后调用 ssc.checkpoint(checkpointDirectory)来初始化checkpoint功能,最后再返回 StreamingContext对象。
这样,在StreamingContext.getOrCreate之后,就可以直接调用start()函数来启动(或者是从中断点继续运行)流式应用了。如果有其他在启动或继续运行都要做的工作,可以在start()调用前执行。
2. 设置流式数据checkpoint的周期
对于一个需要做checkpoint的DStream结构,可以通过调用DStream.checkpoint(checkpointInterval) 来设置ckeckpoint的周期,经验上一般将这个checkpoint周期设置成batch周期的5至10倍。
3. 确保driver可以自动重启
重启driver的方法已经在应用提交中提及。
4. 使用write ahead logs功能
这 是一个可选功能,建议加上。在输入RDD的Receiver中开启这个功能将使得输入数据写入之前配置的checkpoint目录。这样有状态的 数据可以从上一个checkpoint开始计算。开启的方法是把 spark.streaming.receiver.writeAheadLogs.enable这个property设置为true。
另 外,由于输入RDD的默认StorageLevel是MEMORY_AND_DISK_2,即数据会在两台worker上做 replication。实际上,Spark Streaming模式下,任何从网络输入数据的Receiver(如kafka、flume、socket)都会在两台机器上做数据备份。如果开启了 write ahead logs的功能,建议把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在创建RDD时由参数传入。
补充一点,使用以上的checkpoint机制,确实可以保证数据0丢失。但是一个前提条件是,数据发送端必须要有缓存功能,这样才能保证在spark应用重启期间,数据发送端不会因为spark streaming服务不可用而把数据丢弃。
性能调优
对spark应用做性能调优主要包括以下几个方面
1. 提高Receiver的并发度,具体的方法在上文中已经提及。
2. 调整Receiver的RDD数据分区时间隔
batch interval是接收一个batch数据的时间;而blockinterval则是接收一个block数据的时间。一个block的数据对应一个 task。所以一个receiver接收一个batch数据后,执行简单RDD操作需要的task数是:(batch interval/block interval)。所以减少block interval可以提高task数,即提高了并发度。
block interval通过spark.streaming.blockInterval这个property进行配置。最小值是50ms。通过ReceiverInputDStream.Repartition()也可以配置固定数目的分区。
3. 调整数据处理的并发度
reduceByKey 类型的操作,结果RDD的分区数可以通过参数传入。否则其结果RDD分区数由spark.default.parallelism来决定。这个property的默认值是父RDD的分区数。
4. 调整数据序列化方式
5. 调整batch interval
Spark实现流式计算的方式,其实相当于把数据分割成很小的时间段,在每个小时间段内做Spark批量计算。所以,这个时间段的大小直接决定了流式系统的性能。如果设置的太大,时效性不好,如果设置的太小,很可能计算数率赶不上数据流入的速度。
一 般决定合适的batch interval的方式是:先用较大的batch interval和较低的数据量运行流式应用,从web UI上观察数据平均end-to-end时延,如果平稳且较低,则可以逐步减小batch interval或增大数据量,直至end-to-end时延平稳、小于batch interval并在一个等级上。这个时候一般是合适的batch interval。
6. 内存调优
DStream(非网络input stream)的默认 StorageLevel是MEMORY_ONLY_SER,RDD的默认 StorageLevel是MEMORY_ONLY 。(待续)
Spark Streaming的数据可靠性
上 文说过的ckeckpoint机制、write ahead log机制、Receiver缓存机器、可靠的Receiver(即数据接收并备份成功后会发送ackownage),可以保证无论是worker失效还 是driver失效,都是数据0丢失。原因是:如果没有Receiver服务的worker失效了,RDD数据可以依赖血统来重新计算;如果 Receiver所在worker失败了,由于Reciever是可靠的,并有write ahead log机制,则收到的数据可以保证不丢;如果driver失败了,同理。
http://www.chinahadoop.cn/group/3/thread/1751
相关推荐
Spark Streaming Programming Guide 翻译+个人学习笔记整理
Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心组件解决了很多的大数据问题,其完美的框架日受欢迎。其相应的生态环境包括zepplin等可视化方面,正日益壮大。大型公司争相实用spark来代替原有hadoop上...
It covers Spark core and its add-on libraries, including Spark SQL, Spark Streaming, GraphX, and MLlib. Big Data Analytics with Spark is therefore written for busy professionals who prefer learning a...
主要介绍了Spark学习笔记Spark Streaming的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
IT十八掌第三期大数据配套学习笔记! 1.Spark简介 2.Spark部署和运行 3.Spark程序开发 4. Spark编程模型 5.作业执行解析 6.Spark SQL与DataFrame 7.深入Spark Streaming 8.Spark MLlib与机器学习 9.GraphX与SparkR 10...
RoadOfStudySparkspark学习之路,包含spark core,spark sql,spark streaming,spark mlib学习笔记
spark-streamispark-streaming和spark-sql笔记文档。spark-streaming和spark-sql笔记文档。ng和spark-sql笔记文档。
见SparkStreaming.md 奇淫巧技 IDEA,按住ALT,并按住左键进行框选也能实现多行编辑 Linux的vi编辑。 使用A进入输入模式,再进行复制。如果使用a进入输入模式,复制东西时开头几个字符常常会被删除。 分布式的基于...
它还支持丰富的高级工具集,包括用于SQL和DataFrames的Spark SQL,用于机器学习的MLlib,用于图形处理的GraphX和用于流处理的Spark Streaming。 在线文件 您可以在和上找到最新的Spark文档,包括编程指南。 此自述...
IT学习笔记大数据蜂巢卡夫卡猪Power BI Python数据科学库火花火花编码火花流Spark Streaming Real Project教程Kafka客户端应用程序(生产者和消费者) Spark Streaming接收套接字数据并进行字计数。 Spark Streaming...
学习Spark的代码,关于Spark Core、Spark SQL、Spark Streaming、Spark MLLib 说明 开发环境 基于Deepin Linux 15.9版本 基于Hadoop2.6、Spark2.4、Scala2.11、java8等 系列环境搭建相关文章,见下方 更多内容见:...
该项目介绍了如何使用Apache Spark机器学习创建建议。 您可以在IBM Data Science Experience上运行许多jupyter笔记本,并且可以与电影推荐Web应用程序进行实时演示。 该演示还使用IBM Message Hub(kafka)将应用...
学习Spark第二版欢迎使用Learning Spark 2nd Edition的GitHub存储库。 章节 , , ,和包含独立的火花的应用程序。 您可以通过运行Python脚本python build_jars.py来构建每个章节的所有JAR文件。 或者,您可以CD到...
博客 Jupyter笔记本和用于博客文章的其他代码。 ... :使用推荐系统理论和Foursquare评论... :使用Spark Streaming流传输来自NYC DOT提供的纽约市不同路口的传感器的交通数据。 :图像分类深度学习比赛获奖脚本的撰写。