`
m635674608
  • 浏览: 4930246 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Storm消息机制

 
阅读更多

这章讨论Storm's reliability capabilities, 如何保证从spout emit出来的所有tuple都被正确的执行(fully processed)?

  

What does it mean for a message to be "fully processed"?

  

首先的问题是, 什么叫tuple或message被fully processed? 因为tuple被emit出去后, 可能会被多级bolt处理, 并且bolt也有可能由该tuple生成多组tuples, 所以情况还是比较复杂的     
最终由一个tuple trigger(触发)的所有tuples会形成一个树或DAG(有向无环图)

  

只有当tuple tree上的所有节点都被成功处理的时候, storm才认为该tuple被fully processed     
如果tuple tree上任一节点失败或者超时, 都被看作该tuple fail, 失败的tuple会被重发      
Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed.     
A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout.     
This timeout can be configured on a topology-specific basis using the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration and defaults to 30 seconds.

  

 

  

What happens if a message is fully processed or fails to be fully processed?

  

该机制是如何实现的?     
首先, 所有tuple都有一个唯一标识msgId, 当tuple被emit的时候确定

  

_collector.emit(new Values("field1", "field2", 3) , msgId);

其次, 看看下面的ISpout接口, 除了获取tuple的nextTuple    
还有ack和fail, 当Storm detect到tuple被fully processed, 会调用ack, 如果超时或detect fail, 则调用fail    
此处需要注意的是, tuple只有在被产生的那个spout task上可以被ack或fail, 具体原因看后面的实现解释就理解了

  

a tuple will be acked or failed by the exact same Spout task that created it. So if a Spout is executing as many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

最后, 在spout怎么实现的, 其实比较简单.    
对于Spout queue, get message只是open而不是pop, 并且把tuple状态改为pending, 防止该tuple被多次发送.    
一直等到该tuple被ack, 才真正的pop该tuple, 当然该tuple如果fail, 就重新把状态改回初始状态    
这也解释, 为什么tuple只能在被emit的spout task被ack或fail, 因为只有这个task的queue里面有该tuple

  

When KestrelSpout takes a message off the Kestrel queue, it "opens" the message.     
This means the message is not actually taken off the queue yet, but instead placed in a "pending" state waiting for acknowledgement that the message is completed.     
While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects all pending messages for that client are put back on the queue. 

 

What is Storm's reliability API?

前面一直没有说明的一个问题是, storm本身通过什么机制来判断tuple是否成功被fully processed?

要解决这个问题, 可以分为两个问题,    
1. 如何知道tuple tree的结构?    
2. 如何知道tuple tree上每个节点的运行情况, success或fail?

答案很简单, 你必须告诉它, 如何告诉它?    
1. 对于tuple tree的结构, 需要知道每个tuple由哪些tuple产生, 即tree节点间的link    
   tree节点间的link称为anchoring. 当每次emit新tuple的时候, 必须显式的通过API建立anchoring

  

Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.    
Each word tuple is anchored by specifying the input tuple as the first argument to emit.

看下面的代码例子, 

  
_collector.emit(tuple, new Values(word)); 

emit的第一个参数是tuple, 这就是用于建anchoring    
当然你也可以直接调用unanchoring的emit版本, 如果不需要保证reliable的话, 这样效率会比较高

_collector.emit(new Values(word));

同时前面说了, 可能一个tuple依赖于多个输入, 

  

An output tuple can be anchored to more than one input tuple.    
This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts.

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

对于Multi-anchoring的情况会导致tuple tree变为tuple DGA, 当前storm的版本已经可以很好的支持DAG    
Multi-anchoring adds the output tuple into multiple tuple trees.   
Note that it's also possible for multi-anchoring to break the tree structure and create tuple DAGs,

image 

2. 对于tuple tree上每个节点的运行情况, 你需要在每个bolt的逻辑处理完后, 显式的调用OutputCollector的ack和fail来汇报      
  

    

This is done by using the ack and fail methods on the OutputCollector.      
You can use the fail method on the OutputCollector to immediately fail the spout tuple at the root of the tuple tree.

  

看下面的例子, 在execute函数的最后会调用,     
_collector.ack(tuple); 

我比较迷惑, 为啥ack是OutputCollector的function, 而不是tuple的function?    
而且就算ack也是应该对bolt的input进行ack, 为啥是output, 可能因为所有input都是其他bolt的output产生...这个设计的比较不合理

  
public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;
        
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

storm为了保证reliable, 必然是要牺牲效率的, 此处storm会在task memory里面去记录你汇报的tuple tree的结构和运行情况.     
而只有当某tuple节点被ack或fail后才会被从内存中删除, 所以如果你总是不去ack或fail, 那么会导致task的out of memory

  

Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.

 

简单的版本, BasicBolt

面的机制, 会给程序员造成负担, 尤其对于很多简单的case, 比如filter, 每次都要去显式的建立anchoring和ack…

所以storm提供简单的版本, 会自动的建立anchoring, 并在bolt执行完自动调用ack

A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called BasicBolt that encapsulates this pattern for you.

  
public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

 

How do I make my applications work correctly given that tuples can be replayed?

问题是如何保证"fully fault-tolerant exactly-once messaging semantics”, 因为replay会导致一个message在bolt上多次出现, 这样对类似计数这样的应用会有很大影响.    
从Storm0.7开始, 给出的transactional topologies功能就比较好的解决这个问题

As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies here

 

How does Storm implement reliability in an efficient way?

现在讨论的是Storm如何实现reliablility机制, Storm实现一组特殊的'acker’ task来track每一个spout tuple, 同时acker task的个数你可以根据tuple的数量来配置

A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple.   
When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message.   
You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages. 

所有被产生的tuple都会有一个随机的64bit的id用于被track    
tuple之间通过emit时的anchor形成tuple tree, 并且每个tuple都知道产生它的spout tuple的id (通过不断的copy传递)

当任何tuple被acked的时候, 都会send message到相应的acker, 具体例子如下图

When a tuple is created in a topology, whether in a spout or a bolt, it is given a random 64 bit id. These ids are used by ackers to track the tuple DAG for every spout tuple.

Every tuple knows the ids of all the spout tuples for which it exists in their tuple trees. When you emit a new tuple in a bolt, the spout tuple ids from the tuple's anchors are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker tasks with information about how the tuple tree changed. In particular it tells the acker "I am now completed within the tree for this spout tuple, and here are the new tuples in the tree that were anchored to me".

For example, if tuples "D" and "E" were created based on tuple "C", here's how the tuple tree changes when "C" is acked:

image 

 

当然storm具体怎样通过acker task来track所有的tuples, 还需要解决下面几个问题:

1. 当有多个acker的时候, 当一个tuple被acked的时候, 如果知道给哪一个acker发送message?    
因为每个tuple都知道产生它的spout tuple id, 所以使用mod hash(hash方法, m mod n)来分配spout tuple id, 以保证一个spout tuple id所产生的所有tuple tree都会被分配到一个acker上    
当某一个tuple被acked的时候, 只要通过hash找到相应的acker即可

You can have an arbitrary number of acker tasks in a topology. This leads to the following question: when a tuple is acked in the topology, how does it know to which acker task to send that information? Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees they exist within, they know which acker tasks to communicate with. 

2. 如果有多个spout task的时候, storm在最终ack spout tuple的时候, 如何知道对应于哪个spout task, 因为必须在产生tuple的那个spout task进行ack?    
答案很简单, spout task在emit一个新的tuple的时候, 会发message告诉相应的acker它的task id, 所以acker是知道tupleid和taskid的map的

How the acker tasks track which spout tasks are responsible for each spout tuple they're tracking?

When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.

3. 如果Acker在内存里面显式的监控所有的tuple tree, 会有扩展问题, 当面对海量tuple或复杂workflow的时候, 很有可能会爆内存, 怎么解决这个问题?    
Storm这里采用了一个特别的方法, 这个是storm的主要的突破之一, 该方法的好处就是对于每个spout tuple, 所需要的内存是固定的无论多复杂, 并且只有about 20 bytes    
Acker只需要为每个spout tuple存储spout tuple id, task id, ack val    
这个ack val, 64 bit number, 用于表示整个tuple tree的状况, 产生方法是tuple tree中所有created和acked的tuple的id进行异或(同为0, 异为1)    
当ack val值为0的时候, 即表示tuple tree被完成

这个思路非常巧妙, 两个相同的数去异或为0, 而created和acked时, 会进行两次异或, 所以所有created的tuple都被acked时, 异或值最终为0    
我考虑到不同的tupleid之间的位有重叠时, 是否会有干扰, 简单的试一下, 没有干扰

具体acker工作原理参考, Twitter Storm源代码分析之acker工作流程

Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs. An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree. When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed.

 

最后, 考虑task fail的情况,     
一般task fail, 导致超时, spout会replay    
Acker task fail, 会导致它跟踪的所有tuple无法被ack, 所以会全部超时被spout重发    
Spout task fail, 如果spout本身fail, 那么需要源头来负责replay, 比如RabbitMQ或Kafka

Now that you understand the reliability algorithm, let's go over all the failure cases and see how in each case Storm avoids data loss:

  • Task dies: In this case the spout tuple ids at the root of the trees for the failed tuple will time out and be replayed. 
  • Acker task dies: In this case all the spout tuples the acker was tracking will time out and be replayed. 
  • Spout task dies: In this case the source that the spout talks to is responsible for replaying the messages. For example, queues like Kestrel and RabbitMQ will place all pending messages back on the queue when a client disconnects. 

As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant. 

 

Tuning reliability

当然reliability必然会给系统带来较大的overload, 比如number of messages就会翻倍, 由于和acker之间的通信    
所以如果不需要reliability, 可以通过下面的方法将其关闭

Acker tasks are lightweight, so you don't need very many of them in a topology. You can track their performance through the Storm UI (component id "__acker"). If the throughput doesn't look right, you'll need to add more acker tasks. 

If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires less ids to be kept in each downstream tuple, reducing bandwidth usage.

There are three ways to remove reliability. 

1. The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the ack method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.

2. The second way is to omit a message id in the SpoutOutputCollector.emit method.

3. Finally, emit them as unanchored tuples

 

【淘宝讲解】

 

  • Storm记录级容错的基本原理

 

首先来看一下什么叫做记录级容错?storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级容错的意思是说,storm会告知用户每一个消息单元是否在指定时间内被完全处理了。那什么叫做完全处理呢,就是该message id绑定的源tuple及由该源tuple后续生成的tuple经过了topology中每一个应该到达的bolt的处理。举个例子。在图4-1中,在spout由message 1绑定的tuple1和tuple2经过了bolt1和bolt2的处理生成两个新的tuple,并最终都流向了bolt3。当这个过程完成处理完时,称message 1被完全处理了。
消息传递
图4-1

在storm的topology中有一个系统级组件,叫做acker。这个acker的任务就是追踪从spout中流出来的每一个message id绑定的若干tuple的处理路径,如果在用户设置的最大超时时间内这些tuple没有被完全处理,那么acker就会告知spout该消息处理失败了,相反则会告知spout该消息处理成功了。在刚才的描述中,我们提到了”记录tuple的处理路径”,如果曾经尝试过这么做的同学可以仔细地思考一下这件事的复杂程度。但是storm中却是使用了一种非常巧妙的方法做到了。在说明这个方法之前,我们来复习一个数学定理。

A xor A = 0.

A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次。

storm中使用的巧妙方法就是基于这个定理。具体过程是这样的:在spout中系统会为用户指定的message id生成一个对应的64位整数,作为一个root id。root id会传递给acker及后续的bolt作为该消息单元的唯一标识。同时无论是spout还是bolt每次新生成一个tuple的时候,都会赋予该tuple一个64位的整数的id。Spout发射完某个message id对应的源tuple之后,会告知acker自己发射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一个输入tuple处理完之后,也会告知acker自己处理的输入tuple的id及新生成的那些tuple的id。Acker只需要对这些id做一个简单的异或运算,就能判断出该root id对应的消息单元是否处理完成了。下面通过一个图示来说明这个过程。

图4-1 spout中绑定message 1生成了两个源tuple,id分别是0010和1011.

图4-2 bolt1处理tuple 0010时生成了一个新的tuple,id为0110.

图4-3 bolt2处理tuple 1011时生成了一个新的tuple,id为0111.

图4-4 bolt3中接收到tuple 0110和tuple 0111,没有生成新的tuple.

可能有些细心的同学会发现,容错过程存在一个可能出错的地方,那就是,如果生成的tuple id并不是完全各异的,acker可能会在消息单元完全处理完成之前就错误的计算为0。这个错误在理论上的确是存在的,但是在实际中其概率是极低极低的,完全可以忽略。

【acker数量】

针对源码进行分析。

 

 

http://www.51studyit.com/html/notes/20140329/46.html

分享到:
评论

相关推荐

    storm on yarn概念架构消息机制概述

    storm on yarn概念架构消息机制概述 包括storm job跟mapreduce job对比 storm on yarn架构图 storm关键概念描述 storm消息机制介绍

    Storm Acker机制

    Storm,是于90年代突然崛起,颇受年轻消费者追崇的,一个专注于时尚生活的品牌,主要产品集中在时尚腕表、珠宝首饰、箱包、雨伞以及香水等。她是由其创始人Steve Sun于1989年创立。 自诞生之日起,Storm就以独特的...

    Apache Storm Buffer内部机制简介Prezi幻灯片

    根据Storm官方文档以及LMAX的介绍,使用Prezi工具制作了Storm内部Buffer的简要介绍,没有过多的解释,需要配合网上的其他文档及资源来理解Storm内部机制。

    stormonyarn概念架构消息机制

    stormonyarn概念架构消息机制,基础的storm运行例子

    论文研究-Storm流式计算框架反压机制研究.pdf

    Storm集群提供了强大的实时处理能力,Storm上下游处理节点由于任务差异而导致数据流元组Tuple处理超时从而影响系统吞吐量及其性能。针对该问题,提出了一种能够灵活调节Topology中各环节数据负载的反压机制,该机制...

    细细品味Storm_Storm简介及安装

    第8章探讨Lambda体系结构的实现方法,讲解如何 将批处理机制和实时处理引擎结合起来构建一个可 纠错的分析系统;第9章讲解如何将Pig脚本转化为 topology,并且使用Storm-YARN部署topology,从 而将批处理系统转化为...

    大数据平台Storm入门到精通

    01.Storm基础知识02.Storm集群安装-1-new .avi.baiduyun.p05.Storm配置文件配置项讲解07.Storm基本API介绍08.Storm Topology的并发度09.Strom消息机制原理讲解10.Storm DRPC实战讲解

    storm利用ack保证数据的可靠性源码

    storm利用ack保证数据的可靠性,发送失败时进行重发,保证数据不丢失。

    03_storm.zip

    【Storm篇】--Storm中的同步服务DRPC 【Storm篇】--Storm从初始到分布式搭建 ...【Storm篇】--Storm 容错机制 【Storm篇】--Storm并发机制 【Storm篇】--Storm分组策略 【Storm篇】--Storm基础概念

    Storm的文档详解

    大数据storm流式计算引擎,关于storm的运行机制,运行的的demo,提供详细的应用

    Storm简介.pdf

    内容概要: • 实时计算需要解决一些什么问题 • 实现一个实时计算系统 • Storm基本概念 • Storm使用场景 • Storm分组机制

    Storm如何保证可靠的消息处理

    Storm可以保证从Spout发出的每个消息都能被完全处理。Storm的可靠性机制是完全分布式的(distributed),可伸缩的(scalable),容错的(fault-tolerant)。本文介绍了Storm如何保证可靠性以及作为Storm使用者,我们需要...

    storm-可靠机制

    Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理。完全处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的所有Tuple都经过了Topology中每一个应该到达的Bolt的...

    storm基础框架分析

    2、Storm处理消息时会根据Topology生成一棵消息树,Storm如何跟踪每个消息、如何保证消息不丢失以及如何实现重发消息机制?上篇:storm是如何保证atleastonce语义的回答了第2个问题。本篇来建立一个基本的背景,来...

    storm流数据处理开发应用实战(linux实验环境,storm搭建完毕后的开发)

    linux实验环境,storm搭建完毕后的开发。eclipse开发环境,大数据界hello world——wordcount详解,bolt、分组机制、storm DRPC详解

Global site tag (gtag.js) - Google Analytics