@Test
public void testValueStateForQuery() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createRemoteEnvironment("192.168.99.100", 8081, SubmitTest.JAR_FILE);
env.addSource(new RandomTuple2Source())
.keyBy(0) //key by first value of tuple
.flatMap(new CountWindowAverage())
.print();
JobExecutionResult result = env.execute("testQueryableState");
LOGGER.info("submit job result:{}",result);
}
复制代码
- 这里运行一个job,它对tuple的第一个值作为key,然后flatMap操作使用的是CountWindowAverage
CountWindowAverage
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
if(currentSum == null){
currentSum = Tuple2.of(1L,input.f1);
}else{
currentSum.f0 += 1;
currentSum.f1 += input.f1;
}
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name");
sum = getRuntimeContext().getState(descriptor);
}
}
复制代码
- CountWindowAverage通过ValueStateDescriptor的setQueryable("query-name")方法,将state声明为是queryable的
QueryableStateClient
@Test
public void testQueryStateByJobId() throws InterruptedException, IOException {
//get jobId from flink ui running job page
JobID jobId = JobID.fromHexString("793edfa93f354aa0274f759cb13ce79e");
long key = 1L;
//flink-core-1.7.0-sources.jar!/org/apache/flink/configuration/QueryableStateOptions.java
QueryableStateClient client = new QueryableStateClient("192.168.99.100", 9069);
// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
LOGGER.info("get kv state return future, waiting......");
// org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException: Queryable State Server : No state for the specified key/namespace.
ValueState<Tuple2<Long, Long>> res = resultFuture.join();
LOGGER.info("query result:{}",res.value());
client.shutdownAndWait();
}
复制代码
- 这里通过QueryableStateClient连接QueryableStateClientProxy进行query state;这里的jobId可以在job提交之后,通过ui界面查询得到,然后使用JobID.fromHexString方法转为JobID对象
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
C#入门经典
[美] Karli Watson、Christian Nagel / 齐立波、黄静 / 清华大学出版社 / 2008-12 / 118.00元
这是一本成就无数C#程序员的经典名著,厚而不“重”,可帮助您轻松掌握C#的各种编程知识,为您的职业生涯打下坚实的基础,《C#入门经典》自第1版出版以来,全球销量已经达数万册,在中国也有近8万册的销量,已经成为广大初级C#程序员首选的入门教程,也是目前国内市场上最畅销的C#专业店销书,曾两次被CSDN、《程序员》等机构和读者评选为“最受读者喜爱的十大技术开发类图书”!第4版面向C#2008和.NET......一起来看看 《C#入门经典》 这本书的介绍吧!