Hystrix 跨线程共享变量 原 荐

栏目: 后端 · 发布时间: 5年前

内容简介:今天遇到一个问题,在使用 zuul 的过程中我想要在 Hystrix 执行的线程中获取到 com.netflix.zuul.context.RequestContext 中的数据。不过 RequestContext 是基于 ThreadLocal 的,所以在 Hystrix 执行的线程中调用 com.netflix.zuul.context.RequestContext#getCurrentContext 函数获取到的只能是一个空的没有任何数据的 RequestContext 对象。因为 Hystrix

1.在 Servlet 容器线程与 Hystrix 线程中共享变量的问题

今天遇到一个问题,在使用 zuul 的过程中我想要在 Hystrix 执行的线程中获取到 com.netflix.zuul.context.RequestContext 中的数据。不过 RequestContext 是基于 ThreadLocal 的,所以在 Hystrix 执行的线程中调用 com.netflix.zuul.context.RequestContext#getCurrentContext 函数获取到的只能是一个空的没有任何数据的 RequestContext 对象。因为 Hystrix 默认是在另一个线程中执行的 , 而 zuul filter 则是在 servlet 容器的线程中执行的,基于 ThreadLocal 的 RequestContext 自然无法起作用了。

2.在 Servlet 容器线程与 Hystrix 线程中共享变量的实现方式

2.1 关键类介绍

Hystrix 的设计者们早就考虑到了这个问题,并且提供了解决方案。关键类是 :

com.netflix.hystrix.strategy.concurrency.HystrixRequestContext ,com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableDefault 。接下来分别说明他们是如何工作的。

HystrixRequestContext 类内部有一个 ThreadLocal 和一个 ConcurrentHashMap ,ThreadLocal 的目的是为了要在不同线程中保存 HystrixRequestContext 对象,这也就意味着保存了 HystrixRequestContext 中的 ConcurrentHashMap 。 源码:

/*
     * ThreadLocal on each thread will hold the HystrixRequestVariableState.
     * 
     * Shutdown will clear the state inside HystrixRequestContext but not nullify the ThreadLocal on all
     * child threads as these threads will not be known by the parent when cleanupAfterRequest() is called.
     * 
     * However, the only thing held by those child threads until they are re-used and re-initialized is an empty
     * HystrixRequestContext object with the ConcurrentHashMap within it nulled out since once it is nullified
     * from the parent thread it is shared across all child threads.
     */
    private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();


/*
     * This ConcurrentHashMap should not be made publicly accessible. It is the state of RequestVariables for a given RequestContext.
     * 
     * Only HystrixRequestVariable has a reason to be accessing this field.
     */
    /* package */ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();

com.netflix.hystrix.strategy.concurrency.HystrixRequestContext#setContextOnCurrentThread 函数源码 :把参数指定的一个 HystrixRequestContext 对象实列保存到当前线程的 ThreadLocal 中,看到这个函数我们就应该想到他可能是用来在切换线程之前用来做数据的拷贝用的。

public static void setContextOnCurrentThread(HystrixRequestContext state) {
        requestVariables.set(state);
    }

ConcurrentHashMap key 的类型是 HystrixRequestVariableDefault : 看到它的 set(T value) 函数就是将数据放入到 HystrixRequestContext 对象的 ConcurrentHashMap 中。而 HystrixRequestVariableDefault 对象自己做为这个 ConcurrentHashMap 的 key。

public void set(T value) {
        HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
    }

2.2 线程间 HystrixRequestContext 的拷贝

com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable 源码 :

public class HystrixContextRunnable implements Runnable {

    private final Callable<Void> actual;
    private final HystrixRequestContext parentThreadState;

    public HystrixContextRunnable(Runnable actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }
    
    public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
        this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
    }

    public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
        this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                actual.run();
                return null;
            }

        });
        this.parentThreadState = hystrixRequestContext;
    }

    @Override
    public void run() {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }

}

com.netflix.hystrix.strategy.concurrency.HystrixContextCallable 源码:

public class HystrixContextCallable<K> implements Callable<K> {

    private final Callable<K> actual;
    private final HystrixRequestContext parentThreadState;

    public HystrixContextCallable(Callable<K> actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }

    public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
        this.actual = concurrencyStrategy.wrapCallable(actual);
        this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
    }

    @Override
    public K call() throws Exception {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            return actual.call();
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }

}

run 方法 , call 方法进入后就已经开启了新的线程, 在这个新的线程中将原来线程中的 HystrixRequestContext 对象通过 HystrixRequestContext.setContextOnCurrentThread(parentThreadState); 函数设置到了当前的线程中 , parentThreadState 的值是 HystrixContextRunnable , HystrixContextCallable  初始化的时候在原来线程中获取到的。

3. 完整使用示例

目的:我想要在 hystrix 线程中使用到 RequestContext 中保存的数据。所以我的思路是定义一个 ZuulFilter 在这个 ZuulFilter 中将 RequestContext 保存到 HystrixRequestContext 中。不要忘记调用 com.netflix.hystrix.strategy.concurrency.HystrixRequestContext#shutdown 函数来清理已经使用过的数据,否则的话会有内存溢出的风险,就和使用 ThreadLocal 一样在使用结束后要记得 remove 。

package org.hepeng.commons.spring.cloud.netflix.zuul.filter;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.netflix.zuul.exception.ZuulException;
import org.hepeng.commons.spring.cloud.netflix.zuul.RequestContextHelper;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.cloud.netflix.zuul.filters.support.FilterConstants;
import org.springframework.context.ApplicationContext;


/**
 * @author he peng
 */
public class HystrixRequestContextFilter extends ZuulFilter {

    private ApplicationContext context;

    public HystrixRequestContextFilter(ApplicationContext context) {
        this.context = context;
        registerHystrixRequestContextPostFilter();
    }

    private void registerHystrixRequestContextPostFilter() {
        BeanDefinition beanDefinition =
                new RootBeanDefinition(HystrixRequestContextPostFilter.class);
        beanDefinition.setScope("singleton");
        BeanDefinitionHolder beanDefinitionHolder =
                new BeanDefinitionHolder(beanDefinition , "hystrixRequestContextPostFilter" );
        BeanDefinitionReaderUtils.registerBeanDefinition(beanDefinitionHolder , (BeanDefinitionRegistry) this.context);
    }

    @Override
    public String filterType() {
        return FilterConstants.PRE_TYPE;
    }

    @Override
    public int filterOrder() {
        return FilterConstants.PRE_DECORATION_FILTER_ORDER + 1;
    }

    @Override
    public boolean shouldFilter() {
        return true;
    }

    @Override
    public Object run() throws ZuulException {
        HystrixRequestContext.initializeContext();
        RequestContextHelper.set(RequestContext.getCurrentContext());
        return null;
    }

    private static class HystrixRequestContextPostFilter extends ZuulFilter {
        @Override
        public String filterType() {
            return FilterConstants.POST_TYPE;
        }

        @Override
        public int filterOrder() {
            return -100;
        }

        @Override
        public boolean shouldFilter() {
            return true;
        }

        @Override
        public Object run() throws ZuulException {
            HystrixRequestContext context = HystrixRequestContext.getContextForCurrentThread();
            if (HystrixRequestContext.isCurrentThreadInitialized()) {
                context.shutdown();
            }
            return null;
        }
    }
}

对 HystrixRequestVariableDefault 的使用 ,其实就和使用 ThreadLocal 一样,如果你了解 ThreadLocal 的话,就会知道 ThreadLocal 也是做为一个 key 保存在 ThreadLocalMap 中的。

package org.hepeng.commons.spring.cloud.netflix.zuul;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableDefault;
import com.netflix.zuul.context.RequestContext;

import java.util.Objects;

import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.*;

/**
 * @author he peng
 */
public class RequestContextHelper {

    private final static HystrixRequestVariableDefault<RequestContext> HYSTRIX_REQUEST_VARIABLE = new HystrixRequestVariableDefault();

    public static void set(RequestContext context) {
        HYSTRIX_REQUEST_VARIABLE.set(context);
    }

    public static RequestContext remove() {
        RequestContext context = HYSTRIX_REQUEST_VARIABLE.get();
        HYSTRIX_REQUEST_VARIABLE.remove();
        return context;
    }

    public static RequestContext get() {
        RequestContext context = RequestContext.getCurrentContext();
        if (context.isEmpty()) {
            RequestContext context1 = HYSTRIX_REQUEST_VARIABLE.get();
            if (Objects.nonNull(context1) && ! context1.isEmpty()) {
                context = context1;
            }
        }
        return context;
    }

    public static Object getServiceId() {
        RequestContext ctx = get();
        Object serviceId = ctx.get(SERVICE_ID_KEY);
        if (Objects.isNull(serviceId)) {
            serviceId = ctx.get(PROXY_KEY);
        }
        return serviceId;
    }

    public static Object getLoadBalancer() {
        return get().get(LOAD_BALANCER_KEY);
    }

    public static Object getRoutePath() {
        return get().get(REQUEST_URI_KEY);
    }
}

基于上面我的代码我在 Hystrix 的线程中如果想要获取到之前线程中的 RequestContext 的时候只需要调用 RequestContextHelper.get() 函数就可以获取到了。

完整代码: https://gitee.com/kernelHP/hp-java-commons

如果你想直接使用上述功能不想再自己编写这些代码可以直接使用我已经写好的代码 :

gradle : implementation 'org.hepeng:hp-java-commons:1.1.5'

maven :

<dependency>

<groupId>org.hepeng</groupId>

<artifactId>hp-java-commons</artifactId>

<version>1.1.5</version>

</dependency>

使用示例:只需要将 HystrixRequestContextFilter 通过 spring 容器初始化即可。使用的时候只需要使用 RequestContextHelper 即可获取到 RequestContext 。

@Configuration
public class Config {


    @Bean
    public ZuulFilter hystrixRequestContextFilter(ApplicationContext context) {
        return new HystrixRequestContextFilter(context);
    }

}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

U一点·料

U一点·料

阿里巴巴集团 1688用户体验设计部 / 机械工业出版社 / 2015-7-13 / 79.00元

《U一点·料——阿里巴巴1688UED体验设计践行之路》是1688UED团队历经多年实践之后的心血之作,书中以“道─术─器”的思路为编排脉络,从设计观、思考体系、方法论上层层剖析,将零散的行业knowhow串成体系,对“UED如何发展突破”提出了自己的真知灼见。该书重实战、讲方法、求专业、论文化,是一部走心的诚意之作。 本书作者从美工到用户体验设计师,从感性随意到理性思考,从简单的PS做图到......一起来看看 《U一点·料》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

MD5 加密
MD5 加密

MD5 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器