内容简介:本文主要研究一下scalecube-cluster的GossipProtocolscalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocol.javascalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
序
本文主要研究一下scalecube-cluster的GossipProtocol
GossipProtocol
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocol.java
/** * Gossip Protocol component responsible for spreading information (gossips) over the cluster * members using infection-style dissemination algorithms. It provides reliable cross-cluster * broadcast. */ public interface GossipProtocol { /** Starts running gossip protocol. After started it begins to receive and send gossip messages */ void start(); /** Stops running gossip protocol and releases occupied resources. */ void stop(); /** * Spreads given message between cluster members. * * @return future result with gossip id once gossip fully spread. */ Mono<String> spread(Message message); /** Listens for gossips from other cluster members. */ Flux<Message> listen(); }
- GossipProtocol接口定义了start、stop、spread、listen方法
GossipProtocolImpl
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
public final class GossipProtocolImpl implements GossipProtocol { private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class); // Qualifiers public static final String GOSSIP_REQ = "sc/gossip/req"; // Injected private final Member localMember; private final Transport transport; private final GossipConfig config; // Local State private long currentPeriod = 0; private long gossipCounter = 0; private Map<String, GossipState> gossips = new HashMap<>(); private Map<String, MonoSink<String>> futures = new HashMap<>(); private List<Member> remoteMembers = new ArrayList<>(); private int remoteMembersIndex = -1; // Disposables private final Disposable.Composite actionsDisposables = Disposables.composite(); // Subject private final FluxProcessor<Message, Message> subject = DirectProcessor.<Message>create().serialize(); private final FluxSink<Message> sink = subject.sink(); // Scheduled private final Scheduler scheduler; /** * Creates new instance of gossip protocol with given memberId, transport and settings. * * @param localMember local cluster member * @param transport cluster transport * @param membershipProcessor membership event processor * @param config gossip protocol settings * @param scheduler scheduler */ public GossipProtocolImpl( Member localMember, Transport transport, Flux<MembershipEvent> membershipProcessor, GossipConfig config, Scheduler scheduler) { this.transport = Objects.requireNonNull(transport); this.config = Objects.requireNonNull(config); this.localMember = Objects.requireNonNull(localMember); this.scheduler = Objects.requireNonNull(scheduler); // Subscribe actionsDisposables.addAll( Arrays.asList( membershipProcessor // .publishOn(scheduler) .subscribe(this::onMemberEvent, this::onError), transport .listen() .publishOn(scheduler) .filter(this::isGossipReq) .subscribe(this::onGossipReq, this::onError))); } @Override public void start() { actionsDisposables.add( scheduler.schedulePeriodically( this::doSpreadGossip, config.getGossipInterval(), config.getGossipInterval(), TimeUnit.MILLISECONDS)); } @Override public void stop() { // Stop accepting gossip requests and spreading gossips actionsDisposables.dispose(); // Stop publishing events sink.complete(); } @Override public Mono<String> spread(Message message) { return Mono.fromCallable(() -> message) .subscribeOn(scheduler) .flatMap(msg -> Mono.create(sink -> futures.put(createAndPutGossip(msg), sink))); } @Override public Flux<Message> listen() { return subject.onBackpressureBuffer(); } private void onMemberEvent(MembershipEvent event) { Member member = event.member(); if (event.isRemoved()) { remoteMembers.remove(member); } if (event.isAdded()) { remoteMembers.add(member); } } private void onGossipReq(Message message) { long period = this.currentPeriod; GossipRequest gossipRequest = message.data(); for (Gossip gossip : gossipRequest.gossips()) { GossipState gossipState = gossips.get(gossip.gossipId()); if (gossipState == null) { // new gossip gossipState = new GossipState(gossip, period); gossips.put(gossip.gossipId(), gossipState); sink.next(gossip.message()); } gossipState.addToInfected(gossipRequest.from()); } } private boolean isGossipReq(Message message) { return GOSSIP_REQ.equals(message.qualifier()); } private String createAndPutGossip(Message message) { long period = this.currentPeriod; Gossip gossip = new Gossip(generateGossipId(), message); GossipState gossipState = new GossipState(gossip, period); gossips.put(gossip.gossipId(), gossipState); return gossip.gossipId(); } //...... }
- GossipProtocolImpl实现了GossipProtocol接口,它维护了名为gossips的gossipId与GossipState的map,以及remoteMembers列表
- 它的构造器订阅了membershipProcessor,触发onMemberEvent方法,该方法根据MembershipEvent来对remoteMembers进行添加或移除member;订阅了transport.listen(),过滤出GossipReq,触发onGossipReq方法,该方法合并GossipRequest的gossips到本地的gossips,对于新的gossip的message则发送到sink,并维护该gossip的gossipState,将请求的memberId添加到infected中;spread方法则将message放入到本地的gossips中
- start方法每隔gossipInterval执行doSpreadGossip方法;spread方法则通过createAndPutGossip创建Gossip并放入gossips中
doSpreadGossip
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
public final class GossipProtocolImpl implements GossipProtocol { //...... private List<Member> remoteMembers = new ArrayList<>(); private int remoteMembersIndex = -1; private void doSpreadGossip() { // Increment period long period = currentPeriod++; // Check any gossips exists if (gossips.isEmpty()) { return; // nothing to spread } try { // Spread gossips to randomly selected member(s) selectGossipMembers().forEach(member -> spreadGossipsTo(period, member)); // Sweep gossips sweepGossips(period); } catch (Exception ex) { LOGGER.warn("Exception at doSpreadGossip[{}]: {}", period, ex.getMessage(), ex); } } private void spreadGossipsTo(long period, Member member) { // Select gossips to send List<Gossip> gossips = selectGossipsToSend(period, member); if (gossips.isEmpty()) { return; // nothing to spread } // Send gossip request Address address = member.address(); gossips .stream() .map(this::buildGossipRequestMessage) .forEach( message -> transport .send(address, message) .subscribe( null, ex -> LOGGER.debug( "Failed to send GossipReq[{}]: {} to {}, cause: {}", period, message, address, ex.toString()))); } private List<Gossip> selectGossipsToSend(long period, Member member) { int periodsToSpread = ClusterMath.gossipPeriodsToSpread(config.getGossipRepeatMult(), remoteMembers.size() + 1); return gossips .values() .stream() .filter( gossipState -> gossipState.infectionPeriod() + periodsToSpread >= period) // max rounds .filter(gossipState -> !gossipState.isInfected(member.id())) // already infected .map(GossipState::gossip) .collect(Collectors.toList()); } private List<Member> selectGossipMembers() { int gossipFanout = config.getGossipFanout(); if (remoteMembers.size() < gossipFanout) { // select all return remoteMembers; } else { // select random members // Shuffle members initially and once reached top bound if (remoteMembersIndex < 0 || remoteMembersIndex + gossipFanout > remoteMembers.size()) { Collections.shuffle(remoteMembers); remoteMembersIndex = 0; } // Select members List<Member> selectedMembers = gossipFanout == 1 ? Collections.singletonList(remoteMembers.get(remoteMembersIndex)) : remoteMembers.subList(remoteMembersIndex, remoteMembersIndex + gossipFanout); // Increment index and return result remoteMembersIndex += gossipFanout; return selectedMembers; } } private Message buildGossipRequestMessage(Gossip gossip) { GossipRequest gossipRequest = new GossipRequest(gossip, localMember.id()); return Message.withData(gossipRequest) .qualifier(GOSSIP_REQ) .sender(localMember.address()) .build(); } private void sweepGossips(long period) { // Select gossips to sweep int periodsToSweep = ClusterMath.gossipPeriodsToSweep(config.getGossipRepeatMult(), remoteMembers.size() + 1); Set<GossipState> gossipsToRemove = gossips .values() .stream() .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep) .collect(Collectors.toSet()); // Check if anything selected if (gossipsToRemove.isEmpty()) { return; // nothing to sweep } // Sweep gossips LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove); for (GossipState gossipState : gossipsToRemove) { gossips.remove(gossipState.gossip().gossipId()); MonoSink<String> sink = futures.remove(gossipState.gossip().gossipId()); if (sink != null) { sink.success(gossipState.gossip().gossipId()); } } } //...... }
- doSpreadGossip方法首先递增currentPeriod,然后执行selectGossipMembers,遍历该member执行spreadGossipsTo,最后执行sweepGossips
- selectGossipMembers方法会根据gossipFanout配置随机选择gossipFanout个member,这里维护了remoteMembersIndex,具体是对remoteMembers进行subList,当remoteMembersIndex小于0或remoteMembersIndex + gossipFanout > remoteMembers.size()时会Collections.shuffle(remoteMembers)并重置remoteMembersIndex为0,之后对remoteMembersIndex加上gossipFanout
- spreadGossipsTo方法首先执行selectGossipsToSend获取要发送的gossips,然后通过buildGossipRequestMessage构造GOSSIP_REQ消息,最后通过transport.send方法发送
- sweepGossips方法则选取periodsToSweep,然后从gossips移除period > gossipState.infectionPeriod() + periodsToSweep的gossipState
小结
- GossipProtocol接口定义了start、stop、spread、listen方法;GossipProtocolImpl实现了GossipProtocol接口,它维护了名为gossips的gossipId与GossipState的map,以及remoteMembers列表
- GossipProtocolImpl的构造器订阅了membershipProcessor,触发onMemberEvent方法,该方法根据MembershipEvent来对remoteMembers进行添加或移除member;订阅了transport.listen(),过滤出GossipReq,触发onGossipReq方法,该方法合并GossipRequest的gossips到本地的gossips,对于新的gossip的message则发送到sink,并维护该gossip的gossipState,将请求的memberId添加到infected中;spread方法则将message放入到本地的gossips中
- GossipProtocolImpl的start方法每隔gossipInterval执行doSpreadGossip方法;spread方法则通过createAndPutGossip创建Gossip并放入gossips中;doSpreadGossip方法首先递增currentPeriod,然后执行selectGossipMembers,遍历该member执行spreadGossipsTo,最后执行sweepGossips
这里GossipProtocolImpl注册了onMemberEvent及onGossipReq,其中onMemberEvent用于监听MembershipEvent,并根据该event来维护remoteMembers列表;onGossipReq则是监听其他member的doSpreadGossip方法发送过来的GossipReq消息,合并该消息的gossips到本地的gossips;而doSpreadGossip方法则是每隔gossipInterval执行,根据gossipFanout配置随机选择gossipFanout个member,然后针对每个member选择要发送的gossips进行spread( onGossipReq及spread方法会更改gossips,而每隔gossipInterval触发的doSpreadGossip则从gossips选择待spread的消息进行发送
)
doc
以上所述就是小编给大家介绍的《聊聊scalecube-cluster的GossipProtocol》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
你必须知道的213个C语言问题
范立锋、李世欣 / 人民邮电出版社 / 2010-6 / 45.00元
《你必须知道的213个C语言问题》精选了213个在C语言程序设计中经常遇到的问题,目的是帮助读者解决在C语言学习和开发中遇到的实际困难,提高读者学习和开发的效率。这些问题涵盖了C语言与软件开发、C语言基础、编译预处理、字符串、函数、键盘操作、文件、目录和磁盘、数组、指针和结构、DOS服务和BIOS服务、日期和时间、重定向I/O和进程命令、C语言开发常见错误及程序调试等内容,均是作者经过充分的调研,......一起来看看 《你必须知道的213个C语言问题》 这本书的介绍吧!