聊聊apache gossip的ActiveGossiper

栏目: 后端 · 发布时间: 6年前

内容简介:本文主要研究一下apache gossip的ActiveGossiperincubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.javaincubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.jav

本文主要研究一下apache gossip的ActiveGossiper

AbstractActiveGossiper

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java

/**
 * The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner
 */
public abstract class AbstractActiveGossiper {

  protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);

  protected final GossipManager gossipManager;
  protected final GossipCore gossipCore;
  private final Histogram sharedDataHistogram;
  private final Histogram sendPerNodeDataHistogram;
  private final Histogram sendMembershipHistogram;
  private final Random random;
  private final GossipSettings gossipSettings;

  public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
    this.gossipManager = gossipManager;
    this.gossipCore = gossipCore;
    sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
    sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
    sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time"));
    random = new Random();
    gossipSettings = gossipManager.getSettings();
  }

  public void init() {

  }

  public void shutdown() {

  }

  public final void sendShutdownMessage(LocalMember me, LocalMember target){
    if (target == null){
      return;
    }
    ShutdownMessage m = new ShutdownMessage();
    m.setNodeId(me.getId());
    m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
    gossipCore.sendOneWay(m, target.getUri());
  }

  //......

  /**
   * Performs the sending of the membership list, after we have incremented our own heartbeat.
   */
  protected void sendMembershipList(LocalMember me, LocalMember member) {
    if (member == null){
      return;
    }
    long startTime = System.currentTimeMillis();
    me.setHeartbeat(System.nanoTime());
    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
    message.setUuid(UUID.randomUUID().toString());
    message.getMembers().add(convert(me));
    for (LocalMember other : gossipManager.getMembers().keySet()) {
      message.getMembers().add(convert(other));
    }
    Response r = gossipCore.send(message, member.getUri());
    if (r instanceof ActiveGossipOk){
      //maybe count metrics here
    } else {
      LOGGER.debug("Message " + message + " generated response " + r);
    }
    sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
  }

  protected final Member convert(LocalMember member){
    Member gm = new Member();
    gm.setCluster(member.getClusterName());
    gm.setHeartbeat(member.getHeartbeat());
    gm.setUri(member.getUri().toASCIIString());
    gm.setId(member.getId());
    gm.setProperties(member.getProperties());
    return gm;
  }

  /**
   *
   * @param memberList
   *          An immutable list
   * @return The chosen LocalGossipMember to gossip with.
   */
  protected LocalMember selectPartner(List<LocalMember> memberList) {
    LocalMember member = null;
    if (memberList.size() > 0) {
      int randomNeighborIndex = random.nextInt(memberList.size());
      member = memberList.get(randomNeighborIndex);
    }
    return member;
  }
}
  • AbstractActiveGossiper的构造器需要传入gossipManager及gossipCore;它定义了sendShutdownMessage、sendMembershipList、selectPartner等方法
  • selectPartner方法在memberList不为空的情况下随机生成randomNeighborIndex选择出一个LocalMember
  • sendMembershipList方法首先设置me的heartbeat,然后创建UdpActiveGossipMessage,该message的members首先是当前的localMember,然后再添加gossipManager.getMembers(),最后通过gossipCore.send发送给选中的member

ActiveGossipMessageHandler

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java

public class ActiveGossipMessageHandler implements MessageHandler {
  
  /**
   * @param gossipCore context.
   * @param gossipManager context.
   * @param base message reference.
   * @return boolean indicating success.
   */
  @Override
  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
    List<Member> remoteGossipMembers = new ArrayList<>();
    RemoteMember senderMember = null;
    UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
    for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
      URI u;
      try {
        u = new URI(activeGossipMessage.getMembers().get(i).getUri());
      } catch (URISyntaxException e) {
        GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
        continue;
      }
      RemoteMember member = new RemoteMember(
              activeGossipMessage.getMembers().get(i).getCluster(),
              u,
              activeGossipMessage.getMembers().get(i).getId(),
              activeGossipMessage.getMembers().get(i).getHeartbeat(),
              activeGossipMessage.getMembers().get(i).getProperties());
      if (i == 0) {
        senderMember = member;
      }
      if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) {
        UdpNotAMemberFault f = new UdpNotAMemberFault();
        f.setException("Not a member of this cluster " + i);
        f.setUriFrom(activeGossipMessage.getUriFrom());
        f.setUuid(activeGossipMessage.getUuid());
        GossipCore.LOGGER.warn(f);
        gossipCore.sendOneWay(f, member.getUri());
        continue;
      }
      remoteGossipMembers.add(member);
    }
    UdpActiveGossipOk o = new UdpActiveGossipOk();
    o.setUriFrom(activeGossipMessage.getUriFrom());
    o.setUuid(activeGossipMessage.getUuid());
    gossipCore.sendOneWay(o, senderMember.getUri());
    gossipCore.mergeLists(senderMember, remoteGossipMembers);
    return true;
  }
}
  • 当目标member接收到UdpActiveGossipMessage的时候,由ActiveGossipMessageHandler来处理该消息;它首先从activeGossipMessage.getMembers(),转换为RemoteMember,添加到remoteGossipMembers,之后通过gossipCore.sendOneWay给发送方回复UdpActiveGossipOk,最后执行gossipCore.mergeLists(senderMember, remoteGossipMembers)

GossipCore

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java

public class GossipCore implements GossipCoreConstants {

  class LatchAndBase {
    private final CountDownLatch latch;
    private volatile Base base;
    
    LatchAndBase(){
      latch = new CountDownLatch(1);
    }
    
  }
  public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
  private final GossipManager gossipManager;
  private ConcurrentHashMap<String, LatchAndBase> requests;
  private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
  private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
  private final Meter messageSerdeException;
  private final Meter transmissionException;
  private final Meter transmissionSuccess;
  private final DataEventManager eventManager;
  
  public GossipCore(GossipManager manager, MetricRegistry metrics){
    this.gossipManager = manager;
    requests = new ConcurrentHashMap<>();
    perNodeData = new ConcurrentHashMap<>();
    sharedData = new ConcurrentHashMap<>();
    eventManager = new DataEventManager(metrics);
    metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
    metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  sharedData.size());
    metrics.register(REQUEST_SIZE, (Gauge<Integer>)() ->  requests.size());
    messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
    transmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
    transmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
  }

  public void receive(Base base) {
    if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {
      LOGGER.warn("received message can not be handled");
    }
  }

  /**
   * Sends a blocking message.
   * todo: move functionality to TransportManager layer.
   * @param message
   * @param uri
   * @throws RuntimeException if data can not be serialized or in transmission error
   */
  private void sendInternal(Base message, URI uri) {
    byte[] json_bytes;
    try {
      json_bytes = gossipManager.getProtocolManager().write(message);
    } catch (IOException e) {
      messageSerdeException.mark();
      throw new RuntimeException(e);
    }
    try {
      gossipManager.getTransportManager().send(uri, json_bytes);
      transmissionSuccess.mark();
    } catch (IOException e) {
      transmissionException.mark();
      throw new RuntimeException(e);
    }
  }

  public Response send(Base message, URI uri){
    if (LOGGER.isDebugEnabled()){
      LOGGER.debug("Sending " + message);
      LOGGER.debug("Current request queue " + requests);
    }

    final Trackable t;
    LatchAndBase latchAndBase = null;
    if (message instanceof Trackable){
      t = (Trackable) message;
      latchAndBase = new LatchAndBase();
      requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
    } else {
      t = null;
    }
    sendInternal(message, uri);
    if (latchAndBase == null){
      return null;
    }
    
    try {
      boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
      if (complete){
        return (Response) latchAndBase.base;
      } else{
        return null;
      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      if (latchAndBase != null){
        requests.remove(t.getUuid() + "/" + t.getUriFrom());
      }
    }
  }

  /**
   * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used
   * when the protocol for the message is not to wait for a response
   * @param message the message to send
   * @param u the uri to send it to
   */
  public void sendOneWay(Base message, URI u) {
    try {
      sendInternal(message, u);
    } catch (RuntimeException ex) {
      LOGGER.debug("Send one way failed", ex);
    }
  }

  public void handleResponse(String k, Base v) {
    LatchAndBase latch = requests.get(k);
    latch.base = v;
    latch.latch.countDown();
  }

  /**
   * Merge lists from remote members and update heartbeats
   *
   * @param senderMember
   * @param remoteList
   *
   */
  public void mergeLists(RemoteMember senderMember, List<Member> remoteList) {
    if (LOGGER.isDebugEnabled()){
      debugState(senderMember, remoteList);
    }
    for (LocalMember i : gossipManager.getDeadMembers()) {
      if (i.getId().equals(senderMember.getId())) {
        LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
        i.recordHeartbeat(senderMember.getHeartbeat());
        i.setHeartbeat(senderMember.getHeartbeat());
        //TODO consider forcing an UP here
      }
    }
    for (Member remoteMember : remoteList) {
      if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
        continue;
      }
      LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
      remoteMember.getUri(),
      remoteMember.getId(),
      remoteMember.getHeartbeat(),
      remoteMember.getProperties(),
      gossipManager.getSettings().getWindowSize(),
      gossipManager.getSettings().getMinimumSamples(),
      gossipManager.getSettings().getDistribution());
      aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
      Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
      if (result != null){
        for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
          if (localMember.getKey().getId().equals(remoteMember.getId())){
            localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
            localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
            localMember.getKey().setProperties(remoteMember.getProperties());
          }
        }
      }
    }
    if (LOGGER.isDebugEnabled()){
      debugState(senderMember, remoteList);
    }
  }

  //......

}
  • GossipCore的构造器需要GossipManager参数,它定义了receive、send、sendOneWay、handleResponse、mergeLists等方法
  • mergeLists方法主要是将接收到的remoteList转换为LocalMember,然后通过的putIfAbsent方法与gossipManager.getMembers()进行合并
  • 合并的同时会更新已有localMember的heartbeat,recordHeartbeat方法会忽略小于等于latestHeartbeatMs的值

GossipManager

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java

public abstract class GossipManager {

  public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
  
  // this mapper is used for ring and user-data persistence only. NOT messages.
  public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {
    private static final long serialVersionUID = 1L;
  {
    enableDefaultTyping();
    configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
  }};

  private final ConcurrentSkipListMap<LocalMember, GossipState> members;
  private final LocalMember me;
  private final GossipSettings settings;
  private final AtomicBoolean gossipServiceRunning;
  
  private TransportManager transportManager;
  private ProtocolManager protocolManager;
  
  private final GossipCore gossipCore;
  private final DataReaper dataReaper;
  private final Clock clock;
  private final ScheduledExecutorService scheduledServiced;
  private final MetricRegistry registry;
  private final RingStatePersister ringState;
  private final UserDataPersister userDataState;
  private final GossipMemberStateRefresher memberStateRefresher;
  
  private final MessageHandler messageHandler;
  private final LockManager lockManager;

  public GossipManager(String cluster,
                       URI uri, String id, Map<String, String> properties, GossipSettings settings,
                       List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
                       MessageHandler messageHandler) {
    this.settings = settings;
    this.messageHandler = messageHandler;

    clock = new SystemClock();
    me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
            settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
    gossipCore = new GossipCore(this, registry);
    this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
    dataReaper = new DataReaper(gossipCore, clock);
    members = new ConcurrentSkipListMap<>();
    for (Member startupMember : gossipMembers) {
      if (!startupMember.equals(me)) {
        LocalMember member = new LocalMember(startupMember.getClusterName(),
                startupMember.getUri(), startupMember.getId(),
                clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
                settings.getMinimumSamples(), settings.getDistribution());
        //TODO should members start in down state?
        members.put(member, GossipState.DOWN);
      }
    }
    gossipServiceRunning = new AtomicBoolean(true);
    this.scheduledServiced = Executors.newScheduledThreadPool(1);
    this.registry = registry;
    this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this);
    this.userDataState = new UserDataPersister(
        gossipCore,
        GossipManager.buildPerNodeDataPath(this),
        GossipManager.buildSharedDataPath(this));
    this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
    readSavedRingState();
    readSavedDataState();
  }

  /**
   * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
   * thread and start the receiver thread.
   */
  public void init() {
    
    // protocol manager and transport managers are specified in settings.
    // construct them here via reflection.
    
    protocolManager = ReflectionUtils.constructWithReflection(
        settings.getProtocolManagerClass(),
        new Class<?>[] { GossipSettings.class, String.class, MetricRegistry.class },
        new Object[] { settings, me.getId(), this.getRegistry() }
    );
    
    transportManager = ReflectionUtils.constructWithReflection(
        settings.getTransportManagerClass(),
        new Class<?>[] { GossipManager.class, GossipCore.class},
        new Object[] { this, gossipCore }
    );
    
    // start processing gossip messages.
    transportManager.startEndpoint();
    transportManager.startActiveGossiper();
    
    dataReaper.init();
    if (settings.isPersistRingState()) {
      scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
    }
    if (settings.isPersistDataState()) {
      scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
    }
    memberStateRefresher.init();
    LOGGER.debug("The GossipManager is started.");
  }

  /**
   * Shutdown the gossip service.
   */
  public void shutdown() {
    gossipServiceRunning.set(false);
    lockManager.shutdown();
    gossipCore.shutdown();
    transportManager.shutdown();
    dataReaper.close();
    memberStateRefresher.shutdown();
    scheduledServiced.shutdown();
    try {
      scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.error(e);
    }
    scheduledServiced.shutdownNow();
  }

  //......

}
  • GossipManager使用ConcurrentSkipListMap维护了LocalMember与GossipState的映射的members,同时该构造器创建了RingStatePersister、UserDataPersister、GossipMemberStateRefresher
  • init方法调用了transportManager.startEndpoint()及startActiveGossiper方法,同时通过scheduleAtFixedRate注册了RingStatePersister、UserDataPersister这两个定时任务,另外还执行了memberStateRefresher.init()
  • shutdown方法执行了gossipCore.shutdown()、transportManager.shutdown()、memberStateRefresher.shutdown()等

GossipMemberStateRefresher

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java

public class GossipMemberStateRefresher {
  public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);

  private final Map<LocalMember, GossipState> members;
  private final GossipSettings settings;
  private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
  private final Clock clock;
  private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
  private final ExecutorService listenerExecutor;
  private final ScheduledExecutorService scheduledExecutor;
  private final BlockingQueue<Runnable> workQueue;

  public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
                                    GossipListener listener,
                                    BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
    this.members = members;
    this.settings = settings;
    listeners.add(listener);
    this.findPerNodeGossipData = findPerNodeGossipData;
    clock = new SystemClock();
    workQueue = new ArrayBlockingQueue<>(1024);
    listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
    scheduledExecutor = Executors.newScheduledThreadPool(1);
  }

  public void init() {
    scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
  }

  public void run() {
    try {
      runOnce();
    } catch (RuntimeException ex) {
      LOGGER.warn("scheduled state had exception", ex);
    }
  }

  public void runOnce() {
    for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
      boolean userDown = processOptimisticShutdown(entry);
      if (userDown)
        continue;

      Double phiMeasure = entry.getKey().detect(clock.nanoTime());
      GossipState requiredState;

      if (phiMeasure != null) {
        requiredState = calcRequiredState(phiMeasure);
      } else {
        requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
      }

      if (entry.getValue() != requiredState) {
        members.put(entry.getKey(), requiredState);
        /* Call listeners asynchronously */
        for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
      }
    }
  }

  public GossipState calcRequiredState(Double phiMeasure) {
    if (phiMeasure > settings.getConvictThreshold())
      return GossipState.DOWN;
    else
      return GossipState.UP;
  }

  public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {
    long now = clock.nanoTime();
    long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
    if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {
      return GossipState.DOWN;
    } else {
      return state;
    }
  }

  /**
   * If we have a special key the per-node data that means that the node has sent us
   * a pre-emptive shutdown message. We process this so node is seen down sooner
   *
   * @param l member to consider
   * @return true if node forced down
   */
  public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
    PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
    if (m == null) {
      return false;
    }
    ShutdownMessage s = (ShutdownMessage) m.getPayload();
    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
      members.put(l.getKey(), GossipState.DOWN);
      if (l.getValue() == GossipState.UP) {
        for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
      }
      return true;
    }
    return false;
  }

  public void register(GossipListener listener) {
    listeners.add(listener);
  }

  public void shutdown() {
    scheduledExecutor.shutdown();
    try {
      scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdown();
    try {
      listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdownNow();
  }
}
每隔100ms执行
默认为10

AbstractTransportManager

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java

/**
 * Manage the protcol threads (active and passive gossipers).
 */
public abstract class AbstractTransportManager implements TransportManager {
  
  public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);
  
  private final ExecutorService gossipThreadExecutor;
  private final AbstractActiveGossiper activeGossipThread;
  protected final GossipManager gossipManager;
  protected final GossipCore gossipCore;
  
  public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
    this.gossipManager = gossipManager;
    this.gossipCore = gossipCore;
    gossipThreadExecutor = Executors.newCachedThreadPool();
    activeGossipThread = ReflectionUtils.constructWithReflection(
      gossipManager.getSettings().getActiveGossipClass(),
        new Class<?>[]{
            GossipManager.class, GossipCore.class, MetricRegistry.class
        },
        new Object[]{
            gossipManager, gossipCore, gossipManager.getRegistry()
        });
  }

  // shut down threads etc.
  @Override
  public void shutdown() {
    gossipThreadExecutor.shutdown();
    if (activeGossipThread != null) {
      activeGossipThread.shutdown();
    }
    try {
      boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
      if (!result) {
        // common when blocking patterns are used to read data from a socket.
        LOGGER.warn("executor shutdown timed out");
      }
    } catch (InterruptedException e) {
      LOGGER.error(e);
    }
    gossipThreadExecutor.shutdownNow();
  }

  @Override
  public void startActiveGossiper() {
    activeGossipThread.init();
  }

  @Override
  public abstract void startEndpoint();
}
  • AbstractTransportManager的startActiveGossiper会调用activeGossipThread.init();这里activeGossipThread为AbstractActiveGossiper的子类,这里我们看下SimpleActiveGossiper

SimpleActiveGossiper

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java

/**
 * Base implementation gossips randomly to live nodes periodically gossips to dead ones
 *
 */
public class SimpleActiveGossiper extends AbstractActiveGossiper {

  private ScheduledExecutorService scheduledExecutorService;
  private final BlockingQueue<Runnable> workQueue;
  private ThreadPoolExecutor threadService;
  
  public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
                              MetricRegistry registry) {
    super(gossipManager, gossipCore, registry);
    scheduledExecutorService = Executors.newScheduledThreadPool(2);
    workQueue = new ArrayBlockingQueue<Runnable>(1024);
    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
  }

  @Override
  public void init() {
    super.init();
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      threadService.execute(() -> {
        sendToALiveMember();
      });
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      sendToDeadMember();
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            () -> sendPerNodeData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            () -> sendSharedData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
  }
  
  @Override
  public void shutdown() {
    super.shutdown();
    scheduledExecutorService.shutdown();
    try {
      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    sendShutdownMessage();
    threadService.shutdown();
    try {
      threadService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
  }

  protected void sendToALiveMember(){
    LocalMember member = selectPartner(gossipManager.getLiveMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }
  
  protected void sendToDeadMember(){
    LocalMember member = selectPartner(gossipManager.getDeadMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }
  
  /**
   * sends an optimistic shutdown message to several clusters nodes
   */
  protected void sendShutdownMessage(){
    List<LocalMember> l = gossipManager.getLiveMembers();
    int sendTo = l.size() < 3 ? 1 : l.size() / 2;
    for (int i = 0; i < sendTo; i++) {
      threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
    }
  }
}
每隔gossipInterval执行

小结

  • AbstractTransportManager的startActiveGossiper会调用activeGossipThread.init();这里activeGossipThread为AbstractActiveGossiper的子类,这里假设为SimpleActiveGossiper
  • SimpleActiveGossiper的init方法,这里通过scheduledExecutorService的scheduleAtFixedRate注册了sendToALiveMember、sendToDeadMember、sendPerNodeData、sendSharedData四个定时任务( 每隔gossipInterval执行 )
  • sendToALiveMember首先通过父类的selectPartner方法来从gossipManager.getLiveMembers()选择一个liveMember,之后通过sendMembershipList来向它发送membershipList信息;sendToDeadMember首先首先通过父类的selectPartner方法来从gossipManager.getDeadMembers()选择一个liveMember,之后通过sendMembershipList来向它发送membershipList信息
  • AbstractActiveGossiper提供了selectPartner、sendMembershipList方法方法;selectPartner方法在memberList不为空的情况下随机生成randomNeighborIndex选择出一个LocalMember;sendMembershipList方法首先设置me的heartbeat,然后创建UdpActiveGossipMessage,该message的members首先是当前的localMember,然后再添加gossipManager.getMembers(),最后通过gossipCore.send发送给选中的member
  • ActiveGossipMessageHandler用于处理UdpActiveGossipMessage;它首先从activeGossipMessage.getMembers(),转换为RemoteMember,添加到remoteGossipMembers,之后通过gossipCore.sendOneWay给发送方回复UdpActiveGossipOk,最后执行gossipCore.mergeLists(senderMember, remoteGossipMembers)
  • GossipCore的mergeLists方法主要是将接收到的remoteList转换为LocalMember,然后通过的putIfAbsent方法与gossipManager.getMembers()进行合并;合并的同时会更新已有localMember的heartbeat,recordHeartbeat方法会忽略小于等于latestHeartbeatMs的值
  • GossipManager使用ConcurrentSkipListMap维护了LocalMember与GossipState的映射的members,同时该构造器创建了RingStatePersister、UserDataPersister、GossipMemberStateRefresher;init方法调用了transportManager.startEndpoint()及startActiveGossiper方法,同时通过scheduleAtFixedRate注册了RingStatePersister、UserDataPersister这两个定时任务,另外还执行了memberStateRefresher.init()
  • GossipMemberStateRefresher的init方法通过scheduledExecutor.scheduleAtFixedRate注册了GossipMemberStateRefresher的定时任务( 每隔100ms执行 );runOnce方法遍历GossipManager传入的members,然后挨个调用LocalMember的detect方法计算phiMeasure,如果该值不为null则执行calcRequiredState,否则执行calcRequiredStateCleanupInterval来计算requiredState;如果state发生变更则更新然后异步回调GossipListener的gossipEvent方法;calcRequiredState方法判断phiMeasure是否大于convictThreshold( 默认为10 ),大于则返回GossipState.DOWN,否则返回GossipState.UP;calcRequiredStateCleanupInterval方法则判断当前时间是否大于cleanupInterval+member.getHeartbeat(),大于则返回GossipState.DOWN,否则返回原有的state

每次这样全量sendMembershipList在memberList非常多的情况下可能会有效率方面的问题

doc


以上所述就是小编给大家介绍的《聊聊apache gossip的ActiveGossiper》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

来吧!带你玩转 Excel VBA

来吧!带你玩转 Excel VBA

罗刚君、杨嘉恺 / 电子工业出版社 / 2013-7 / 85.00元

本书旨在普及Excel VBA 基础理论,以及通过VBA 的高级应用扩展Excel 的功能,提升读者的制表效率,解决工作中的疑难,同时亦可借此开发商业插件。 本书主要分为操作自动化引言篇、入门篇、进阶篇和疑难解答篇,覆盖从入门到提高的所有内容,以满足不同层次的读者需求。其中操作自动化引言篇简述了操作自动化的需求与方式,借此引出VBA 入门篇。VBA 入门篇包含第2 章到第13 章,主要介绍了......一起来看看 《来吧!带你玩转 Excel VBA》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具