实战 | Kafka + Flink + Redis 的电商大屏实时计算案

栏目: IT技术 · 发布时间: 5年前

内容简介:做积极的人,而不是积极废人

点击上方 匠心零度 ,选择“ 设为星标

做积极的人,而不是积极废人

实战 | Kafka + Flink + Redis 的电商大屏实时计算案

来源:LittleMagic

cloud.tencent.com/developer/article/1558372

目录

  • 前言

  • 数据格式与接入

  • 统计站点指标

  • 商品Top N

  • The End

前言

阿里的双11销量大屏可以说是一道特殊的风景线。实时大屏(real-time dashboard)正在被越来越多的企业采用,用来及时呈现关键的数据指标。并且在实际操作中,肯定也不会仅仅计算一两个维度。由于Flink的“真·流式计算”这一特点,它比Spark Streaming要更适合大屏应用。本文从笔者的实际工作经验抽象出简单的模型,并简要叙述计算流程(当然大部分都是源码)。

实战 | Kafka + Flink + Redis 的电商大屏实时计算案

数据格式与接入

简化的子订单消息体如下。

{
    "userId": 234567,
    "orderId": 2902306918400,
    "subOrderId": 2902306918401,
    "siteId": 10219,
    "siteName": "site_blabla",
    "cityId": 101,
    "cityName": "北京市",
    "warehouseId": 636,
    "merchandiseId": 187699,
    "price": 299,
    "quantity": 2,
    "orderStatus": 1,
    "isNewOrder": 0,
    "timestamp": 1572963672217
}

由于订单可能会包含多种商品,故会被拆分成子订单来表示,每条JSON消息表示一个子订单。现在要按照自然日来统计以下指标,并以1秒的刷新频率呈现在大屏上:

  • 每个站点(站点ID即siteId)的总订单数、子订单数、销量与GMV;

  • 当前销量排名前N的商品(商品ID即merchandiseId)与它们的销量。

由于大屏的最大诉求是实时性,等待迟到数据显然不太现实,因此我们采用处理时间作为时间特征,并以1分钟的频率做checkpointing。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);

然后订阅Kafka的订单消息作为数据源。

Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
    DataStream<String> sourceStream = env
      .addSource(new FlinkKafkaConsumer011<>(
        ORDER_EXT_TOPIC_NAME,                        // topic
        new SimpleStringSchema(),                    // deserializer
        consumerProps                                // consumer properties
      ))
      .setParallelism(PARTITION_COUNT)
      .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
      .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);

给带状态的算子设定算子ID(通过调用uid()方法)是个好习惯,能够保证Flink应用从保存点重启时能够正确恢复状态现场。为了尽量稳妥,Flink官方也建议为每个算子都显式地设定ID,参考: https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job

接下来将JSON数据转化为POJO,JSON框架采用FastJSON。

DataStream<SubOrderDetail> orderStream = sourceStream
      .map(message -> JSON.parseObject(message, SubOrderDetail.class))
      .name("map_sub_order_detail").uid("map_sub_order_detail");

JSON已经是预先处理好的标准化格式,所以POJO类SubOrderDetail的写法可以通过Lombok极大地简化。如果JSON的字段有不规范的,那么就需要手写Getter和Setter,并用@JSONField注解来指明。

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SubOrderDetail implements Serializable {
  private static final long serialVersionUID = 1L;

  private long userId;
  private long orderId;
  private long subOrderId;
  private long siteId;
  private String siteName;
  private long cityId;
  private String cityName;
  private long warehouseId;
  private long merchandiseId;
  private long price;
  private long quantity;
  private int orderStatus;
  private int isNewOrder;
  private long timestamp;
}

统计站点指标

将子订单流按站点ID分组,开1天的滚动窗口,并同时设定ContinuousProcessingTimeTrigger触发器,以1秒周期触发计算。注意处理时间的时区问题,这是老生常谈了。

WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
      .keyBy("siteId")
      .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));

接下来写个聚合函数。

DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
      .aggregate(new OrderAndGmvAggregateFunc())
      .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");
  public static final class OrderAndGmvAggregateFunc
    implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {
    private static final long serialVersionUID = 1L;

    @Override
    public OrderAccumulator createAccumulator() {
      return new OrderAccumulator();
    }

    @Override
    public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {
      if (acc.getSiteId() == 0) {
        acc.setSiteId(record.getSiteId());
        acc.setSiteName(record.getSiteName());
      }
      acc.addOrderId(record.getOrderId());
      acc.addSubOrderSum(1);
      acc.addQuantitySum(record.getQuantity());
      acc.addGmv(record.getPrice() * record.getQuantity());
      return acc;
    }

    @Override
    public OrderAccumulator getResult(OrderAccumulator acc) {
      return acc;
    }

    @Override
    public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {
      if (acc1.getSiteId() == 0) {
        acc1.setSiteId(acc2.getSiteId());
        acc1.setSiteName(acc2.getSiteName());
      }
      acc1.addOrderIds(acc2.getOrderIds());
      acc1.addSubOrderSum(acc2.getSubOrderSum());
      acc1.addQuantitySum(acc2.getQuantitySum());
      acc1.addGmv(acc2.getGmv());
      return acc1;
    }
  }

累加器类OrderAccumulator的实现很简单,看源码就大概知道它的结构了,因此不再多废话。唯一需要注意的是订单ID可能重复,所以需要用名为orderIds的HashSet来保存它。HashSet应付我们目前的数据规模还是没太大问题的,如果是海量数据,就考虑换用HyperLogLog吧。

接下来就该输出到 Redis 供呈现端查询了。这里有个问题:一秒内有数据变化的站点并不多,而ContinuousProcessingTimeTrigger每次触发都会输出窗口里全部的聚合数据,这样做了很多无用功,并且还会增大Redis的压力。所以,我们在聚合结果后再接一个ProcessFunction,代码如下。

DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream
      .keyBy(0)
      .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))
      .name("process_site_gmv_changed").uid("process_site_gmv_changed");
  public static final class OutputOrderGmvProcessFunc
    extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;

    private MapState<Long, OrderAccumulator> state;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
        "state_site_order_gmv",
        Long.class,
        OrderAccumulator.class)
      );
    }

    @Override
    public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
      long key = value.getSiteId();
      OrderAccumulator cachedValue = state.get(key);

      if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {
        JSONObject result = new JSONObject();
        result.put("site_id", value.getSiteId());
        result.put("site_name", value.getSiteName());
        result.put("quantity", value.getQuantitySum());
        result.put("orderCount", value.getOrderIds().size());
        result.put("subOrderCount", value.getSubOrderSum());
        result.put("gmv", value.getGmv());
        out.collect(new Tuple2<>(key, result.toJSONString());
        state.put(key, value);
      }
    }

    @Override
    public void close() throws Exception {
      state.clear();
      super.close();
    }
  }

说来也简单,就是用一个MapState状态缓存当前所有站点的聚合数据。由于数据源是以子订单为单位的,因此如果站点ID在MapState中没有缓存,或者缓存的子订单数与当前子订单数不一致,表示结果有更新,这样的数据才允许输出。

最后就可以安心地接上Redis Sink了,结果会被存进一个Hash结构里。

// 看官请自己构造合适的FlinkJedisPoolConfig
    FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
    siteResultStream
      .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
      .name("sink_redis_site_gmv").uid("sink_redis_site_gmv")
      .setParallelism(1);
  public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;
    private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";

    @Override
    public RedisCommandDescription getCommandDescription() {
      return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, String> data) {
      return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, String> data) {
      return data.f1;
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {
      return Optional.of(
        HASH_NAME_PREFIX +
        new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +
        "SITES"
      );
    }
  }

商品Top N

我们可以直接复用前面产生的orderStream,玩法与上面的GMV统计大同小异。这里用1秒滚动窗口就可以了。

WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
      .keyBy("merchandiseId")
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));

    DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
      .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
      .name("aggregate_merch_sales").uid("aggregate_merch_sales")
      .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));

聚合函数与窗口函数的实现更加简单了,最终返回的是商品ID与商品销量的二元组。

public static final class MerchandiseSalesAggregateFunc
    implements AggregateFunction<SubOrderDetail, Long, Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long createAccumulator() {
      return 0L;
    }

    @Override
    public Long add(SubOrderDetail value, Long acc) {
      return acc + value.getQuantity();
    }

    @Override
    public Long getResult(Long acc) {
      return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
      return acc1 + acc2;
    }
  }


  public static final class MerchandiseSalesWindowFunc
    implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(
      Tuple key,
      TimeWindow window,
      Iterable<Long> accs,
      Collector<Tuple2<Long, Long>> out) throws Exception {
      long merchId = ((Tuple1<Long>) key).f0;
      long acc = accs.iterator().next();
      out.collect(new Tuple2<>(merchId, acc));
    }
  }

既然数据最终都要落到Redis,那么我们完全没必要在Flink端做Top N的统计,直接利用Redis的有序集合(zset)就行了,商品ID作为field,销量作为分数值,简单方便。不过flink-redis-connector项目中默认没有提供ZINCRBY命令的实现(必须再吐槽一次),我们可以自己加,步骤参照之前写过的那篇加SETEX的命令的文章,不再赘述。RedisMapper的写法如下。

public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {
    private static final long serialVersionUID = 1L;
    private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";

    @Override
    public RedisCommandDescription getCommandDescription() {
      return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, Long> data) {
      return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, Long> data) {
      return String.valueOf(data.f1);
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {
      return Optional.of(
        ZSET_NAME_PREFIX +
        new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +
        "MERCHANDISE"
      );
    }
  }

后端取数时,用ZREVRANGE命令即可取出指定排名的数据了。只要数据规模不是大到难以接受,并且有现成的Redis,这个方案完全可以作为各类Top N需求的通用实现。

The End

大屏的实际呈现需要保密,截图自然是没有的。以下是提交执行时Flink Web UI给出的执行计划(实际有更多的统计任务,不止3个Sink)。通过复用源数据,可以在同一个Flink job内实现更多统计需求。

实战 | Kafka + Flink + Redis 的电商大屏实时计算案

END

如果读完觉得有收获的话,欢迎点【好看】,关注【匠心零度】,查阅更多精彩历史!!!

实战 | Kafka + Flink + Redis 的电商大屏实时计算案

让我“ 好看 ”  实战 | Kafka + Flink + Redis 的电商大屏实时计算案


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Practical Django Projects, Second Edition

Practical Django Projects, Second Edition

James Bennett / Apress / 2009 / 44.99

Build a django content management system, blog, and social networking site with James Bennett as he introduces version 1.1 of the popular Django framework. You’ll work through the development of ea......一起来看看 《Practical Django Projects, Second Edition》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具