调试 Flink 源码

栏目: 后端 · 发布时间: 7年前

内容简介:首先复制flink的github地址接着在idea点击路径

调试 Flink 源码 本文主要是讲讲flink的源码编译,案例运行,flink源码调试过程。调试flink的源码及案例,需要先clone工程,编一下源码,去掉规范检查,修改工程,最后才是调试运行。

1. clone工程

首先复制flink的github地址

git@github.com:apache/flink.git

接着在idea点击路径

File--->New--->Project from Version Control--->git

弹出窗口

调试 Flink 源码

把刚刚复制的flink的github地址粘贴到url输入栏,点击clone按钮,然后等待构建完成,工程有点大需要点时间。

工程clone完成之后,可以在idea 的右下角切换到自己所用的分支,我的分支是1.6.

调试 Flink 源码

切换完成之后,分支显示为:

调试 Flink 源码

2. 编译源码

源码编译可以直接用idea的maven插件。

调试 Flink 源码

报错如下:

调试 Flink 源码

修改一下根目录下的pom.xml文件

去掉代码风格检查,注释掉这个的主要原因是我们要改源码,不注释掉无法编译通过。

<plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-checkstyle-plugin</artifactId>
          <version>2.17</version>
          <dependencies>
            <dependency>
              <groupId>com.puppycrawl.tools</groupId>
              <artifactId>checkstyle</artifactId>
              <!-- Note: match version with docs/internals/ide_setup.md -->
              <version>8.4</version>
            </dependency>
          </dependencies>
          <executions>
            <execution>
              <id>validate</id>
              <phase>validate</phase>
              <goals>
                <goal>check</goal>
              </goals>
            </execution>
          </executions>
          <configuration>
            <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
            <includeTestSourceDirectory>true</includeTestSourceDirectory>
            <configLocation>/tools/maven/checkstyle.xml</configLocation>
            <logViolationsToConsole>true</logViolationsToConsole>
            <failOnViolation>true</failOnViolation>
          </configuration>
        </plugin>

再次编译,即可。

3. 运行kafka案例

点开工程栏,找到flink-examples模块,然后找到kafka案例,如下:

调试 Flink 源码

将kafka的example修改为可运行的案例,官方demo是通过打包提交到集群的方式运行,需要传参的,而我们直接在idea中运行,不需要穿参数。代码修改如下:

Properties props = new Properties();
    props.put("bootstrap.servers", "mt-mdh.local:9093");
    props.put("zookeeper.connect","localhost:2181");
    props.put("group.id","test");

    props.put("metadata.fetch.timeout.ms","10000");
    props.put("metadata.max.age.ms","30000");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
    env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<KafkaEvent> input = env
        .addSource(
          new FlinkKafkaConsumer010<>(
            "",
            new KafkaEventSchema(),
            props)
          .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
        .keyBy("word")
        .map(new RollingAdditionMapper());

    input.addSink(
        new FlinkKafkaProducer010<>(
            "bar",
            new KafkaEventSchema(),
            props));

    env.execute("Kafka 0.10 Example");

然后,右键,run。发现,并不能顺心如意的运行,还是报了一堆错误。。。

调试 Flink 源码

实际上,只需要改一些run的运行配置即可避免该错误。

在导航栏,run---> Edit Configurations

调试 Flink 源码

修改为

调试 Flink 源码

再运行,就ok了。

关于debug,只要run运行成功之后,直接可以debug的。。。

flink的源码调试debug及阅读经验,敬请期待后续,文章,也可以点击原文阅读加入浪尖知识星球。

推荐阅读:

Flink异步IO第一讲

flink 有状态udf 引起血案一

结合Spark讲一下Flink的runtime

一文精通kafka 消费者的三种语义

调试 Flink 源码

点赞,然后分享给小伙伴吧~


以上所述就是小编给大家介绍的《调试 Flink 源码》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Intersectional Internet

The Intersectional Internet

Safiya Umoja Noble、Brendesha M. Tynes / Peter Lang Publishing / 2016

From race, sex, class, and culture, the multidisciplinary field of Internet studies needs theoretical and methodological approaches that allow us to question the organization of social relations that ......一起来看看 《The Intersectional Internet》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

SHA 加密
SHA 加密

SHA 加密工具