ZBlog

1567669566793

Storm高级

Storm核心之流分组

1566219022407

stream grouping 分类
  • Shuffle Grouping:随机分组。将stream中的tuple缓存后随机发放给所有bolt,可以使每个bolt中的数据量大致相等(可以较好的实现负载均衡)
  • Fields Grouping:按字段分组,例如按groupID字段进行分组,将同一个分组的tuple分到统一任务中
  • All Grouping:广播发送,每一个tuple都会发送到所有任务中,所以每一个bolt都会有所有的tuple
  • Global Grouping:全局分组,这个tuple会被分配到storm中的某一个bolt,具体一点就是分配到ID值最小的一个bolt之中
  • Non Grouping:随机分派,效果和shuffle一样
  • Direct Grouping:直接分组,将tuple发送给制定好的任务中
  • localOrShuffleGrouping:指如果目标Bolt 中的一个或者多个Task 和当前产生数据的Task在同一个Worker 进程里面,那么就走内部的线程间通信,将Tuple 直接发给在当前Worker进程的目的Task。否则,同shuffleGrouping。

Storm可靠性剖析

Storm可能出现的问题
  • worker进程死掉
  • supervisor进程死掉
  • nimbus进程死掉
  • 节点宕机
解决方案
  • (acker机制)ack/fail消息确认机制(确保一个tuple被完全处理)
    • 在spout中发射tuple的时候需要同时发送messageid,这样才相当于开启了消息确认机制
    • 如果你的topology里面的tuple比较多的话,那么把acker的数量设置多一点,效率会高一点。
    • 通过config.setNumAckers(num)来设置一个topology里面的acker的数量,默认值是1。
    • 注意:acker用了特殊的算法,使得对于追踪每个spout tuple的状态所需要的内存量是恒定的(20 bytes)
    • 注意:如果一个tuple在指定的timeout(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS默认值为30秒)时间内没有被成功处理,那么这个tuple会被认为处理失败了。

Storm定时器分析

  • 可以指定每隔一段时间将数据整合一次存入数据库
    • 在main中设置conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);// 设置本Bolt定时发射数据
    • 在bolt中使用下面代码判断是否是触发用的bolt tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
    • 1566305060434

StormUI的详解

1566219892284

  • deactive:未激活(暂停)
  • emitted:emitted tuple数
    • 与emitted的区别:如果一个task,emitted一个tuple到2个task中,则transferred tuple数是emitted tuple数的两倍
  • completelatency: spout emitting 一个tuple到spout ack这个tuple的平均时间(可以认为是tuple以及该tuple树的整个处理时间)
  • processlatency: bolt收到一个tuple到bolt ack这个tuple的平均时间,如果没有启动acker机制,那么值为0
  • execute latency:bolt处理一个tuple的平均时间,不包含acker操作,单位是毫秒(也就是bolt
    执行 execute 方法的平均时间)
  • capacity:这个值越接近1,说明bolt或者spout基本一直在调用execute方法,说明并行度不够,需要扩展这个组件的executor数量。(调整组件并行度的依据)
  • 总结:execute latency和proces latnecy是处理消息的时效性,而capacity则表示处理能力是否已经饱和,从这3个参数可以知道topology的瓶颈所在。

Storm的优化

并行度的优化
  • worker为storm提供工作进程,程序的并行度可以设置(包括spout和bolt的并行度,如果有acker的话还包括acker的并行度),并行度即为executor的数目。
  • 一般情况下worker与executor的比例是一比十到十五,也可以根据实际需要修改。
worker的优化
  • CPU 16核,建议配置20个worker。CPU 24或32核,30个worker
  • 默认情况下,Storm启动worker进程时,JVM的最大内存是768M,可以通过在Strom的配置文件storm.yaml中设置worker的启动参数worker.childopts: “-Xmx2048m”
  • 一个topology使用的worker数量,12个是比较合理的,这个时候吞吐量和整体性能最优。如果多增加worker进程的话,会将一些原本线程间的内存通信变为进程间的网络通信。
acker优化
  • 如果可靠性对你来说不是那么重要,那么你可以通过不跟踪这些tuple树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量减少一半,因为对于每一个tuple都要发送一个ack消息。
  • 三种去掉可靠性的方法
    • 第一是把config.setNumAckers(0)设置为0,在这种情况下,storm会在spout发射一个tuple之后马上调用spout的ack方法。也就是说这个tuple树不会被跟踪。
    • 第二个方法是在tuple层面去掉可靠性。你可以在发射tuple的时候不指定messageid来达到不跟踪spout中tuple的目的。
    • 最后一个方法是如果你对于一个tuple树里面的某一部分到底成不成功不是很关心,那么可以在发射这些tuple的时候unanchor它们(anchor是锚定的意思,unanchor表示不把当前这个tuple包含到tuple树中,也就是说不跟踪这个消息了)。这样这些tuple就不在tuple树里面, 也就不会被跟踪了。

雪崩问题的出现原因以及解决方法

  • 原因:spout发送的速度大于bolt接收的速度,导致数据堆积,不断消耗内存,最终系统崩溃,并引起数据链上多节点down掉。
  • 解决方案
    • 增加bolt的并行度 增加它接收的速度
    • 可以通过topology.max.spout.pending来控制spout发送消息的速度,通过代码这样设置config.setMaxSpoutPending(num);
      • 注意:这个参数表示,当下游的bolt还有topology.max.spout.pending个 tuple 没有消费完时,spout会停止调用nexttuple方法发射数据。等待下游bolt去消费,当tuple的个数少于topology.max.spout.pending个数时,spout 会继续发射数据(这个属性只对可靠消息处理有用,也就是说需要启用acker消息确认机制,在spout中emit数据的时候需要带有messageid)

博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议

本站使用 Blog Zhao 作为主题 , 总访问量为 次 。
载入天数...载入时分秒...