QMQ源码分析之Actor

栏目: 编程工具 · 发布时间: 5年前

内容简介:QMQ有关actor的一篇要了解QMQ的actor模式是如何起作用的,就要先来看看Broker是如何处理消息拉取请求的。能看出在这里起作用的是这个actorSystem。PullMessageWorker继承了ActorSystem.Processor,所以真正处理拉取请求的是这个接口里的process方法。请求到达pullMessageWorker,worker将该次请求交给actorSystem调度,调度到这次请求时,worker还有个根据拉取结果做反应的策略,即如果暂时没有消息,那么suspend,以

QMQ有关actor的一篇 文章 阐述了actor的应用场景。即client消费消息的请求会先进入一个RequestQueue,在client消费消息时,往往存在多个主题、多个消费组共享一个RequestQueue消费消息。在这个Queue中,存在不同主题的有不同消费组数量,以及不同消费组有不同consumer数量,那么就会存在抢占资源的情况。举个 文章 中的例子,一个主题下有两个消费组A和B,A有100个consumer,B有200个consumer,那么在RequestQueue中来自B的请求可能会多于A,这个时候就存在消费unfair的情况,所以需要隔离不同主题不同消费组以保证fair。除此之外,当consumer消费能力不足,造成broker消息堆积,这个时候就会导致consumer所在消费组总在消费"老消息",影响全局整体的一个消费能力。因为"老消息"不会存在page cache中,这个时候很可能就会从磁盘load,那么表现是RequestQueue中来自消费"老消息"消费组的请求处理时间过长,影响到其他主题消费组的消费,因此这个时候也需要做策略来避免不同消费组的相互影响。所以QMQ就有了actor机制,以消除各个消费组之间因消费能力不同、consumer数量不同而造成的相互影响各自的消费能力。

PullMessageWorker

要了解QMQ的actor模式是如何起作用的,就要先来看看Broker是如何处理消息拉取请求的。

class PullMessageWorker implements ActorSystem.Processor<PullMessageProcessor.PullEntry> {
    // 消息存储层
    private final MessageStoreWrapper store;
    // actor
    private final ActorSystem actorSystem;
  	
    private final ConcurrentMap<String, ConcurrentMap<String, Object>> subscribers;

    PullMessageWorker(MessageStoreWrapper store, ActorSystem actorSystem) {
        this.store = store;
        this.actorSystem = actorSystem;
        this.subscribers = new ConcurrentHashMap<>();
    }
    
    void pull(PullMessageProcessor.PullEntry pullEntry) {
    		// subject+group作actor调度粒度
        final String actorPath = ConsumerGroupUtils.buildConsumerGroupKey(pullEntry.subject, pullEntry.group);
        // actor调度
        actorSystem.dispatch(actorPath, pullEntry, this);
    }

    @Override
    public boolean process(PullMessageProcessor.PullEntry entry
                           , ActorSystem.Actor<PullMessageProcessor.PullEntry> self) {
        QMon.pullQueueTime(entry.subject, entry.group, entry.pullBegin);

        //开始处理请求的时候就过期了,那么就直接不处理了,也不返回任何东西给客户端,客户端等待超时
        //因为出现这种情况一般是server端排队严重,暂时挂起客户端可以避免情况恶化
        // deadline机制,如果QMQ认为这个消费请求来不及处理,那么就直接返回,避免雪崩
      	if (entry.expired()) {
            QMon.pullExpiredCountInc(entry.subject, entry.group);
            return true;
        }

        if (entry.isInValid()) {
            QMon.pullInValidCountInc(entry.subject, entry.group);
            return true;
        }

      	// 存储层find消息
        final PullMessageResult pullMessageResult = store.findMessages(entry.pullRequest);

        if (pullMessageResult == PullMessageResult.FILTER_EMPTY ||
                pullMessageResult.getMessageNum() > 0
                || entry.isPullOnce()
                || entry.isTimeout()) {
            entry.processMessageResult(pullMessageResult);
            return true;
        }

      	// 没有拉取到消息,那么挂起该actor
        self.suspend();
      	// timer task,在超时前唤醒actor
        if (entry.setTimerOnDemand()) {
            QMon.suspendRequestCountInc(entry.subject, entry.group);
          	// 订阅消息,一有消息来就唤醒该actor
            subscribe(entry.subject, entry.group);
            return false;
        }

      	// 已经超时,那么即刻唤醒调度
        self.resume();
        entry.processNoMessageResult();
        return true;
    }

  	// 订阅
    private void subscribe(String subject, String group) {
        ConcurrentMap<String, Object> map = subscribers.get(subject);
        if (map == null) {
            map = new ConcurrentHashMap<>();
            map = ObjectUtils.defaultIfNull(subscribers.putIfAbsent(subject, map), map);
        }
        map.putIfAbsent(group, HOLDER);
    }

  	// 有消息来就唤醒订阅的subscriber
    void remindNewMessages(final String subject) {
        final ConcurrentMap<String, Object> map = this.subscribers.get(subject);
        if (map == null) return;

        for (String group : map.keySet()) {
            map.remove(group);
            this.actorSystem.resume(ConsumerGroupUtils.buildConsumerGroupKey(subject, group));
            QMon.resumeActorCountInc(subject, group);
        }
    }
}

// ActorSystem内定义的处理接口
public interface ActorSystem.Processor<T> {
		boolean process(T message, Actor<T> self);
}
复制代码

能看出在这里起作用的是这个actorSystem。PullMessageWorker继承了ActorSystem.Processor,所以真正处理拉取请求的是这个接口里的process方法。请求到达pullMessageWorker,worker将该次请求交给actorSystem调度,调度到这次请求时,worker还有个根据拉取结果做反应的策略,即如果暂时没有消息,那么suspend,以一个timer task定时resume;如果在timer task执行之前有消息进来,那么也会即时resume。

ActorSystem

接下来就看看ActorSystem里边是如何做的 公平调度

public class ActorSystem {
		// 内部维护的是一个ConcurrentMap,key即PullMessageWorker里的subject+group
    private final ConcurrentMap<String, Actor> actors;
    // 执行actor的executor
    private final ThreadPoolExecutor executor;
    
    private final AtomicInteger actorsCount;
    private final String name;

    public ActorSystem(String name) {
        this(name, Runtime.getRuntime().availableProcessors() * 4, true);
    }

    public ActorSystem(String name, int threads, boolean fair) {
        this.name = name;
        this.actorsCount = new AtomicInteger();
        // 这里根据fair参数初始化一个优先级队列作为executor的参数,处理关于前言里说的"老消息"的情况
        BlockingQueue<Runnable> queue = fair ? new PriorityBlockingQueue<>() : new LinkedBlockingQueue<>();
        this.executor = new ThreadPoolExecutor(threads, threads, 60, TimeUnit.MINUTES, queue, new NamedThreadFactory("actor-sys-" + name));
        this.actors = Maps.newConcurrentMap();
        QMon.dispatchersGauge(name, actorsCount::doubleValue);
        QMon.actorSystemQueueGauge(name, () -> (double) executor.getQueue().size());
    }
}
复制代码

可以看到,用一个线程池处理actor的调度执行,这个线程池里的队列是一个优先级队列。优先级队列存储的元素是Actor。关于Actor我们稍后来看,先来看一下ActorSystem的处理调度流程。

// PullMessageWorker调用的就是这个方法
    public <E> void dispatch(String actorPath, E msg, Processor<E> processor) {
      	// 取得actor
        Actor<E> actor = createOrGet(actorPath, processor);
      	// 在后文Actor定义里能看到,actor内部维护一个queue,这里actor仅仅是offer(msg)
        actor.dispatch(msg);
      	// 执行调度
        schedule(actor, true);
    }

		// 无消息时,则会挂起
    public void suspend(String actorPath) {
        Actor actor = actors.get(actorPath);
        if (actor == null) return;

        actor.suspend();
    }

		// 有消息则恢复,可以理解成线程的"就绪状态"
    public void resume(String actorPath) {
        Actor actor = actors.get(actorPath);
        if (actor == null) return;

        actor.resume();
      	// 立即调度,可以留意一下那个false
      	// 当actor是"可调度状态"时,这个actor是否能调度是取决于actor的queue是否有消息
        schedule(actor, false);
    }

    private <E> Actor<E> createOrGet(String actorPath, Processor<E> processor) {
        Actor<E> actor = actors.get(actorPath);
        if (actor != null) return actor;

        Actor<E> add = new Actor<>(this.name, actorPath, this, processor, DEFAULT_QUEUE_SIZE);
        Actor<E> old = actors.putIfAbsent(actorPath, add);
        if (old == null) {
            LOG.info("create actorSystem: {}", actorPath);
            actorsCount.incrementAndGet();
            return add;
        }
        return old;
    }

		// 将actor入队的地方
    private <E> boolean schedule(Actor<E> actor, boolean hasMessageHint) {
      	// 如果actor不能调度,则ret false
        if (!actor.canBeSchedule(hasMessageHint)) return false;
      	// 设置actor为"可调度状态"
        if (actor.setAsScheduled()) {
          	// 提交时间,和actor执行总耗时共同决定在队列里的优先级
            actor.submitTs = System.currentTimeMillis();
          	// 入队,入的是线程池里的优先级队列
            this.executor.execute(actor);
            return true;
        }
      	// actor.setAsScheduled()里,这里是actor已经是可调度状态,那么没必要再次入队
        return false;
    }
复制代码

actorSystem维护一个线程池,线程池队列具有优先级,队列存储元素是actor。actor的粒度是subject+group。Actor是一个Runnable,且因为是优先级队列的存储元素所以需继承Comparable接口(队列并没有传Comparator参数),并且actor有四种状态,初始状态、可调度状态、挂起状态、调度状态(这个状态其实不存在,但是暂且这么叫以帮助理解)。

接下来看看Actor这个类:

public static class Actor<E> implements Runnable, Comparable<Actor> {
      	// 初始状态
        private static final int Open = 0;
      	// 可调度状态
        private static final int Scheduled = 2;
      	// 掩码,二进制表示:11 与Open和Scheduled作&运算 
      	// shouldScheduleMask&currentStatus != Open 则为不可置为调度状态(当currentStatus为挂起状态或调度状态)
        private static final int shouldScheduleMask = 3;
        private static final int shouldNotProcessMask = ~2;
      	// 挂起状态
        private static final int suspendUnit = 4;
        //每个actor至少执行的时间片
        private static final int QUOTA = 5;
      	// status属性内存偏移量,用Unsafe操作
        private static long statusOffset;

        static {
            try {
                statusOffset = Unsafe.instance.objectFieldOffset(Actor.class.getDeclaredField("status"));
            } catch (Throwable t) {
                throw new ExceptionInInitializerError(t);
            }
        }

        final String systemName;
        final ActorSystem actorSystem;
      	// actor内部维护的queue,后文简单分析下
        final BoundedNodeQueue<E> queue;
      	// ActorSystem内部定义接口,PullMessageWorker实现的就是这个接口,用于真正业务逻辑处理的地方
        final Processor<E> processor;
        private final String name;
      	// 一个actor执行总耗时
        private long total;
      	// actor执行提交时间,即actor入队时间
        private volatile long submitTs;
        //通过Unsafe操作
        private volatile int status;

        Actor(String systemName, String name, ActorSystem actorSystem, Processor<E> processor, final int queueSize) {
            this.systemName = systemName;
            this.name = name;
            this.actorSystem = actorSystem;
            this.processor = processor;
            this.queue = new BoundedNodeQueue<>(queueSize);

            QMon.actorQueueGauge(systemName, name, () -> (double) queue.count());
        }

      	// 入队,是actor内部的队列
        boolean dispatch(E message) {
            return queue.add(message);
        }

      	// actor执行的地方
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            String old = Thread.currentThread().getName();
            try {
                Thread.currentThread().setName(systemName + "-" + name);
                if (shouldProcessMessage()) {
                    processMessages();
                }
            } finally {
                long duration = System.currentTimeMillis() - start;
              	// 每次actor执行的耗时累加到total
                total += duration;
                QMon.actorProcessTime(name, duration);

                Thread.currentThread().setName(old);
              	// 设置为"空闲状态",即初始状态 (currentStatus & ~Scheduled)
                setAsIdle();
              	// 进行下一次调度
                this.actorSystem.schedule(this, false);
            }
        }

        void processMessages() {
            long deadline = System.currentTimeMillis() + QUOTA;
            while (true) {
                E message = queue.peek();
                if (message == null) return;
              	// 处理业务逻辑
                boolean process = processor.process(message, this);
              	// 失败,该message不会出队,等待下一次调度
              	// 如pullMessageWorker中没有消息时将actor挂起
                if (!process) return;
		
              	// 出队
                queue.pollNode();
              	// 每个actor只有QUOTA个时间片的执行时间
                if (System.currentTimeMillis() >= deadline) return;
            }
        }

        final boolean shouldProcessMessage() {
          	// 能够真正执行业务逻辑的判断
          	// 一种场景是,针对挂起状态,由于没有拉取到消息该actor置为挂起状态
          	// 自然就没有抢占时间片的必要了
            return (currentStatus() & shouldNotProcessMask) == 0;
        }

      	// 能否调度
        private boolean canBeSchedule(boolean hasMessageHint) {
            int s = currentStatus();
            if (s == Open || s == Scheduled) return hasMessageHint || !queue.isEmpty();
            return false;
        }

        public final boolean resume() {
            while (true) {
                int s = currentStatus();
                int next = s < suspendUnit ? s : s - suspendUnit;
                if (updateStatus(s, next)) return next < suspendUnit;
            }
        }

        public final void suspend() {
            while (true) {
                int s = currentStatus();
                if (updateStatus(s, s + suspendUnit)) return;
            }
        }

        final boolean setAsScheduled() {
            while (true) {
                int s = currentStatus();
              	// currentStatus为非Open状态,则ret false
                if ((s & shouldScheduleMask) != Open) return false;
              	// 更新actor状态为调度状态
                if (updateStatus(s, s | Scheduled)) return true;
            }
        }

        final void setAsIdle() {
            while (true) {
                int s = currentStatus();
              	// 更新actor状态位不可调度状态,(这里可以理解为更新为初始状态Open)
                if (updateStatus(s, s & ~Scheduled)) return;
            }
        }

        final int currentStatus() {
          	// 根据status在内存中的偏移量取得status
            return Unsafe.instance.getIntVolatile(this, statusOffset);
        }

        private boolean updateStatus(int oldStatus, int newStatus) {
          	// Unsafe 原子操作,处理status的轮转变更
            return Unsafe.instance.compareAndSwapInt(this, statusOffset, oldStatus, newStatus);
        }

      	// 决定actor在优先级队列里的优先级的地方
      	// 先看总耗时,以达到动态限速,保证执行"慢"的请求(已经堆积的消息拉取请求)在后执行
      	// 其次看提交时间,先提交的actor先执行
        @Override
        public int compareTo(Actor o) {
            int result = Long.compare(total, o.total);
            return result == 0 ? Long.compare(submitTs, o.submitTs) : result;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Actor<?> actor = (Actor<?>) o;
            return Objects.equals(systemName, actor.systemName) &&
                    Objects.equals(name, actor.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(systemName, name);
        }
    }
复制代码

Actor实现了Comparable,在优先级队列里优先级是Actor里的total和submitTs共同决定的。total是actor执行总耗时,submitTs是调度时间。那么对于处理较慢的actor自然就会在队列里相对"尾部"位置,这时就做到了根据actor的执行耗时的一个动态限速。Actor利用Unsafe机制来控制各个状态的轮转原子性更新的,且每个actor执行时间可以简单理解为5个时间片。

其实工作进行到这里就可以结束了,但是抱着研究的态度,不妨接着往下看看。

Actor内部维护一个Queue,这个Queue是自定义的,是一个Lock-free bounded non-blocking multiple-producer single-consumer queue。JDK里的QUEUE多数都是用锁控制,不用锁,猜测也应该是用Unsafe 原子操作实现。那么来看看吧:

private static class BoundedNodeQueue<T> {

				// 头结点、尾节点在内存中的偏移量
        private final static long enqOffset, deqOffset;

        static {
            try {
                enqOffset = Unsafe.instance.objectFieldOffset(BoundedNodeQueue.class.getDeclaredField("_enqDoNotCallMeDirectly"));
                deqOffset = Unsafe.instance.objectFieldOffset(BoundedNodeQueue.class.getDeclaredField("_deqDoNotCallMeDirectly"));
            } catch (Throwable t) {
                throw new ExceptionInInitializerError(t);
            }
        }

        private final int capacity;
        // 尾节点,通过enqOffset操作
        private volatile Node<T> _enqDoNotCallMeDirectly;
        // 头结点,通过deqOffset操作
        private volatile Node<T> _deqDoNotCallMeDirectly;

        protected BoundedNodeQueue(final int capacity) {
            if (capacity < 0) throw new IllegalArgumentException("AbstractBoundedNodeQueue.capacity must be >= 0");
            this.capacity = capacity;
            final Node<T> n = new Node<T>();
            setDeq(n);
            setEnq(n);
        }

				// 获取尾节点
        private Node<T> getEnq() {
        		// getObjectVolatile这种方式保证拿到的都是最新数据
            return (Node<T>) Unsafe.instance.getObjectVolatile(this, enqOffset);
        }

				// 设置尾节点,仅在初始化时用
        private void setEnq(Node<T> n) {
            Unsafe.instance.putObjectVolatile(this, enqOffset, n);
        }

        private boolean casEnq(Node<T> old, Node<T> nju) {
        		// cas,循环设置,直到成功
            return Unsafe.instance.compareAndSwapObject(this, enqOffset, old, nju);
        }

				// 获取头结点
        private Node<T> getDeq() {
            return (Node<T>) Unsafe.instance.getObjectVolatile(this, deqOffset);
        }

				// 仅在初始化时用
        private void setDeq(Node<T> n) {
            Unsafe.instance.putObjectVolatile(this, deqOffset, n);
        }

				// cas设置头结点
        private boolean casDeq(Node<T> old, Node<T> nju) {
            return Unsafe.instance.compareAndSwapObject(this, deqOffset, old, nju);
        }

				// 与其叫count,不如唤作index,但是是否应该考虑溢出的情况?
        public final int count() {
            final Node<T> lastNode = getEnq();
            final int lastNodeCount = lastNode.count;
            return lastNodeCount - getDeq().count;
        }

        /**
         * @return the maximum capacity of this queue
         */
        public final int capacity() {
            return capacity;
        }

        public final boolean add(final T value) {
            for (Node<T> n = null; ; ) {
                final Node<T> lastNode = getEnq();
                final int lastNodeCount = lastNode.count;
                if (lastNodeCount - getDeq().count < capacity) {
                    // Trade a branch for avoiding to create a new node if full,
                    // and to avoid creating multiple nodes on write conflict á la Be Kind to Your GC
                    if (n == null) {
                        n = new Node<T>();
                        n.value = value;
                    }

                    n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq()

                    // Try to putPullLogs the node to the end, if we fail we continue loopin'
                    // 相当于 
                  	// enq -> next = new Node(value); enq = neq -> next;
                    if (casEnq(lastNode, n)) {
                      	// 注意一下这个Node.setNext方法
                        lastNode.setNext(n);
                        return true;
                    }
                } else return false; // Over capacity—couldn't add the node
            }
        }

        public final boolean isEmpty() {
          	// enq == deq 即为empty
            return getEnq() == getDeq();
        }

        /**
         * Removes the first element of this queue if any
         *
         * @return the value of the first element of the queue, null if empty
         */
        public final T poll() {
            final Node<T> n = pollNode();
            return (n != null) ? n.value : null;
        }

        public final T peek() {
            Node<T> n = peekNode();
            return (n != null) ? n.value : null;
        }

        protected final Node<T> peekNode() {
            for (; ; ) {
                final Node<T> deq = getDeq();
                final Node<T> next = deq.next();
                if (next != null || getEnq() == deq)
                    return next;
            }
        }

        /**
         * Removes the first element of this queue if any
         *
         * @return the `Node` of the first element of the queue, null if empty
         */
        public final Node<T> pollNode() {
            for (; ; ) {
                final Node<T> deq = getDeq();
                final Node<T> next = deq.next();
                if (next != null) {
                    if (casDeq(deq, next)) {
                        deq.value = next.value;
                        deq.setNext(null);
                        next.value = null;
                        return deq;
                    } // else we retry (concurrent consumers)
                  	// 比较套路的cas操作,就不多说了
                } else if (getEnq() == deq) return null; // If we got a null and head meets tail, we are empty
            }
        }

        public static class Node<T> {
          
            private final static long nextOffset;

            static {
                try {
                    nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
                } catch (Throwable t) {
                    throw new ExceptionInInitializerError(t);
                }
            }

            protected T value;
            protected int count;
            // 也是利用偏移量操作
            private volatile Node<T> _nextDoNotCallMeDirectly;

            public final Node<T> next() {
                return (Node<T>) Unsafe.instance.getObjectVolatile(this, nextOffset);
            }

            protected final void setNext(final Node<T> newNext) {
              	// 这里有点讲究,下面分析下
                Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
            }
        }
    }
复制代码

如上代码,是通过属性在内存的偏移量,结合cas原子操作来进行更新赋值等操作,以此来实现lock-free,这是比较常规的套路。值得一说的是Node里的setNext方法,这个方法的调用是在cas节点后,对"上一位置"的next节点进行赋值。而这个方法使用的是Unsafe.instance.putOrderedObject,要说这个putOrderedObject,就不得不说MESI,缓存一致性协议。如volatile,当进行写操作时,它是依靠storeload barrier来实现其他线程对此的可见性。而putOrderedObject也是依靠内存屏障,只不过是storestore barrier。storestore是比storeload快速的一种内存屏障。在硬件层面,内存屏障分两种:Load-Barrier和Store-Barrier。Load-Barrier是让高速缓存中的数据失效,强制重新从主内存加载数据;Store-Barrier是让写入高速缓存的数据更新写入主内存,对其他线程可见。而 java 层面的四种内存屏障无非是硬件层面的两种内存屏障的组合而已。那么可见,storestore barrier自然比storeload barrier快速。那么有一个问题,我们可不可以在这里也用cas操作呢?答案是可以,但没必要。你可以想想这里为什么没必要。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Kotlin实战

Kotlin实战

【美】Dmitry Jemerov(德米特里·詹莫瑞福)、【美】 Svetlana Isakova(斯维特拉娜·伊凡诺沃) / 覃宇、罗丽、李思阳、蒋扬海 / 电子工业出版社 / 2017-8 / 89.00

《Kotlin 实战》将从语言的基本特性开始,逐渐覆盖其更多的高级特性,尤其注重讲解如何将 Koltin 集成到已有 Java 工程实践及其背后的原理。本书分为两个部分。第一部分讲解如何开始使用 Kotlin 现有的库和API,包括基本语法、扩展函数和扩展属性、数据类和伴生对象、lambda 表达式,以及数据类型系统(着重讲解了可空性和集合的概念)。第二部分教你如何使用 Kotlin 构建自己的 ......一起来看看 《Kotlin实战》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

SHA 加密
SHA 加密

SHA 加密工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具