Flink DataStreamAPI与DataSetAPI应用案例实战-Flink牛刀小试

栏目: 服务器 · 发布时间: 7年前

内容简介:版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

1 DataStreamAPI

1.1 DataStream Data Sources

  • source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。

  • flink提供了大量的已经实现好的source方法,可以自定义source 通过实现sourceFunction接口来自定义无并行度的source。

    1 使用并行度为1的source
      public class MyNoParalleSource implements SourceFunction<Long>{
      
          private long count = 1L;
      
          private boolean isRunning = true;
      
          /**
           * 主要的方法
           * 启动一个source
           * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
           *
           * @param ctx
           * @throws Exception
           */
          @Override
          public void run(SourceContext<Long> ctx) throws Exception {
              while(isRunning){
                  ctx.collect(count);
                  count++;
                  //每秒产生一条数据
                  Thread.sleep(1000);
              }
          }
          * 取消一个cancel的时候会调用的方法
          @Override
          public void cancel() {
              isRunning = false;
          }
      }
      
      2 Main方法执行
      public class StreamingDemoWithMyNoPralalleSource {
          public static void main(String[] args) throws Exception {
          //获取Flink的运行环境
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //获取数据源
          DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1); //注意:针对此source,并行度只能设置为1
          DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
              @Override
              public Long map(Long value) throws Exception {
                  System.out.println("接收到数据:" + value);
                  return value;
              }
          });
    
      //每2秒钟处理一次数据
      DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
      //打印结果
      sum.print().setParallelism(1);
      String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
      env.execute(jobName);
       }
      }   
    复制代码
  • 可以通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction来自定义有并行度的source。继承RichParallelSourceFunction的那些SourceFunction意味着它们都是并行执行的并且可能有一些资源需要open/close

    public class MyParalleSource implements ParallelSourceFunction<Long> {
      private long count = 1L;
      private boolean isRunning = true;
    
      /**
       * 主要的方法
       * 启动一个source
       * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
       *
       * @param ctx
       * @throws Exception
       */
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
          while(isRunning){
              ctx.collect(count);
              count++;
              //每秒产生一条数据
              Thread.sleep(1000);
          }
      }
      /**
       * 取消一个cancel的时候会调用的方法
       *
       */
      @Override
      public void cancel() {
          isRunning = false;
       }
    }
    
    public class StreamingDemoWithMyPralalleSource {
        public static void main(String[] args) throws Exception {
          //获取Flink的运行环境
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
          //获取数据源
          DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);
    
          DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
              @Override
              public Long map(Long value) throws Exception {
                  System.out.println("接收到数据:" + value);
                  return value;
              }
          });
    
          //每2秒钟处理一次数据
          DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
    
          //打印结果
          sum.print().setParallelism(1);
    
          String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName();
          env.execute(jobName);
          }
       }
    
    
       public class MyRichParalleSource extends RichParallelSourceFunction<Long> {
          private long count = 1L;
          private boolean isRunning = true;
          /**
           * 主要的方法
           * 启动一个source
           * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
           * @param ctx
           * @throws Exception
           */
          @Override
          public void run(SourceContext<Long> ctx) throws Exception {
              while(isRunning){
                  ctx.collect(count);
                  count++;
                  //每秒产生一条数据
                  Thread.sleep(1000);
              }
          }
          /**
           * 取消一个cancel的时候会调用的方法
           *
           */
          @Override
          public void cancel() {
              isRunning = false;
          }
          /**
           * 这个方法只会在最开始的时候被调用一次
           * 实现获取链接的代码
           * @param parameters
           * @throws Exception
           */
          @Override
          public void open(Configuration parameters) throws Exception {
              System.out.println("open.............");
              super.open(parameters);
          }
      
          /**
           * 实现关闭链接的代码
           * @throws Exception
           */
          @Override
          public void close() throws Exception {
              super.close();
          }
      }
      
       public class StreamingDemoWithMyRichPralalleSource {
       
           public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              //获取数据源
              DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);
      
              DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
                  @Override
                  public Long map(Long value) throws Exception {
                      System.out.println("接收到数据:" + value);
                      return value;
                  }
              });
      
              //每2秒钟处理一次数据
              DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
      
              //打印结果
              sum.print().setParallelism(1);
      
              String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
              env.execute(jobName);
              }
          }   
    复制代码
  • 基于文件 readTextFile(path) 读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。

  • 基于socket socketTextStream从socker中读取数据,元素可以通过一个分隔符切开。

    public class SocketDemoFullCount {
          public static void main(String[] args) throws Exception{
              //获取需要的端口号
              int port;
              try {
                  ParameterTool parameterTool = ParameterTool.fromArgs(args);
                  port = parameterTool.getInt("port");
              }catch (Exception e){
                  System.err.println("No port set. use default port 9010--java");
                  port = 9010;
              }
              //获取flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              String hostname = "SparkMaster";
              String delimiter = "\n";
              //连接socket获取输入的数据
              DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
      
              DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
                  @Override
                  public Tuple2<Integer,Integer> map(String value) throws Exception {
                      return new Tuple2<>(1,Integer.parseInt(value));
                  }
              });
      
              intData.keyBy(0)
                      .timeWindow(Time.seconds(5))
                      .process(new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() {
                          @Override
                          public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out) throws Exception {
                              System.out.println("执行process");
                              long count = 0;
                               for(Tuple2<Integer,Integer> element: elements){
                                  count++;
                              }
                              out.collect("window:"+context.window()+",count:"+count);
                          }
                      }).print();
              //这一行代码一定要实现,否则程序不执行
              env.execute("Socket window count");
          }
      }
    复制代码
  • 基于集合 fromCollection(Collection) 通过 java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。

    public class StreamingFromCollection {
          public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              ArrayList<Integer> data = new ArrayList<>();
              data.add(10);
              data.add(15);
              data.add(20);
      
              //指定数据源
              DataStreamSource<Integer> collectionData = env.fromCollection(data);
              //通map对数据进行处理
              DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
                  @Override
                  public Integer map(Integer value) throws Exception {
                      return value + 1;
                  }
              });
              //直接打印
              num.print().setParallelism(1);
              env.execute("StreamingFromCollection");
          }
    复制代码

    }

  • 自定义输入 addSource 可以实现读取第三方数据源的数据 系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】

1.2 DataStream Transformations

  • map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作

  • flatmap:输入一个元素,可以返回零个,一个或者多个元素

  • keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区

    dataStream.keyBy("someKey") // 指定对象中的 "someKey"字段作为分组key
      dataStream.keyBy(0) //指定Tuple中的第一个元素作为分组key
      
      注意:以下类型是无法作为key的
      1:一个实体类对象,没有重写hashCode方法,并且依赖object的hashCode方法
      2:一个任意形式的数组类型
      3:基本数据类型,int,long
    复制代码
  • filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下。

    public class StreamingDemoFilter {
          public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              //获取数据源
              DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
              DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
                  @Override
                  public Long map(Long value) throws Exception {
                      System.out.println("原始接收到数据:" + value);
                      return value;
                  }
              });
      
              //执行filter过滤,满足条件的数据会被留下
              DataStream<Long> filterData = num.filter(new FilterFunction<Long>() {
                  //把所有的奇数过滤掉
                  @Override
                  public boolean filter(Long value) throws Exception {
                      return value % 2 == 0;
                  }
              });
      
              DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() {
                  @Override
                  public Long map(Long value) throws Exception {
                      System.out.println("过滤之后的数据:" + value);
                      return value;
                  }
              });
      
      
              //每2秒钟处理一次数据
              DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0);
      
              //打印结果
              sum.print().setParallelism(1);
      
              String jobName = StreamingDemoFilter.class.getSimpleName();
              env.execute(jobName);
          }
      }
    复制代码
  • reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值

  • aggregations:sum(),min(),max()等

  • window:在后面单独详解

  • Union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。

    public class StreamingDemoUnion {
      public static void main(String[] args) throws Exception {
          //获取Flink的运行环境
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
          //获取数据源
          DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
    
          DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
    
          //把text1和text2组装到一起
          DataStream<Long> text = text1.union(text2);
    
          DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
              @Override
              public Long map(Long value) throws Exception {
                  System.out.println("原始接收到数据:" + value);
                  return value;
              }
          });
          //每2秒钟处理一次数据
          DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
          //打印结果
          sum.print().setParallelism(1);
          String jobName = StreamingDemoUnion.class.getSimpleName();
          env.execute(jobName);
      }
    复制代码

    }

  • Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。

  • CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap

    public class StreamingDemoConnect {
      
          public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              //获取数据源
              DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
              DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
              SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
                  @Override
                  public String map(Long value) throws Exception {
                      return "str_" + value;
                  }
              });
      
              ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
      
              SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
                  @Override
                  public Object map1(Long value) throws Exception {
                      return value;
                  }
                  @Override
                  public Object map2(String value) throws Exception {
                      return value;
                  }
              });
              //打印结果
              result.print().setParallelism(1);
              String jobName = StreamingDemoConnect.class.getSimpleName();
              env.execute(jobName);
          }
      }
    复制代码
  • Split:根据规则把一个数据流切分为多个流:

  • Select:和split配合使用,选择切分后的流

    public class StreamingDemoSplit {
          public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              //获取数据源
              DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
      
              //对流进行切分,按照数据的奇偶性进行区分
              SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
                  @Override
                  public Iterable<String> select(Long value) {
                      ArrayList<String> outPut = new ArrayList<>();
                      if (value % 2 == 0) {
                          outPut.add("even");//偶数
                      } else {
                          outPut.add("odd");//奇数
                      }
                      return outPut;
                  }
              });
              
              //选择一个或者多个切分后的流
              DataStream<Long> evenStream = splitStream.select("even");
              DataStream<Long> oddStream = splitStream.select("odd");
      
              DataStream<Long> moreStream = splitStream.select("odd","even");
      
      
              //打印结果
              moreStream.print().setParallelism(1);
      
              String jobName = StreamingDemoSplit.class.getSimpleName();
              env.execute(jobName);
          }
      }
    复制代码

以上所述就是小编给大家介绍的《Flink DataStreamAPI与DataSetAPI应用案例实战-Flink牛刀小试》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Out of their Minds

Out of their Minds

Dennis Shasha、Cathy Lazere / Springer / 1998-07-02 / USD 16.00

This best-selling book is now available in an inexpensive softcover format. Imagine living during the Renaissance and being able to interview that eras greatest scientists about their inspirations, di......一起来看看 《Out of their Minds》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具