liiklus:基于事件的Reactive(RSocket/gRPC)系统

栏目: 编程语言 · 发布时间: 6年前

内容简介:Liiklus [li:klus](爱沙尼亚语中的“流量”) - 基于gRPC的网关,用于基于事件的系统,如果你认为Kafka实现事件系统过于底层,可以使用该系统:目前谁在使用?开始使用:

Liiklus [li:klus](爱沙尼亚语中的“流量”) - 基于gRPC的网关,用于基于事件的系统,如果你认为Kafka实现事件系统过于底层,可以使用该系统:

  • 水平可扩展的gRPC流媒体网关
  • 支持与gRPC一样多的客户端语言(Java,Go,C ++,Python等)
  • 响应reactive第一
  • 每个分区有背压感知源
  • 至少一次/最多一次交付保证
  • 可插拔事件存储(Kafka,Pulsar,Kinesis等......)
  • 可插拔位置存储(DynamoDB,Cassandra,Redis等...)
  • WIP:冷事件存储支持(S3,Minio,SQL,键/值等...)

目前谁在使用?

  • https://vivy.com/ - 25多个微服务,Kafka面前的共享日志基础设施(事件溯源/ CQRS)抽象

开始使用:

$ docker run \
    -e kafka_bootstrapServers=some.kafka.host:9092 \
    -e storage_positions_type=MEMORY \ # only <b>for</b> testing, DO NOT use in production
    -p 6565:6565 \
    bsideup/liiklus:$LATEST_VERSION

现在可以使用 LiiklusService.proto 生成您的客户端。

客户端必须实现以下算法:

  1. 订阅作业:
    stub.subscribe(SubscribeRequest(    topic=<font>"your-topic"</font><font>,    group=</font><font>"your-consumer-group"</font><font>,    [autoOffsetReset=</font><font>"earliest|latest"</font><font>]))
    </font>
  2. 接受:对Subscribe使用相同频道的每个发出的回复:
    stub.receive(ReceiveRequest(    assignment=reply.getAssignment()))
  3. 确认
    stub.ack(AckRequest(    assignment=reply.getAssignment(),    offset=record.getOffset()))

注1:如果在处理之前确认记录是最多一次,在处理后确认记录是至少一次

注意2:建议每隔n个记录确认一次,或者每隔n秒确认一次,以减少位置存储库的负载

使用 Project Reactorreactive-grpc的 示例代码:

<b>var</b> stub = ReactorLiiklusServiceGrpc.newReactorStub(channel);
stub
    .subscribe(
        SubscribeRequest.newBuilder()
            .setTopic(<font>"user-events"</font><font>)
            .setGroup(</font><font>"analytics"</font><font>)
            .setAutoOffsetReset(AutoOffsetReset.EARLIEST)
            .build()
    )
    .flatMap(reply -> stub
        .receive(ReceiveRequest.newBuilder().setAssignment(reply.getAssignment()).build())
        .window(1000) </font><font><i>// ACK every 1000th records</i></font><font>
        .concatMap(
            batch -> batch
                .map(ReceiveReply::getRecord)
                </font><font><i>// TODO process instead of Mono.delay(), i.e. by indexing to ElasticSearch</i></font><font>
                .concatMap(record -> Mono.delay(Duration.ofMillis(100)))
                .sample(Duration.ofSeconds(5)) </font><font><i>// ACK every 5 seconds</i></font><font>
                .onBackpressureLatest()
                .delayUntil(record -> stub.ack(
                    AckRequest.newBuilder()
                        .setAssignment(reply.getAssignment())
                        .setOffset(record.getOffset())
                        .build()
                )),
            1
        )
    )
    .blockLast()
</font>

以上所述就是小编给大家介绍的《liiklus:基于事件的Reactive(RSocket/gRPC)系统》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

网站项目管理

网站项目管理

[美] 阿什利·弗里德莱因 / 李保庆、杨磊、王增东 / 电子工业出版社 / 2002-11 / 32.00元

这本书全方位地介绍了如何建立和最终交付一个具有很高商业价值的成功网站,讲解从项目管理的角度入手,撇开烦琐的技术细节,更加关注Web项目实施中诸如成本、进度、工作范围等问题,涉及了一个商业网站在实施过程中可能遇到的所有管理细节。书内附国际一流网站开发专家的深邃见解;涵盖了网络项目管理的关键原则及案例研究;通过友情链接,还为读者提供了模板、论坛、术语表、相关链接以及有关因特网知识的测验题。一起来看看 《网站项目管理》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码