ZBlog

1567669552400

Storm的详细分析

Storm人的概述

  • Storm是Twitter开源的一个实时处理框架
  • Storm能够实现高频数据和大规模数据的实时处理
Storm与MapReduce的区别Storm
type MapReduce Storm
数据来源 hdfs上TB级别历史数据 实时新增的某一条数据
处理过程 map阶段和reduce阶段 可以有很多阶段包含spout以及bolt
是否会结束 执行完结束 不会结束
处理速度 主要以执行TB级别数据速度较慢 只处理新增数据速度很快
适用场景 处理批数据不讲时效性 处理新增数据将时效性
Spark Streaming与Storm的区别
type Spark Streaming Storm
计算模型 是近实时处理框架 全实时处理框架
延迟度 最高支持秒级别的延迟 可以支持毫秒级别的延迟
吞吐量 因为是批处理所以吞吐量高 吞吐量相对来说较低
动态调整并行度 不支持 支持
事务机制 支持但是不够完善 支持且完善
Storm各个组件解释
  • Topology:用于封装一个实时计算应用程序的逻辑
  • Stream:消息流,是一个没有边界的tuple序列,这些tuple会以分布式的方式进行创建以及处理
  • Spout:消息源,消息的生产者,会从外部源获取消息然后向Topology发出:tuple
  • Bolt:消息处理者,消息的处理逻辑被封装到bolt之中,处理输入的数据然后产生新的输出数据流
Storm的设计思想
  • 是对stream流的一个抽象即一个不间断的连续tuple
  • 将流中的元素抽象为一个tuple,一个tuple就是一个值列表value list,list中的每个value都有一个name,并且这个value可以是很多数据类型例如基本类型、字符类型等
  • 每一个stream流都有一个数据源,称为Spout
  • stream从spout中获取不间断数据tuple需要经过处理。处理的过程就是stream流转换的过程称为bolt,bolt可以消费任意数量的流,它是将stream汇总的tuple挨个实时进行处理转换成一个新的stream流经过多个bolt处理就可以得到目标数据
  • spout+tuple+bolt这个过程可以称为是Topology拓扑。Topology是Storm中最高的一个抽象概念他可以被提交到集群中执行
  • Topology的每个节点都要指定他所发射数据的name,其他节点只需要订阅该name就可以接收数据进行处理
Topology的整个流程
  • 如果将stream比作是一列火车的话 spout就是这列火车的始发站每一节车厢就是一个tuple乘客就是tuple中的values 中间的站点就相当于是bolt进行处理上下乘客终点站就相当于stream的目标数据

1566209818229

Storm的整体架构图

1566215107025

Storm的简单实例开发
  • 需求:一个源源不断的数据1,2,3,4……求每出现一个数字就要计算出现的所有数字的和

  • 开发过程

    • 在IDE中创建maven工程

    • 在pom中添加、Storm依赖

      1
      2
      3
      4
      5
      6
      <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.0.6</version>
      <scope>provided</scope>
      </dependency>
    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      package Storme;

      import java.util.Map;

      import org.apache.storm.Config;
      import org.apache.storm.LocalCluster;
      import org.apache.storm.generated.StormTopology;
      import org.apache.storm.spout.SpoutOutputCollector;
      import org.apache.storm.task.OutputCollector;
      import org.apache.storm.task.TopologyContext;
      import org.apache.storm.topology.OutputFieldsDeclarer;
      import org.apache.storm.topology.TopologyBuilder;
      import org.apache.storm.topology.base.BaseRichBolt;
      import org.apache.storm.topology.base.BaseRichSpout;
      import org.apache.storm.tuple.Fields;
      import org.apache.storm.tuple.Tuple;
      import org.apache.storm.tuple.Values;
      import org.apache.storm.utils.Utils;

      /**
      * 需求:实现数字累加求和
      * 分析:
      * 需要有一个spout负责源源不断的产生从1开始的递增数字
      * 还需要有一个bolt负责对spout产生的数据进行累加求和,并且把结果打印到控制台
      * 最后把这个spout和bolt组装成一个topology
      *
      */
      public class WordCount {
      /**
      * 实现自己的数据源spout,
      * 该spout负责源源不断产生从1开始的递增数字
      *
      */
      public static class MySpout extends BaseRichSpout{

      private Map conf;//这里面存储配置信息
      private TopologyContext context;//代表上下文
      private SpoutOutputCollector collector;//收集器,主要负责向外面发射数据

      /**
      * 是一个初始化的方法,这个方法在本实例运行的之后,首先被调用,仅且仅被调用一次
      * 所以这个方法内一般放一些初始化的代码
      * 例子:针对操作mysql数据的案例,使用jdbc获取数据库连接的代码需要放到这里面实现
      */
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.conf = conf;
      this.context = context;
      this.collector = collector;
      //System.err.println("spout------"+conf.get("name"));
      }
      /**
      * 这个方法会被循环调用
      */
      int i = 1;
      public void nextTuple() {
      // 注意:针对需要发射的数据,需要封装成tuple,可以使用storm中的values对象快速封装tuple
      System.out.println("spout:"+i);
      this.collector.emit(new Values(i++));
      // 让线程每发射一条数据,休息1秒
      Utils.sleep(1000);
      }
      /**
      * 声明输出字段
      * 定义两个组件之间数据传输的一个规则
      * 注意:只要这个组件(spout/spout)向外发射了数据,那么这个declareOutputFields就需要实现
      */
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
      // 注意:Fields中的字段列表和Values中的数据列表是一一对应的
      declarer.declare(new Fields("num"));
      }
      }
      /**
      * 聚合的Bolt,负责把Spout发射出来的数据进行累加求和,并且打印到控制台
      *
      */
      public static class SumBolt extends BaseRichBolt{
      private Map stormConf;
      private TopologyContext context;
      private OutputCollector collector;
      /**
      * prepare是一个初始化方法,只会执行一次,这里面也是可以放一些初始化的代码
      */
      public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
      this.stormConf = stormConf;
      this.context = context;
      this.collector = collector;
      //System.err.println("bolt------"+stormConf.get("name"));
      }
      int sum = 0;
      /**
      * 这个方法也是会被循环调用
      * 主要上一个组件向外发射了数据,那么这个方法就会被调用一次
      */
      public void execute(Tuple input) {
      //input.getInteger(0);// 通过角标获取数据
      Integer num = input.getIntegerByField("num");
      sum += num;
      System.out.println("和为:"+sum);
      }
      /**
      * 注意:这个方法在这里就不需要实现了,因为这个bolt没有向下一个组件发射数据
      */
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
      }
      }
      public static void main(String[] args) {
      TopologyBuilder builder = new TopologyBuilder();
      // 组装spout
      builder.setSpout("spoutid", new MySpout());
      // 组装bolt,并且告诉bolt接收哪个组件的数据
      builder.setBolt("bolt-1", new SumBolt()).shuffleGrouping("spoutid");
      StormTopology createTopology = builder.createTopology();
      // 通过代码创建一个本地集群
      LocalCluster localCluster = new LocalCluster();
      String topologyName = WordCount.class.getSimpleName();
      Config config = new Config();
      config.put("name", "zs");
      // 把代码提交到本地集群中运行
      localCluster.submitTopology(topologyName, config, createTopology);
      }
      }

Storm核心之并行度

组件解释
  • worker:worker是一个进程,每一个worker进程里面都执行的是一个Topology的任务(不会出现一个worker执行多个Topology的任务)。一个worker中会启动一个或多个executor线程来执行Topology的spout或者bolt组件。一个Topology会使用一个或者多个worker来执行任务
  • executor:是worker进程内部启动的独立线程,每一个executor会产生一个或者多个task(storm默认是一个task即一个spout或者bolt有一个task,如果有多个task,executor会循环调用所有task中的实例)
  • task:是最终运行spout或者bolt中具体代码的执行单元。Topology启动(spout或者bolt)的task的数目是不变的,但是executor线程的数量可以动态进行调整(例如:1个executor线程可以执行该(spout或bolt)的1个或多个task实例)。这意味着,对于1个(spout或bolt)存在这样的条件:#threads<=#tasks(即:线程数小于等于task数目)。默认情况下task的数目等于executor线程数目,即1个executor线程只运行1个task。
  • 默认情况下,一个supervisor节点最多可以启动4个worker进程,每一个topology默认占用一个worker进程,每个spout或者bolt会占用1个executor,每个executor启动1个task。

1566213893905

提高Storm组件的并行度
  • worker(slot)【间接】
    • 默认一个节点最多可以启动四个worker进程,可以修改进程数量strom-core.jar/defaults.yaml/supervisor.slots.ports
    • 默认一个Topology只有一个worker进程,可以通过代码指定一个Topology使用多个worker进程config.setNumWorkers(workersnum)
    • 注意:如果worker使用完在提交Topology就不会执行,会处于等待状态。worker之是通过Netty通信的
  • executor【直接】
    • 默认情况下一个executor只会运行一个task,可以直接通过代码修改增加task数量,会直接提高Storm组件的并行度
    • builder.setSpout(id, spout,parallelism_hint);
    • builder.setBolt(id, bolt,parallelism_hint);
  • task【间接】
    • 通过boltDeclarer.setNumTasks(num);来设置实例的个数
    • executor的数量会小于等于task的数量(为了rebalance)
弹性计算rebalance
  • 前提是Topology中的task数量要大于executor线程数
  • 通过shell调整
    • storm rebalance mytopology -w 10 -n 5 -e blue-spout=3 -e yellow-bolt=10
    • 注意:acker的树木运行时是不会变化的,所以多指定几个worker进程,acker的数量也不会增加
    • -w:表示超时时间,Rebalance会在一个超时时间内注销掉Topology,然后在集群中重新分配worker
    • -n:表示的是worker的数量
    • -e:调整组件的并行度
    • 注:-n 以及 -e 都可以单独使用或者组合起来使用
  • 通过UI界面进行调整,不建议使用所以就不具体解释使用方法了
并行度设置多少合适
  • 单spout每秒大概可以发送500个tuple
  • 单bolt每秒大概可以接收2000个tuple
  • 单acker每秒大概可以接收6000个tuple
  • 根据需要进行调整

博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议

本站使用 Blog Zhao 作为主题 , 总访问量为 次 。
载入天数...载入时分秒...