内容简介:版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。
版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。
1 DataStreamAPI
1.1 DataStream Data Sources
-
source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。
-
flink提供了大量的已经实现好的source方法,可以自定义source 通过实现sourceFunction接口来自定义无并行度的source。
1 使用并行度为1的source public class MyNoParalleSource implements SourceFunction<Long>{ private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 启动一个source * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext<Long> ctx) throws Exception { while(isRunning){ ctx.collect(count); count++; //每秒产生一条数据 Thread.sleep(1000); } } * 取消一个cancel的时候会调用的方法 @Override public void cancel() { isRunning = false; } } 2 Main方法执行 public class StreamingDemoWithMyNoPralalleSource { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源 DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1); //注意:针对此source,并行度只能设置为1 DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("接收到数据:" + value); return value; } }); //每2秒钟处理一次数据 DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0); //打印结果 sum.print().setParallelism(1); String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName(); env.execute(jobName); } } 复制代码
-
可以通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction来自定义有并行度的source。继承RichParallelSourceFunction的那些SourceFunction意味着它们都是并行执行的并且可能有一些资源需要open/close
public class MyParalleSource implements ParallelSourceFunction<Long> { private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 启动一个source * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext<Long> ctx) throws Exception { while(isRunning){ ctx.collect(count); count++; //每秒产生一条数据 Thread.sleep(1000); } } /** * 取消一个cancel的时候会调用的方法 * */ @Override public void cancel() { isRunning = false; } } public class StreamingDemoWithMyPralalleSource { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源 DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2); DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("接收到数据:" + value); return value; } }); //每2秒钟处理一次数据 DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0); //打印结果 sum.print().setParallelism(1); String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName(); env.execute(jobName); } } public class MyRichParalleSource extends RichParallelSourceFunction<Long> { private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 启动一个source * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了 * @param ctx * @throws Exception */ @Override public void run(SourceContext<Long> ctx) throws Exception { while(isRunning){ ctx.collect(count); count++; //每秒产生一条数据 Thread.sleep(1000); } } /** * 取消一个cancel的时候会调用的方法 * */ @Override public void cancel() { isRunning = false; } /** * 这个方法只会在最开始的时候被调用一次 * 实现获取链接的代码 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { System.out.println("open............."); super.open(parameters); } /** * 实现关闭链接的代码 * @throws Exception */ @Override public void close() throws Exception { super.close(); } } public class StreamingDemoWithMyRichPralalleSource { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源 DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2); DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("接收到数据:" + value); return value; } }); //每2秒钟处理一次数据 DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0); //打印结果 sum.print().setParallelism(1); String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName(); env.execute(jobName); } } 复制代码
-
基于文件 readTextFile(path) 读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。
-
基于socket socketTextStream从socker中读取数据,元素可以通过一个分隔符切开。
public class SocketDemoFullCount { public static void main(String[] args) throws Exception{ //获取需要的端口号 int port; try { ParameterTool parameterTool = ParameterTool.fromArgs(args); port = parameterTool.getInt("port"); }catch (Exception e){ System.err.println("No port set. use default port 9010--java"); port = 9010; } //获取flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String hostname = "SparkMaster"; String delimiter = "\n"; //连接socket获取输入的数据 DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter); DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() { @Override public Tuple2<Integer,Integer> map(String value) throws Exception { return new Tuple2<>(1,Integer.parseInt(value)); } }); intData.keyBy(0) .timeWindow(Time.seconds(5)) .process(new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() { @Override public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out) throws Exception { System.out.println("执行process"); long count = 0; for(Tuple2<Integer,Integer> element: elements){ count++; } out.collect("window:"+context.window()+",count:"+count); } }).print(); //这一行代码一定要实现,否则程序不执行 env.execute("Socket window count"); } } 复制代码
-
基于集合 fromCollection(Collection) 通过 java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。
public class StreamingFromCollection { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ArrayList<Integer> data = new ArrayList<>(); data.add(10); data.add(15); data.add(20); //指定数据源 DataStreamSource<Integer> collectionData = env.fromCollection(data); //通map对数据进行处理 DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return value + 1; } }); //直接打印 num.print().setParallelism(1); env.execute("StreamingFromCollection"); } 复制代码
}
-
自定义输入 addSource 可以实现读取第三方数据源的数据 系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】
1.2 DataStream Transformations
-
map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
-
flatmap:输入一个元素,可以返回零个,一个或者多个元素
-
keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区
dataStream.keyBy("someKey") // 指定对象中的 "someKey"字段作为分组key dataStream.keyBy(0) //指定Tuple中的第一个元素作为分组key 注意:以下类型是无法作为key的 1:一个实体类对象,没有重写hashCode方法,并且依赖object的hashCode方法 2:一个任意形式的数组类型 3:基本数据类型,int,long 复制代码
-
filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下。
public class StreamingDemoFilter { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源 DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1 DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("原始接收到数据:" + value); return value; } }); //执行filter过滤,满足条件的数据会被留下 DataStream<Long> filterData = num.filter(new FilterFunction<Long>() { //把所有的奇数过滤掉 @Override public boolean filter(Long value) throws Exception { return value % 2 == 0; } }); DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("过滤之后的数据:" + value); return value; } }); //每2秒钟处理一次数据 DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0); //打印结果 sum.print().setParallelism(1); String jobName = StreamingDemoFilter.class.getSimpleName(); env.execute(jobName); } } 复制代码
-
reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值
-
aggregations:sum(),min(),max()等
-
window:在后面单独详解
-
Union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。
public class StreamingDemoUnion { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源 DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1 DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1); //把text1和text2组装到一起 DataStream<Long> text = text1.union(text2); DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("原始接收到数据:" + value); return value; } }); //每2秒钟处理一次数据 DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0); //打印结果 sum.print().setParallelism(1); String jobName = StreamingDemoUnion.class.getSimpleName(); env.execute(jobName); } 复制代码
}
-
Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。
-
CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap
public class StreamingDemoConnect { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源 DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1 DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1); SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() { @Override public String map(Long value) throws Exception { return "str_" + value; } }); ConnectedStreams<Long, String> connectStream = text1.connect(text2_str); SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() { @Override public Object map1(Long value) throws Exception { return value; } @Override public Object map2(String value) throws Exception { return value; } }); //打印结果 result.print().setParallelism(1); String jobName = StreamingDemoConnect.class.getSimpleName(); env.execute(jobName); } } 复制代码
-
Split:根据规则把一个数据流切分为多个流:
-
Select:和split配合使用,选择切分后的流
public class StreamingDemoSplit { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源 DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1 //对流进行切分,按照数据的奇偶性进行区分 SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() { @Override public Iterable<String> select(Long value) { ArrayList<String> outPut = new ArrayList<>(); if (value % 2 == 0) { outPut.add("even");//偶数 } else { outPut.add("odd");//奇数 } return outPut; } }); //选择一个或者多个切分后的流 DataStream<Long> evenStream = splitStream.select("even"); DataStream<Long> oddStream = splitStream.select("odd"); DataStream<Long> moreStream = splitStream.select("odd","even"); //打印结果 moreStream.print().setParallelism(1); String jobName = StreamingDemoSplit.class.getSimpleName(); env.execute(jobName); } } 复制代码
以上所述就是小编给大家介绍的《Flink DataStreamAPI与DataSetAPI应用案例实战-Flink牛刀小试》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Hard Thing About Hard Things
Ben Horowitz / HarperBusiness / 2014-3-4 / USD 29.99
Ben Horowitz, cofounder of Andreessen Horowitz and one of Silicon Valley's most respected and experienced entrepreneurs, offers essential advice on building and running a startup—practical wisdom for ......一起来看看 《The Hard Thing About Hard Things》 这本书的介绍吧!