内容简介:先简单介绍下Dump Plugin的由来,在搜索Dump中心服务化的项目中,我们把Dump中心的增量数据产出分为2个阶段,Loader阶段和Join阶段,Loader阶段把数据准备成Key-Values形式,Join阶段将数据取出,计算各种业务逻辑并产出最终数据。业务逻辑的计算是相当繁琐且易出错,这类事情做一遍足以,所以设计了一个接口,按照业务自身划分成一个个小块逻辑实现接口。这些个小业务逻辑模块即构成Dump的业务Plugin。这样做的好处:1, 按业务本身划分,结构相对清晰,容易维护。
先简单介绍下Dump Plugin的由来,在搜索Dump中心服务化的项目中,我们把Dump中心的增量数据产出分为2个阶段,Loader阶段和Join阶段,Loader阶段把数据准备成Key-Values形式,Join阶段将数据取出,计算各种业务逻辑并产出最终数据。业务逻辑的计算是相当繁琐且易出错,这类事情做一遍足以,所以设计了一个接口,按照业务自身划分成一个个小块逻辑实现接口。这些个小业务逻辑模块即构成Dump的业务Plugin。
这样做的好处:
1, 按业务本身划分,结构相对清晰,容易维护。
2, 架构和业务通过接口交互,重构架构将尽可能少的影响业务代码
3, 每个业务模块的耗时能准确统计出并能做针对性的优化。
在最初的版本中,先根据依赖关系计算好plugin的执行顺序,然后顺序执行,是一个串行的过程,如下图:
此种方式,计算耗时与业务的复杂程度成正比。而目前Dump中心已经有十几个个业务逻辑Plugin,并且plugin之间有复杂的依赖关系。所以我们尝试用更高效的并发方式去运行这些plugin。这个项目用的开发语言是Java,Java的多线程有多种成熟的设计模式,结合现有框架,我们设计了两种方案并分别尝试。
方案1,以单条数据为粒度,在一条数据的运行内部实现并行化,如下图:
简单的来说,就是起一个工作线程组来运行plugin,来一条数据后,工作线程根据依赖关系获取当前可运行的plugin,当所有plugin都运行完毕后,输出数据。类似于Work Thread模式,工作线程没数据就等着,来了数据就做。主要代码流程如下:
public class Main { private Semaphore mainSemaphore, workSemaphore; private Data data; private int workThreadNum; public Data run(Data data) { this.data = data; workSemaphore.release(workThreadNum); mainSemaphore.acquire(workThreadNum); return this.data; } class WorkThread implements Runnable { private boolean loop = true; public void run() { while(loop) { workSemaphore.acquire(); //getValidPlugin: 一个synchronized的调用,获得未运行的Plguin Plugin plugin = getValidPlugin(); if(plugin != null) plugin.run(data); else mainSemaphore.release(1); } } } }
代码中使用两个Semaphore信号量来同步主线程和工作线程,每条数据都需要激活和同步,并有一个synchronized的方法来获取当前可运行的Plugin,线程同步开销比较大。实现过程中,采用重任务优先,预先计算等方法,降低并行额外引入的开销。在单个Plugin耗时长,关键路径和非关键路径上的plugin耗时相差不大的情况下,此种方案效果不错。但在目前的业务情况下,效果提升不明显,实测约提升了10%。
通过分析plugin的依赖关系,发现目前业务逻辑下,有两个耗时大的plugin均是关键路径上的,方案1的并行是针对单个宝贝的,我们想能否在批量数据或数据流中实现数据维度的并行。数据维度的并行,最简单的方案是将数据逐条扔给ThreadPoolExecutor,每个线程串行执行,但这种方案对于现有结构来说不合适,原因是plugin的代码无法保证线程安全,于是就有了方案2,如下图:
每个Plugin都起一个工作线程,数据像流水线一样从Plugin中间流过,plugin的依赖关系决定数据的流向,类似于Guarded Suspension模式,工作线程维护一个Queue来缓存,等plugin准备好,就从Queue中取数据处理。主要代码流程如下:
public interface QueuePutter { public void put(Data data); } public class Main implements QueuePutter{ private BlockingQueue <data> resultQueue = new LinkedBlockingQueue <data> (); public List <data> run(List <data> dataList) { List <data> resultList = new ArrayList <data> (); for(Data data : dataList) { firstPluginThread.put(data); } putLastData(); while(true) { Data data = resultQueue.take(); if(isLastData(data)) break; resultList.add(data); } return resultList; } public void put(Data data) { this.resultQueue.put(data); } } public class PluginThread implements Runnable,QueuePutter { private Plugin plugin = null; private PluginThread nextPluginThread = null; private boolean loop = true; private BlockingQueue <data> queue = new LinkedBlockingQueue <data> (10); public PluginThread(Plugin plugin, QueuePutter next) { this.plugin = plugin; this.nextPluginThread = next; } public void run() { while(loop) { Data data = this.queue.take(); data = this.plugin.run(data); this.nextPluginThread.put(data); } } public void put(Data data) { this.queue.put(data); } } </data> </data> </data> </data> </data> </data> </data> </data>
代码中同步操作通过BlockingQueue来实现。主线程将数据分发给第一个plugin线程,而最后一个plugin线程负责将数据写回给主线程。主线程用一条特殊的数据来标识这组数据的结尾,而后在主线程队列里一直扫描特殊数据,FIFO队列保证了处理的时序。逻辑上来说,方案2的单条数据的处理还是串行,而是多条数据之间的并行,整体性能只取决于最慢的Plugin的耗时,实测中对于批量数据来说,效果要好于方案1。
总结:实践Dump Plugin并行的两种实现方式,对单数据的列并行和对批量数据/数据流的行并行。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- sqltoy-orm-4.17.6 发版,支持 Greenplum、并行查询可设置并行数量
- PostgreSQL并行查询介绍
- nodejs“并行”处理尝试
- 并行python迭代
- Golang 多核并行
- haskell – 有效的并行策略
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
R for Data Science
Hadley Wickham、Garrett Grolemund / O'Reilly Media / 2016-12-25 / USD 39.99
http://r4ds.had.co.nz/一起来看看 《R for Data Science》 这本书的介绍吧!