@Test public void testValueStateForQuery() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment .createRemoteEnvironment("192.168.99.100", 8081, SubmitTest.JAR_FILE); env.addSource(new RandomTuple2Source()) .keyBy(0) //key by first value of tuple .flatMap(new CountWindowAverage()) .print(); JobExecutionResult result = env.execute("testQueryableState"); LOGGER.info("submit job result:{}",result); } 复制代码
- 这里运行一个job,它对tuple的第一个值作为key,然后flatMap操作使用的是CountWindowAverage
CountWindowAverage
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum @Override public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { Tuple2<Long, Long> currentSum = sum.value(); if(currentSum == null){ currentSum = Tuple2.of(1L,input.f1); }else{ currentSum.f0 += 1; currentSum.f1 += input.f1; } sum.update(currentSum); if (currentSum.f0 >= 2) { out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); } } @Override public void open(Configuration config) { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", // the state name TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information descriptor.setQueryable("query-name"); sum = getRuntimeContext().getState(descriptor); } } 复制代码
- CountWindowAverage通过ValueStateDescriptor的setQueryable("query-name")方法,将state声明为是queryable的
QueryableStateClient
@Test public void testQueryStateByJobId() throws InterruptedException, IOException { //get jobId from flink ui running job page JobID jobId = JobID.fromHexString("793edfa93f354aa0274f759cb13ce79e"); long key = 1L; //flink-core-1.7.0-sources.jar!/org/apache/flink/configuration/QueryableStateOptions.java QueryableStateClient client = new QueryableStateClient("192.168.99.100", 9069); // the state descriptor of the state to be fetched. ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture = client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor); LOGGER.info("get kv state return future, waiting......"); // org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException: Queryable State Server : No state for the specified key/namespace. ValueState<Tuple2<Long, Long>> res = resultFuture.join(); LOGGER.info("query result:{}",res.value()); client.shutdownAndWait(); } 复制代码
- 这里通过QueryableStateClient连接QueryableStateClientProxy进行query state;这里的jobId可以在job提交之后,通过ui界面查询得到,然后使用JobID.fromHexString方法转为JobID对象
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
人类思维如何与互联网共同进化
[美] 约翰·布罗克曼 / 付晓光 / 浙江人民出版社 / 2017-3 / 79.90元
➢人类是否因互联网的诞生进入了公平竞争的场域? “黑天鹅事件”频频发生,我们的预测能力是否正在退化? 智人的第四阶段有哪些特征? 全球脑会使人类成为“超级英雄”吗? 虚拟现实技术会不会灭绝人类的真实体验? 还有更多不可预知答案的问题,你将在本书中找到属于自己的答案! ➢ 我们的心智正和互联网发生着永无止境的共振,人类思维会因此产生怎样的进化效应?本书编者约翰•布......一起来看看 《人类思维如何与互联网共同进化》 这本书的介绍吧!
Markdown 在线编辑器
Markdown 在线编辑器
HEX CMYK 转换工具
HEX CMYK 互转工具