聊聊flink的Queryable State

栏目: Java · 发布时间: 6年前

@Test
    public void testValueStateForQuery() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createRemoteEnvironment("192.168.99.100", 8081, SubmitTest.JAR_FILE);
        env.addSource(new RandomTuple2Source())
                .keyBy(0) //key by first value of tuple
                .flatMap(new CountWindowAverage())
                .print();
        JobExecutionResult result = env.execute("testQueryableState");
        LOGGER.info("submit job result:{}",result);
    }
复制代码
  • 这里运行一个job,它对tuple的第一个值作为key,然后flatMap操作使用的是CountWindowAverage

CountWindowAverage

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        if(currentSum == null){
            currentSum = Tuple2.of(1L,input.f1);
        }else{
            currentSum.f0 += 1;
            currentSum.f1 += input.f1;
        }

        sum.update(currentSum);

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
        descriptor.setQueryable("query-name");
        sum = getRuntimeContext().getState(descriptor);
    }
}
复制代码
  • CountWindowAverage通过ValueStateDescriptor的setQueryable("query-name")方法,将state声明为是queryable的

QueryableStateClient

@Test
    public void testQueryStateByJobId() throws InterruptedException, IOException {
        //get jobId from flink ui running job page
        JobID jobId = JobID.fromHexString("793edfa93f354aa0274f759cb13ce79e");
        long key = 1L;
        //flink-core-1.7.0-sources.jar!/org/apache/flink/configuration/QueryableStateOptions.java
        QueryableStateClient client = new QueryableStateClient("192.168.99.100", 9069);

        // the state descriptor of the state to be fetched.
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

        CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
                client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);

        LOGGER.info("get kv state return future, waiting......");
        // org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException: Queryable State Server : No state for the specified key/namespace.
        ValueState<Tuple2<Long, Long>> res = resultFuture.join();
        LOGGER.info("query result:{}",res.value());
        client.shutdownAndWait();
    }
复制代码
  • 这里通过QueryableStateClient连接QueryableStateClientProxy进行query state;这里的jobId可以在job提交之后,通过ui界面查询得到,然后使用JobID.fromHexString方法转为JobID对象

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

常用算法程序集

常用算法程序集

清华大学出版社 / 2013-4 / 69.00元

《常用算法程序集(C\C++描述第5版清华大学计算机系列教材)》编著者徐士良、马尔妮。 《常用算法程序集(C\C++描述第5版清华大学计算机系列教材)》是针对工程中常用的行之有效的算法而编写的,主要内容包括多项式的计算、复数运算、随机数的产生、矩阵运算、矩阵特征值与特征向量的计算、线性代数方程组的求解、非线性方程与方程组的求解、插值与逼近、数值积分、常微分方程组的求解、数据处理、极值......一起来看看 《常用算法程序集》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具