ZBlog

1567329814966

Flink详细介绍

  • 1567329798038
  • 1567329931681

DataSource

  • source是程序的数据源输入,可以通过StreamExecutionEnvironment.addSource(sourceFunction来给程序添加一个source

  • Flink提供了大量已经实现好的source方法,我们也可以自定义source

    • 通过实现sourceFunction接口来自定义无并行度的source
    • 或者你也可以通过实现ParallelSourceFunction 接口
      or 继承RichParallelSourceFunction 来自定义有并行度的source。
  • source的类型

    • 基于socket
      • socketTextStream
        从socket中读取数据,元素可以通过一个分隔符切开。
    • 基于集合
      • fromCollection(Collection)
      • 通过java的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。
    • 自定义输入
      • addSource 可以实现读取第三方数据源的数据
      • 系统内置提供了一批connectors,连接器会提供对应的source支持
  • 内置connectors 连接器

    • Apache Kafka (source/sink)
    • RabbitMQ (source/sink)
    • Apache ActiveMQ (source/sink)
  • source的容错性保证

    • Source 语义保证 备注
      kafka exactly once(仅一次) 建议使用0.10及以上
      Collections exactly once
      Files exactly once
      Sockets at most once

Transformation

  • map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
  • flatmap:输入一个元素,可以返回零个,一个或者多个元素
  • filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
  • keyby:根据指定的key进行分组,相同key的数据会进入同一个分区
  • reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值
  • aggregations:sum(),min(),max()等
  • window:后面会详细描述
  • union:合并多个流,新的流会包含所有流中的数据 注意:所有合并的流类型必须是一致的
  • connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。
    • CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap

sink

  • 输出的类型

    • writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
    • print()/printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
    • 自定义输出addSink 例如:kafka Redis等
  • 内置connectors连接器

    • Apache Kafka (source/sink)
    • Apache Cassandra (sink)
    • Elasticsearch (sink)
    • Hadoop FileSystem (sink)
    • RabbitMQ (source/sink)
    • Apache ActiveMQ (source/sink)
    • Redis (sink)
  • sink的容错性保证

    • Sink 语义保证 备注
      hdfs exactly once
      elasticsearch at least once
      kafka produce at least once/exactly once Kafka 0.9 and 0.10提供at least once Kafka 0.11提供exactly once
      file at least once
      redis at least once
  • 自定义sink

    • 实现自定义sink
      • 实现SinkFunction接口
      • 继承RichSinkFunction

DataSource

  • 基于文件
    • readTextFile(path)
  • 基于集合
    • fromCollection(Collection)

Transformations

  • map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
  • flatmap:输入一个元素,可以返回零个,一个或者多个元素
  • mappartition:类似map,一次处理一个分区的数据【如果在进行map处理的时候需要获取第三方资源链接,建议使用MapPartition】
  • filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
  • reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值
  • Aggregate:sum、max、min等
  • distinct:返回一个数据集中去重之后的元素,data.distinct()
  • cross:获取两个数据集的笛卡尔积
  • union:返回两个数据集的总和,数据类型需要一致
  • first-n: 获取集合中的前N个元素
  • Sort Partition:在本地对数据集的所有分区进行排序,通过sortPartition()的链接调用来完成对多个字段的排序

sink

  • writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
  • writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法
  • print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
  • Flink针对流处理和批处理提供了相关的API-Table API和SQL。
  • 注意:目前Table API和SQL功能尚未全部完成,官方正在积极开发中。
  • 暂时不推荐使用 故暂时不解释其用法

Broadcast广播变量

  • 广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks
  • 广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的
  • 一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
  • 用法:
    • 初始化数据
      • DataSet toBroadcast = env.fromElements(1, 2, 3);
    • 广播数据
      • .withBroadcastSet(toBroadcast,”broadcastSetName”);
    • 获取广播数据
      • Collection broadcastSet =
        getRuntimeContext().getBroadcastVariable(“broadcastSetName”);
  • 注意:
    • 广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束
    • 广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

Accumulators 累加器

  • Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化
  • 可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

Counters 计数器

  • Counter是一个具体的累加器(Accumulator)实现
    • IntCounter, LongCounter 和 DoubleCounter
  • 用法:
    • 创建累加器
      • private IntCounter numLines = new IntCounter();
    • 注册累加器
      • getRuntimeContext().addAccumulator(“num-lines”,this.numLines);
    • 使用累加器
      • this.numLines.add(1);
    • 获取累加器的结果
      • myJobExecutionResult.getAccumulatorResult(“num-lines”)

广播变量 Broadcast 与 累加变量 Accumlators的区别

  • Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可以进行共享,但是不可以进行修改
  • Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。

Window窗口

  • 1567337912453
  • 聚合事件例如计数、求和,在流上的工作方式与批处理不同
    • 比如,对流中的所有元素进行计数是不可能的,因为通常流是无限的(无界的)。所以,流上的聚合需要由window 来划定范围,比如 “计算过去的5分钟” ,或者“最后100个元素的和”
    • window是一种可以把无限数据切割为有限数据块的手段
  • 窗口可以是 时间驱动的 【Time Window】(比如:每30秒)或者数据驱动的【Count Window】(比如:每100个元素)
  • widow窗口的类型
    • 1567338078663
    • tumbling windows:滚动窗口 【没有重叠】
      • 1567338109467
    • sliding windows:滑动窗口 【有重叠】
      • 1567338132782
  • widow窗口的应用
    • TimeWindow的应用
      • 1567338320496
    • Count window
      • 1567338375799
    • 自定义window
      • 1567338475470
      • 1567338483107
      • 1567338514024
  • window聚合分类
    • 增量聚合
      • 在窗口中每进一条数据就进行一次聚合计算
      • reduce(reduceFunction)
      • aggregate(aggregateFunction)
      • sum(),min(),max()
      • 1567338666708
      • 实现过程
        • 1567338695987
        • 1567338704045
    • 全量聚合
      • 等窗口中的所有数据到齐以后才进行聚合计算,可以实现对窗口内所有数据的排序等需求
      • apply(windowFunction)
      • process(processWindowFunction) 提供了更多上下文的信息
      • 1567338848386
      • 1567338884621
      • 1567338900562
      • 1567338909177

Time介绍

  • 针对stream中的时间,可以分为以下三种

    • Event Time:事件产生的时间,通常由事件中的时间戳来描述
    • Ingestion Time: 事件进入Flink的时间
    • Processing time : 时间倍处理时当前系统时间
    • 1567339213593
  • Time实例分析

    • 原始日志

      • 2018-10-10 10:00:01,134 INFO executor.Executor: Finished task in state 0.0
    • 数据进入Flink的时间是:2018-10-10 20:00:00,102

    • 数据到达window进行处理的时间是:2018-10-10 20:00:01,100

    • 如果我们想要统计每分钟内接口调用失败的错误日志个数,使用哪个时间才有意义?

      • Flink中,默认Time是ProcessingTime
      • 可以在代码中设置
        • 1567339399845
      • 但是Processing time 并不是我们特别想要的,因为日志文件在传输过程中顺序可能已经发生了变化,而我们想要的是错误日志产生时的信息,就是Event Time,所以我们需要考虑一下Event Time的乱序问题
    • EventTime和Watermarks

    • 在使用eventTime的时候如何处理乱序数据?

      • 我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因,导致乱序的产生,特别是使用kafka的话,多个分区的数据无法保证有序。所以在进行window计算的时候,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark,watermark是用于处理乱序事件的。
      • watermark可以翻译为水位线
    • 有序流与无序流图解

      • 有序流
      • 1567340864838
      • 指定在那个时间点断开进行计算就可以直接断开进行计算
      • 无序流
      • 1567340912113
      • 因为顺序被打乱 指定在某时间断开 其中的数据是错乱的无法进行立即聚合处理
    • watermarks的使用

    • watermarks的生成方式有两种

      • With Periodic Watermarks:周期性的触发watermark 的生成和发送
      • With Punctuated Watermarks:基于某些事件触发watermark 的生成和发送
      • 第一种是我们常用的方法,所以就第一种来进行详细的分析
    • 参考官方文档中With Periodic Watermarks的使用方法

      • 1567341414170

      • 代码中的extractTimestamp方法是从数据本身提取数据的Event time,getCurrentWatermar方法是获取当前水位线,利用currentMaxTimestamp - maxOutOfOrderness ,maxOutOfOrderness表示的是允许数据最大乱序时间

        所以在这里我们需要使用的话就需要实现接口AssignerWithPeriodicWatermarks

      • 1567341604475

    • 实现watermark的相关代码

    • 从socket模拟接收数据,然后使用map进行处理,后面在调用assignTimestampsAndWatermarks方法抽取timestamp并生成watermark。在调用window 打印信息来验证window被触发的时机

    • 具体代码如下:

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      package Flink;

      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.TimeCharacteristic;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
      import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
      import org.apache.flink.streaming.api.watermark.Watermark;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
      import org.apache.flink.util.Collector;
      import javax.annotation.Nullable;
      import java.text.SimpleDateFormat;
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.Iterator;
      import java.util.List;
      /**
      * Watermark 案例
      */

      public class StreamingWindowWatermark {
      public static void main(String[] args) throws Exception {
      //定义 socket 的端口号
      int port = 9001;
      // 获取运行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      //设置使用 eventtime,默认是使用 processtime
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      //设置并行度为 1,默认并行度是当前机器的 cpu 数量
      env.setParallelism(1);
      //连接 socket 获取输入的数据
      DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");
      //解析输入的数据
      DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> map(String value)
      throws Exception {
      String[] arr = value.split(",");
      return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
      }
      });
      //抽取 timestamp 和生成 watermark
      DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
      Long currentMaxTimestamp = 0L;
      // 最大允许的乱序时间是 10s
      final Long maxOutOfOrderness = 10000L;
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

      /***
      * 定义生成 watermark 的逻辑 * 默认 100ms 被调用一次
      * */
      @Nullable
      @Override
      public Watermark getCurrentWatermark() {
      return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
      }

      //定义如何提取 timestamp
      @Override
      public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
      long timestamp = element.f1;
      currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
      System.out.println("key:" + element.f0 + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1) + "], currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
      return timestamp;
      }
      });
      //分组,聚合
      DataStream<String> window = waterMarkStream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))// 按 照 消 息 的 EventTime 分配窗口,和调用 TimeWindow 效果一样
      .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
      /*** 对 window 内的数据进行排序,保证数据的顺序
      * * @param tuple * @param window * @param input * @param out * @throws Exception
      * */
      @Override
      public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
      String key = tuple.toString();
      List<Long> arrarList = new ArrayList<Long>();
      Iterator<Tuple2<String, Long>> it = input.iterator();
      while (it.hasNext()) {
      Tuple2<String, Long> next = it.next();
      arrarList.add(next.f1);
      }
      Collections.sort(arrarList);
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1)) + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
      out.collect(result);
      }
      });
      //测试-把结果打印到控制台即可
      window.print();
      //注意:因为 flink 是懒加载的,所以必须调用 execute 方法,上面的代码才会执行
      env.execute("eventtime-watermark");
      }
      }
    • 程序简单解释

      • 接收socket传来的数据
      • 将每行数据按照逗号分隔,每行数据调用map 转换成tuple<String,Long>类型。其中tuple 中的第一个元素代表具体的数据,第二个元素代表数据的eventtime
      • 抽取timestamp , 生成watermar , 允许的最大乱序时间是10s , 并打印
        (key,eventtime,currentMaxTimestamp,watermark)等信息
      • 分组聚合,window 窗口大小为3 秒,输出(key,窗口内元素个数,窗口内最早元素的
        时间,窗口内最晚元素的时间,窗口自身开始时间,窗口自身结束时间)
    • 测试数据

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        测试数据-1如下:watermark+window处理乱序数据
        0001,1538359882000 2018-10-01 10:11:22
        0001,1538359886000 2018-10-01 10:11:26
        0001,1538359892000 2018-10-01 10:11:32
        0001,1538359893000 2018-10-01 10:11:33
        0001,1538359894000 2018-10-01 10:11:34
        0001,1538359896000 2018-10-01 10:11:36
        0001,1538359897000 2018-10-01 10:11:37

        0001,1538359899000 2018-10-01 10:11:39
        0001,1538359891000 2018-10-01 10:11:31
        0001,1538359903000 2018-10-01 10:11:43

        0001,1538359892000 2018-10-01 10:11:32
        0001,1538359891000 2018-10-01 10:11:31



        测试数据-2如下:延迟数据被丢弃
        0001,1538359890000 2018-10-01 10:11:30
        0001,1538359903000 2018-10-01 10:11:43


        0001,1538359890000 2018-10-01 10:11:30
        0001,1538359891000 2018-10-01 10:11:31
        0001,1538359892000 2018-10-01 10:11:32



        测试数据-3如下:allowedLateness
        0001,1538359890000 2018-10-01 10:11:30
        0001,1538359903000 2018-10-01 10:11:43
        0001,1538359890000 2018-10-01 10:11:30
        0001,1538359891000 2018-10-01 10:11:31
        0001,1538359892000 2018-10-01 10:11:32
        0001,1538359904000 2018-10-01 10:11:44
        0001,1538359890000 2018-10-01 10:11:30
        0001,1538359891000 2018-10-01 10:11:31
        0001,1538359892000 2018-10-01 10:11:32
        0001,1538359905000 2018-10-01 10:11:45
        0001,1538359890000 2018-10-01 10:11:30
        0001,1538359891000 2018-10-01 10:11:31
        0001,1538359892000 2018-10-01 10:11:32


        测试数据-4如下:sideOutputLateData
        0001,1538359890000 2018-10-01 10:11:30
        0001,1538359903000 2018-10-01 10:11:43
        0001,1538359890000 2018-10-01 10:11:30
        0001,1538359891000 2018-10-01 10:11:31
        0001,1538359892000 2018-10-01 10:11:32


        测试数据-5如下:多并行度下的watermark-8
        0001,1538359882000 2018-10-01 10:11:22
        0001,1538359886000 2018-10-01 10:11:26
        0001,1538359892000 2018-10-01 10:11:32
        0001,1538359893000 2018-10-01 10:11:33
        0001,1538359894000 2018-10-01 10:11:34
        0001,1538359896000 2018-10-01 10:11:36
        0001,1538359897000 2018-10-01 10:11:37



        测试数据-6如下:
        0001,1538359890000 2018-10-01 10:11:30
        0001,1538359903000 2018-10-01 10:11:43
        0001,1538359908000 2018-10-01 10:11:48
    • 1567345375589

    • 注意:多并行度的情况下,watermark对齐会取所有channel最小的watermark

    • watermarks的生成方式

      • With Periodic Watermarks

        周期性的触发watermark的生成和发送,默认是100ms

        每隔N秒自动向流里注入一个WATERMARK 时间间隔由ExecutionConfig.setAutoWatermarkInterval 决定. 每次调用getCurrentWatermark 方法, 如果得到的WATERMARK 不为空并且比之前的大就注入流中

        可以定义一个最大允许乱序的时间,这种比较常用

        实现AssignerWithPeriodicWatermarks接口

      • With Punctuated Watermarks

        基于某些事件触发watermark的生成和发送

        基于事件向流里注入一个WATERMARK,每一个元素都有机会判断是否生成一个WATERMARK. 如果得到的WATERMARK 不为空并且比之前的大就注入流中

        实现AssignerWithPunctuatedWatermarks接口

    • watermark 与 event time结合使用

      • 触发widow进行合并的条件是 watermark >= window_end_time 并且当前窗口内有数据
      • 这样就可以在允许最大乱序时间内将同一个窗口的数据进行处理,如果数据超过了这个最大允许乱序时间,要怎么解决呢
    • late element 延迟数据的处理方案

      • 丢弃 系统默认的方法
        • 直接将超过允许最大乱序时间的数据丢弃,不做任何处理
      • allowedLateness 指定允许数据延迟时间
        • 再给迟到的数据一个提供一个宽容时间
        • 加上这个时间以后 在超过最大允许乱序时间以后 在宽容时间内 如果数据出现了 依然可以出发widow执行。
      • sideOutputLateDate 收集迟到的数据
        • 通过sideOutputLateDate将迟到的数据进行统一收集进行存储,方便 以后的问题排查处理
    • Flink应该如何设置最大乱序时间

      • 这个要结合自己的业务以及数据情况去设置。如果maxOutOfOrderness设置的太小,而自身数据发送时由于网络等原因导致乱序或者late太多,那么最终的结果就是会有很多单条的数据在window中被触发,数据的正确性影响太大
      • 对于严重乱序的数据,需要严格统计数据最大延迟时间,才能保证计算的数据准确,延时设置太小会影响数据准确性,延时设置太大不仅影响数据的实时性,更加会加重Flink作业的负担,不是对eventTime要求特别严格的数据,尽量不要采用eventTime方式来处理,会有丢数据的风险。

TaskManager 与 Slot

  • Flink的每个TaskManager为集群提供solt。 solt的数量通常与每个TaskManager节点的可用CPU内核数成比例。一般情况下你的slot数是你每个节点的cpu的核数。
    • 1567433392285
    • 1567433422808

并行度(Parallel)

  • 一个Flink程序由多个任务组成(source、transformation和 sink)。 一个任务由多个并行的实例(线程)来执行, 一个任务的并行实例(线程)数目就被称为该任务的并行度。
  • 一个任务的并行度设置可以从多个层次指定
    • Operator Level(算子层次)
    • Execution Environment Level(执行环境层次)
    • Client Level(客户端层次)
    • System Level(系统层次)
  • 并行度设置之Operator Level
    • 一个算子、数据源和sink的并行度可以通过调用 setParallelism()方法来指定
    • 1567433659150
  • 并行度设置之Execution Environment Level
    • 执行环境(任务)的默认并行度可以通过调用setParallelism()方法指定。为了以并行度3来执行所有的算子、数据源和data sink, 可以通过如下的方式设置执行环境的并行度
    • 执行环境的并行度可以被算子的并行度覆盖重写
    • 1567433733701
  • 并行度设置之Client Level
    • 并行度可以在客户端将job提交到Flink时设定。
    • 对于CLI客户端,可以通过-p参数指定并行度
    • ./bin/flink run -p 10 WordCount-java.jar
  • 并行度设置之System Level
    • 在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度
  • 1567433861193
  • 1567433868135

Flink的checkpoint机制

checkpoint简介

  • 为了保证state的容错性,Flink需要对state进行checkpoint
  • Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常
  • Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:
    • 持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)
    • 用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)

checkpoint的配置

  • 默认的情况下 checkpoint是disabled不可用状态,想要使用的时候先开启

  • checkpoint的checkPointMode有两种,Exactly-once(默认)和At-least-once

  • Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
    env.enableCheckpointing(1000);
    // 高级选项:
    // 设置模式为exactly-once (这是默认值)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    // 同一时间只允许进行一个检查点
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

state Backend 状态的后端存储

  • 默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。

  • state 和checkpoint的存储位置取决于State Backend的配置 可以在程序中修改

    • env.setStateBackend(…)
  • 三种state Backend

  • MemoryStateBackend

    • state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中
    • 基于内存的state backend在生产环境下不建议使用
  • FsStateBackend

    • state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中
    • 可以使用hdfs等分布式文件系统
  • RocksDBStateBackend

    • RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地
    • RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用
  • 修改state Backend

    • 单任务调整

    • 修改当前任务代码

    • 1
      2
      3
      env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
      or new MemoryStateBackend()
      or new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
    • 全局调整

    • 修改flink-conf.yaml

    • 1
      2
      3
      state.backend: filesystem
      state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
      注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

kafka-connector简介

  • Kafka中的partition机制和Flink的并行度机制深度结合
  • Kafka可以作为Flink的source和sink
  • 任务失败,通过设置kafka的offset来恢复应用

kafka-consumer消费者策略

  • setStartFromGroupOffsets()【默认消费策略】
    • 默认读取上次保存的offset信息
    • 如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据
  • setStartFromEarliest()
    • 从最早的数据开始进行消费,忽略存储的offset信息
  • setStartFromLatest()
    • 从最新的数据进行消费,忽略存储的offset信息
  • setStartFromSpecificOffsets(Map<KafkaTopicPartition,Long>)
    • 1567435618888

kafka的容错

  • 当checkpoint机制开启的时候,Kafka Consumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。
  • 为了能够使用支持容错的kafka Consumer,需要开启checkpoint
    • env.enableCheckpointing(5000); // 每5s checkpoint一次

动态加载topic

  • 1567435791533

kafka consumer offset 自动提交

  • 针对job是否开启checkpoint来区分
    • Checkpoint关闭时: 可以通过下面两个参数配置
      • enable.auto.commit
      • auto.commit.interval.ms
    • Checkpoint开启时:当执行checkpoint的时候才会保存offset,这样保证了kafka的offset和checkpoint的状态偏移量保持一致。
      • 可以通过这个参数设置setCommitOffsetsOnCheckpoints(boolean)这个参数默认就是true。表示在checkpoint的时候提交offset 此时,kafka中的自动提交机制就会被忽略

Kafka Producer

  • 1567436059578

kafka producer 的容错 kafka0.9 与 0.10

  • 如果Flink开启了checkpoint,针对FlinkKafkaProducer09和FlinkKafkaProducer010 可以提供 at-least-once的语义,还需要配置下面两个参数
    • setLogFailuresOnly(false)
    • setFlushOnCheckpoint(true)
  • 注意:建议修改kafka 生产者的重试次数
    • retries【这个参数的值默认是0】

Kafka Producer的容错-Kafka 0.11

  • 如果Flink开启了checkpoint,针对FlinkKafkaProducer011 就可以提供
    exactly-once的语义
  • 但是需要选择具体的语义
    • Semantic.NONE
    • Semantic.AT_LEAST_ONCE【默认】
    • Semantic.EXACTLY_ONCE

博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议

本站使用 Blog Zhao 作为主题 , 总访问量为 次 。
载入天数...载入时分秒...