内容简介:ZooKeeper 是一个开源的分布式协调服务,ZooKeeper框架最初是在“Yahoo!"上构建的,用于以简单而稳健的方式访问他们的应用程序。 后来,Apache ZooKeeper成为Hadoop,HBase和其他分布式框架使用的有组织服务的标准。 例如,Apache HBase使用ZooKeeper跟踪分布式数据的状态。ZooKeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。ZooKeeper 通常用于:命名服
ZooKeeper 是一个开源的分布式协调服务,ZooKeeper框架最初是在“Yahoo!"上构建的,用于以简单而稳健的方式访问他们的应用程序。 后来,Apache ZooKeeper成为Hadoop,HBase和其他分布式框架使用的有组织服务的标准。 例如,Apache HBase使用ZooKeeper跟踪分布式数据的状态。ZooKeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。
ZooKeeper 通常用于:命名服务、配置管理、集群管理、分布式协调/通知、分布式锁和分布式队列等等。
各个爬虫的节点通过注册到 ZooKeeper 从而实现爬虫集群的管理。 NetDiscovery 正是借助了 ZooKeeper 的特性来监控爬虫集群。
NetDiscovery 是一款基于 Vert.x、RxJava 2 等框架实现的通用爬虫框架。它包含了丰富的 特性 。
爬虫集群的监控
NetDiscovery 包含了 Spider 和 SpiderEngine。 Spider 用于实现爬虫的业务逻辑,Spider 可以添加到 SpiderEngine,由 SpiderEngine 来管理各个 Spider 的生命周期。
但是 SpiderEngine 部署到每一个节点之后,SpiderEngine 如何进行监控和管理呢?
可以将 SpiderEngine 在运行时,先注册到 ZooKeeper。(需要事先在 ZooKeeper 集群创建 /netdiscovery 节点)
/** * 启动SpiderEngine中所有的spider,让每个爬虫并行运行起来。 * */ public void run() { if (Preconditions.isNotBlank(spiders)) { registerZK(); ...... } } /** * 将当前 SpiderEngine 注册到 zookeeper 指定的目录 /netdiscovery 下 */ private void registerZK() { if (Preconditions.isNotBlank(zkStr) && useZk) { log.info("zkStr: {}", zkStr); RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.newClient(zkStr, retryPolicy); client.start(); try { String ipAddr = InetAddress.getLocalHost().getHostAddress() + "-" + defaultHttpdPort + "-" + System.currentTimeMillis(); String nowSpiderEngineZNode = "/netdiscovery/" + ipAddr; client.create().withMode(CreateMode.EPHEMERAL).forPath(nowSpiderEngineZNode,nowSpiderEngineZNode.getBytes()); } catch (UnknownHostException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } } 复制代码
另外,需要使用 NetDiscovery Monitor 的 CuratorManager 类。 它借助 Zookeeper 的 Watcher 机制,监听已经注册到 /netdiscovery 这个父 zNode 下的各个子 zNode ,也就是各个 SpiderEngine。
Watcher机制是指 ZooKeeper 客户端向 ZooKeeper 服务器注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatchManager 中。ZooKeeper 服务器触发 Watcher 事件后,会向客户端发送通知,客户端线程从 WatchManager 中回调 Watcher 执行相应的功能。
/** * 当前所监控的父的 zNode 下若是子 zNode 发生了变化:新增,删除,修改 * <p> * 下述方法都会触发执行 * * @param event */ @Override public void process(WatchedEvent event) { List<String> newZodeInfos = null; try { newZodeInfos = client.getChildren().usingWatcher(this).forPath("/netdiscovery"); //根据初始化容器的长度与最新的容器的长度进行比对,就可以推导出当前 SpiderEngine 集群的状态:新增,宕机/下线,变更... //哪个容器中元素多,就循环遍历哪个容器。 if (Preconditions.isNotBlank(newZodeInfos)) { if (newZodeInfos.size()>allZnodes.size()){ //明确显示新增了哪个 SpiderEngine 节点 for (String nowZNode:newZodeInfos) { if (!allZnodes.contains(nowZNode)){ log.info("新增 SpiderEngine 节点{}", nowZNode); } } }else if (newZodeInfos.size()<allZnodes.size()){ // 宕机/下线 // 明确显示哪个 SpiderEngine 节点宕机/下线了 for (String initZNode : allZnodes) { if (!newZodeInfos.contains(initZNode)) { log.info("SpiderEngine 节点【{}】下线了!", initZNode); // 如果有下线的处理,则处理(例如发邮件、短信等) if (serverOfflineProcess!=null) { serverOfflineProcess.process(); } } } }else { // SpiderEngine 集群正常运行; // 宕机/下线了,当时马上重启了,总的爬虫未发生变化 } } } catch (Exception e) { e.printStackTrace(); } allZnodes = newZodeInfos; } 复制代码
所以需要单独运行一个进程,例如:
public class TestCuratorManager { public static void main(String[] args) { CuratorManager curatorManager = new CuratorManager(); curatorManager.start(); } } 复制代码
下图反映了 ZooKeeper 如何监控 SpiderEngine 集群。
总结
爬虫框架 github 地址: github.com/fengzhizi71…
本文介绍了如何使用 ZooKeeper 来监控爬虫的集群。未来,NetDiscovery 还会增加更为通用的功能。
Java与Android技术栈:每周更新推送原创技术文章,欢迎扫描下方的公众号二维码并关注,期待与您的共同成长和进步。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 用 Kubernetes 部署 Crawlab 爬虫管理节点集群
- 使用Docker Swarm搭建分布式爬虫集群
- 使用 puppeteer 集群爬取特定类型内容的爬虫实践
- 爬虫需谨慎,那些你不知道的爬虫与反爬虫套路!
- 反爬虫之字体反爬虫
- 反爬虫之字体反爬虫
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。