内容简介:要想应用能处理一天的大流量 api 的 traces, 自然直接的想法当然是找一个 MapReduce 的 Cluster 将一天的数据 map 成多份来进行处理, 最后 reduce 到一起来算 pXX.有意思的是, 你是可以本地直接
要想应用能处理一天的大流量 api 的 traces, 自然直接的想法当然是找一个 MapReduce 的 Cluster 将一天的数据 map 成多份来进行处理, 最后 reduce 到一起来算 pXX.
1: lazy val env = ExecutionEnvironment.getExecutionEnvironment // <= 1 2: 3: lazy val rawSources = (ZipkinClient.datasources foldMap interp) unsafeRunSync () 4: 5: lazy val datasources = 6: env.fromCollection(rawSources.toList) // <= 2 7: 8: lazy val program: DataSet[Vector[String]] = datasources 9: .faltMap(splitByHour _) // <= 3 10: .rebalance() // <= 4 11: .map(source => 12: (ZipkinClient.fetchTraces(source) foldMap interp) unsafeRunSync ()) // <= 5 13: .groupBy(_._1.endpoint.id) // <= 6 14: .reduce((s1, s2) => s1 |+| s2) // <= 7 15: .map((traces: (DataSource, Vector[Trace])) => 16: (ZipkinClient.aggregate(traces) foldMap interp) unsafeRunSync ()) // <= 8 17: .map((metrics: (Vector[LatencyMetrics], DataSource)) => 18: (CollectorService 19: .writeMetrics(metrics._1, Vector(metrics._2)) foldMap interp) unsafeRunSync ()) 20:
- , DataSet是 flink batch job 的数据结果, DataSet 中的数据意味着可以并行跑在 cluster 中
- , 类似大数据的 helloworld 计算字符个数例子, 你得先把句子拆成字符
- , 类似于把同样的字符都归到一组, 方便下来计算
有意思的是, 你是可以本地直接 sbt run
的, 此时 flink 的 cluster 会跑在 jvm 中.
但是 跑cluster 在单个 jvm 中会耗费大量资源, 所以最佳实践其实是根据运行环境, 选择不同的 ExecutionEnvironment
lazy val env = envOrNone("EXECUTION_IN_LOCAL") .map(_ => ExecutionEnvironment.createCollectionsEnvironment) .getOrElse(ExecutionEnvironment.getExecutionEnvironment)
若是在本地运行, 比如跑功能测试或集成测试, 使用 CollectionsEnvironment
会省不少资源
以上所述就是小编给大家介绍的《使用 Flink 解救多线程 Scala 应用》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- java中线程安全,线程死锁,线程通信快速入门
- ObjC 多线程简析(一)-多线程简述和线程锁的基本应用
- Java多线程之线程中止
- Android 的线程和线程池
- iOS 多线程之线程安全
- java多线程 线程安全问题
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。