Flink DataStreamAPI与DataSetAPI应用案例实战-Flink牛刀小试

栏目: 服务器 · 发布时间: 6年前

内容简介:版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

1 DataStreamAPI

1.1 DataStream Data Sources

  • source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。

  • flink提供了大量的已经实现好的source方法,可以自定义source 通过实现sourceFunction接口来自定义无并行度的source。

    1 使用并行度为1的source
      public class MyNoParalleSource implements SourceFunction<Long>{
      
          private long count = 1L;
      
          private boolean isRunning = true;
      
          /**
           * 主要的方法
           * 启动一个source
           * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
           *
           * @param ctx
           * @throws Exception
           */
          @Override
          public void run(SourceContext<Long> ctx) throws Exception {
              while(isRunning){
                  ctx.collect(count);
                  count++;
                  //每秒产生一条数据
                  Thread.sleep(1000);
              }
          }
          * 取消一个cancel的时候会调用的方法
          @Override
          public void cancel() {
              isRunning = false;
          }
      }
      
      2 Main方法执行
      public class StreamingDemoWithMyNoPralalleSource {
          public static void main(String[] args) throws Exception {
          //获取Flink的运行环境
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //获取数据源
          DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1); //注意:针对此source,并行度只能设置为1
          DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
              @Override
              public Long map(Long value) throws Exception {
                  System.out.println("接收到数据:" + value);
                  return value;
              }
          });
    
      //每2秒钟处理一次数据
      DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
      //打印结果
      sum.print().setParallelism(1);
      String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
      env.execute(jobName);
       }
      }   
    复制代码
  • 可以通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction来自定义有并行度的source。继承RichParallelSourceFunction的那些SourceFunction意味着它们都是并行执行的并且可能有一些资源需要open/close

    public class MyParalleSource implements ParallelSourceFunction<Long> {
      private long count = 1L;
      private boolean isRunning = true;
    
      /**
       * 主要的方法
       * 启动一个source
       * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
       *
       * @param ctx
       * @throws Exception
       */
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
          while(isRunning){
              ctx.collect(count);
              count++;
              //每秒产生一条数据
              Thread.sleep(1000);
          }
      }
      /**
       * 取消一个cancel的时候会调用的方法
       *
       */
      @Override
      public void cancel() {
          isRunning = false;
       }
    }
    
    public class StreamingDemoWithMyPralalleSource {
        public static void main(String[] args) throws Exception {
          //获取Flink的运行环境
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
          //获取数据源
          DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);
    
          DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
              @Override
              public Long map(Long value) throws Exception {
                  System.out.println("接收到数据:" + value);
                  return value;
              }
          });
    
          //每2秒钟处理一次数据
          DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
    
          //打印结果
          sum.print().setParallelism(1);
    
          String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName();
          env.execute(jobName);
          }
       }
    
    
       public class MyRichParalleSource extends RichParallelSourceFunction<Long> {
          private long count = 1L;
          private boolean isRunning = true;
          /**
           * 主要的方法
           * 启动一个source
           * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
           * @param ctx
           * @throws Exception
           */
          @Override
          public void run(SourceContext<Long> ctx) throws Exception {
              while(isRunning){
                  ctx.collect(count);
                  count++;
                  //每秒产生一条数据
                  Thread.sleep(1000);
              }
          }
          /**
           * 取消一个cancel的时候会调用的方法
           *
           */
          @Override
          public void cancel() {
              isRunning = false;
          }
          /**
           * 这个方法只会在最开始的时候被调用一次
           * 实现获取链接的代码
           * @param parameters
           * @throws Exception
           */
          @Override
          public void open(Configuration parameters) throws Exception {
              System.out.println("open.............");
              super.open(parameters);
          }
      
          /**
           * 实现关闭链接的代码
           * @throws Exception
           */
          @Override
          public void close() throws Exception {
              super.close();
          }
      }
      
       public class StreamingDemoWithMyRichPralalleSource {
       
           public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              //获取数据源
              DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);
      
              DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
                  @Override
                  public Long map(Long value) throws Exception {
                      System.out.println("接收到数据:" + value);
                      return value;
                  }
              });
      
              //每2秒钟处理一次数据
              DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
      
              //打印结果
              sum.print().setParallelism(1);
      
              String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
              env.execute(jobName);
              }
          }   
    复制代码
  • 基于文件 readTextFile(path) 读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。

  • 基于socket socketTextStream从socker中读取数据,元素可以通过一个分隔符切开。

    public class SocketDemoFullCount {
          public static void main(String[] args) throws Exception{
              //获取需要的端口号
              int port;
              try {
                  ParameterTool parameterTool = ParameterTool.fromArgs(args);
                  port = parameterTool.getInt("port");
              }catch (Exception e){
                  System.err.println("No port set. use default port 9010--java");
                  port = 9010;
              }
              //获取flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              String hostname = "SparkMaster";
              String delimiter = "\n";
              //连接socket获取输入的数据
              DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
      
              DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
                  @Override
                  public Tuple2<Integer,Integer> map(String value) throws Exception {
                      return new Tuple2<>(1,Integer.parseInt(value));
                  }
              });
      
              intData.keyBy(0)
                      .timeWindow(Time.seconds(5))
                      .process(new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() {
                          @Override
                          public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out) throws Exception {
                              System.out.println("执行process");
                              long count = 0;
                               for(Tuple2<Integer,Integer> element: elements){
                                  count++;
                              }
                              out.collect("window:"+context.window()+",count:"+count);
                          }
                      }).print();
              //这一行代码一定要实现,否则程序不执行
              env.execute("Socket window count");
          }
      }
    复制代码
  • 基于集合 fromCollection(Collection) 通过 java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。

    public class StreamingFromCollection {
          public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              ArrayList<Integer> data = new ArrayList<>();
              data.add(10);
              data.add(15);
              data.add(20);
      
              //指定数据源
              DataStreamSource<Integer> collectionData = env.fromCollection(data);
              //通map对数据进行处理
              DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
                  @Override
                  public Integer map(Integer value) throws Exception {
                      return value + 1;
                  }
              });
              //直接打印
              num.print().setParallelism(1);
              env.execute("StreamingFromCollection");
          }
    复制代码

    }

  • 自定义输入 addSource 可以实现读取第三方数据源的数据 系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】

1.2 DataStream Transformations

  • map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作

  • flatmap:输入一个元素,可以返回零个,一个或者多个元素

  • keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区

    dataStream.keyBy("someKey") // 指定对象中的 "someKey"字段作为分组key
      dataStream.keyBy(0) //指定Tuple中的第一个元素作为分组key
      
      注意:以下类型是无法作为key的
      1:一个实体类对象,没有重写hashCode方法,并且依赖object的hashCode方法
      2:一个任意形式的数组类型
      3:基本数据类型,int,long
    复制代码
  • filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下。

    public class StreamingDemoFilter {
          public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              //获取数据源
              DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
              DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
                  @Override
                  public Long map(Long value) throws Exception {
                      System.out.println("原始接收到数据:" + value);
                      return value;
                  }
              });
      
              //执行filter过滤,满足条件的数据会被留下
              DataStream<Long> filterData = num.filter(new FilterFunction<Long>() {
                  //把所有的奇数过滤掉
                  @Override
                  public boolean filter(Long value) throws Exception {
                      return value % 2 == 0;
                  }
              });
      
              DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() {
                  @Override
                  public Long map(Long value) throws Exception {
                      System.out.println("过滤之后的数据:" + value);
                      return value;
                  }
              });
      
      
              //每2秒钟处理一次数据
              DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0);
      
              //打印结果
              sum.print().setParallelism(1);
      
              String jobName = StreamingDemoFilter.class.getSimpleName();
              env.execute(jobName);
          }
      }
    复制代码
  • reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值

  • aggregations:sum(),min(),max()等

  • window:在后面单独详解

  • Union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。

    public class StreamingDemoUnion {
      public static void main(String[] args) throws Exception {
          //获取Flink的运行环境
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
          //获取数据源
          DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
    
          DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
    
          //把text1和text2组装到一起
          DataStream<Long> text = text1.union(text2);
    
          DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
              @Override
              public Long map(Long value) throws Exception {
                  System.out.println("原始接收到数据:" + value);
                  return value;
              }
          });
          //每2秒钟处理一次数据
          DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
          //打印结果
          sum.print().setParallelism(1);
          String jobName = StreamingDemoUnion.class.getSimpleName();
          env.execute(jobName);
      }
    复制代码

    }

  • Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。

  • CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap

    public class StreamingDemoConnect {
      
          public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              //获取数据源
              DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
              DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
              SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
                  @Override
                  public String map(Long value) throws Exception {
                      return "str_" + value;
                  }
              });
      
              ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
      
              SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
                  @Override
                  public Object map1(Long value) throws Exception {
                      return value;
                  }
                  @Override
                  public Object map2(String value) throws Exception {
                      return value;
                  }
              });
              //打印结果
              result.print().setParallelism(1);
              String jobName = StreamingDemoConnect.class.getSimpleName();
              env.execute(jobName);
          }
      }
    复制代码
  • Split:根据规则把一个数据流切分为多个流:

  • Select:和split配合使用,选择切分后的流

    public class StreamingDemoSplit {
          public static void main(String[] args) throws Exception {
              //获取Flink的运行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              //获取数据源
              DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
      
              //对流进行切分,按照数据的奇偶性进行区分
              SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
                  @Override
                  public Iterable<String> select(Long value) {
                      ArrayList<String> outPut = new ArrayList<>();
                      if (value % 2 == 0) {
                          outPut.add("even");//偶数
                      } else {
                          outPut.add("odd");//奇数
                      }
                      return outPut;
                  }
              });
              
              //选择一个或者多个切分后的流
              DataStream<Long> evenStream = splitStream.select("even");
              DataStream<Long> oddStream = splitStream.select("odd");
      
              DataStream<Long> moreStream = splitStream.select("odd","even");
      
      
              //打印结果
              moreStream.print().setParallelism(1);
      
              String jobName = StreamingDemoSplit.class.getSimpleName();
              env.execute(jobName);
          }
      }
    复制代码

以上所述就是小编给大家介绍的《Flink DataStreamAPI与DataSetAPI应用案例实战-Flink牛刀小试》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Hard Thing About Hard Things

The Hard Thing About Hard Things

Ben Horowitz / HarperBusiness / 2014-3-4 / USD 29.99

Ben Horowitz, cofounder of Andreessen Horowitz and one of Silicon Valley's most respected and experienced entrepreneurs, offers essential advice on building and running a startup—practical wisdom for ......一起来看看 《The Hard Thing About Hard Things》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器