grpc源码分析1-context

栏目: 服务器 · 发布时间: 6年前

内容简介:Context本身是Immutable的,但是它保存的状态不一定是。Context中保存数据使用的是KV映射。

io.grpc.Context 表示上下文,用来在一次grpc请求链路中传递用户登录信息、tracing信息等。

Context本身是Immutable的,但是它保存的状态不一定是。

Context中保存数据使用的是KV映射。

Key定义如下, 通过这种方式,而不是使用String可以有多个相同name的不同Key,而name则只用来做debug信息。

public static final class Key<T>{
   private final String name;
   private final T defaultValue;
}

获取Key对应的value即从Context中找到对应的映射,不过Key也提供了get这个简便方法,从当前context获取对应的value

/**
* Get the value from the {@link#current()} context for this key.
*/
    @SuppressWarnings("unchecked")
    public T get() {
      return get(Context.current());
    }

    /**
* Get the value from the specified context for this key.
*/
    @SuppressWarnings("unchecked")
    public T get(Context context) {
      T value = (T) context.lookup(this);
      return value == null ? defaultValue : value;
    }

那么Context中是如果保存这个KV映射的呢, 我们通常会使用HashMap来进行保存。

不过HashMap是一个比较全能的Map实现,有对put、delete、get等操作都有优化,因此内部有很多辅助结构,这就增加了内存消耗。(HashMap实现分析可以参考我的另一篇文章)

在Context的使用场景中,只需要put和get操作。

例如在当前Context基础上,增加一些KV对,是put操作;查询当前某个key对应的value,是get操作。

最开始的版本实现里,Context使用的是二维数组Object[][],第一维是Key,第二维是Value。

这样的缺点是查询的时间复杂度是线性的(其实大多数场景的key的数量都比较少)。

所以在HashMap和数组间的时间复杂度和占用空间的权衡就是 Hash Array Mapped Trie 数据结构了。

在grpc中的实现是 io.grpc.PersistentHashArrayMappedTrie

下面使用 SizeOf 工具简单通过deepSize对比一下 HashMapPersistentHashArrayMappedTrie 的内存占用情况。

private static final Context.Key<String> userName = Context.key("userName");

    private static final Map<Context.Key<?>, Object> cache = new HashMap<>();

    private static PersistentHashArrayMappedTrie<Context.Key<?>, Object> persistentHashArrayMappedTrie =
            new PersistentHashArrayMappedTrie<>();

    public static void main(String[] args) {
        cache.put(userName, "hello");
        persistentHashArrayMappedTrie = persistentHashArrayMappedTrie.put(userName, "hello");
        SizeOf sizeOf = SizeOf.newInstance();
        System.out.println(sizeOf.deepSizeOf(cache));
        System.out.println(sizeOf.deepSizeOf(persistentHashArrayMappedTrie));
    }

可以看到内存占用差别还是比较大的,因为HashMap里面的Node保存了key、hash、value、next等字段,并且默认使用一定大小的capacity数量作为起始capacity来避免反复的resize。

Context常用用法如下。首先获取当前context,这个一般是作为参数传过来的,或通过current()获取当前的已有context。

然后通过attach方法,绑定到当前线程上,并且返回当前线程

Context current = xxx.
Context previous = current.attach();
try {
    // do something in context
} finally {
    current.detach(previous);
}

Context的主要方法如下

  • attach() attach Context自己,从而进入到一个新的scope中,新的scope以此Context实例作为current,并且返回之前的current context
  • detach(Context toDetach) attach()方法的反向方法,退出当前Context并且detach到toDetachContext,每个attach方法要对应一个detach,所以一般通过try finally代码块或wrap模板方法来使用。
  • static storage() 获取storage,Storage是用来attach和detach当前context用的。
public class Context {
  public static final Context ROOT = new Context(null, EMPTY_ENTRIES);
  public static Context current(){
    Context current = storage().current();
    if (current == null) {
      return ROOT;
    }
    return current;
  }
  private Context(PersistentHashArrayMappedTrie<Key<?>, Object> keyValueEntries,int generation){
    cancellableAncestor = null;
    this.keyValueEntries = keyValueEntries;
    this.generation = generation;
    validateGeneration(generation);
  }
  public Context attach(){
    Context prev = storage().doAttach(this);
    if (prev == null) {
      return ROOT;
    }
    return prev;
  }
  public void detach(Context toAttach){
    checkNotNull(toAttach, "toAttach");
    storage().detach(this, toAttach);
  }
  // For testing
  static Storage storage(){
    Storage tmp = storage.get();
    if (tmp == null) {
      tmp = createStorage();
    }
    return tmp;
  }

上面提到的wrap模板方法如下,可以wrap一个Runnable(不需要返回值的使用)或Callable(要返回一个值)

public Runnablewrap(final Runnable r){
  return new Runnable() {
    @Override
    public void run(){
      Context previous = attach();
      try {
        r.run();
      } finally {
        detach(previous);
      }
    }
  };
}
public <C> Callable<C> wrap(final Callable<C> c) {
  return new Callable<C>() {
    @Override
    public Ccall()throws Exception {
      Context previous = attach();
      try {
        return c.call();
      } finally {
        detach(previous);
      }
    }
  };
}

Context的attach、detach方法都调用了Storage对应的方法。

grpc的默认的Storage实现是使用ThreadLocal的 ThreadLocalContextStorage 。ThreadLocal是实现可以参考 ThreadLocal使用和源码分析

final class ThreadLocalContextStorageextends Context.Storage{
  // ThreadLocal保存的是当前的Context
  static final ThreadLocal<Context> localContext = new ThreadLocal<>();

  @Override
  public Context doAttach(Context toAttach) {
    Context current = current();
    localContext.set(toAttach);
    return current;
  }

  @Override
  public void detach(Context toDetach, Context toRestore) {
    if (current() != toDetach) {
      log.log(Level.SEVERE, "Context was not attached when detaching",
          new Throwable().fillInStackTrace());
    }
    if (toRestore != Context.ROOT) {
      localContext.set(toRestore);
    } else {
      // Avoid leaking our ClassLoader via ROOT if this Thread is reused across multiple
      // ClassLoaders, as is common for Servlet Containers. The ThreadLocal is weakly referenced by
      // the Thread, but its current value is strongly referenced and only lazily collected as new
      // ThreadLocals are created.
      //
      // Use set(null) instead of remove() since remove() deletes the entry which is then re-created
      // on the next get() (because of initialValue() handling). set(null) has same performance as
      // set(toRestore).
      detach时清除到toDetach的引用,避免发生引用泄露,虽然ThreadLocal本身是Thread里面weak reference的,但是value确是强引用的,所以要通过null要主动去掉引用
      localContext.set(null);
    }
  }

  // 获得当前Context,通过ThreadLocal获取,如果ThreadLocal中没有值,说明当前Context为root context
  @Override
  public Context current() {
    Context current = localContext.get();
    if (current == null) {
      return Context.ROOT;
    }
    return current;
  }
}

ThreadLocal在线程池场景的问题

熟悉ThreadLocal的朋友知道,ThreadLocal存储在线程的ThreadLocalMap中,如果从一个线程提交一个Runnable或Callable到线程池,那么

在线程池中则获取不到提交任务的线程的ThreadLocal了。那么如何解决这个问题呢。

grpc中同样提供了一些对Executor的委托封装。

public static Executor currentContextExecutor(final Executor e){
    class CurrentContextExecutorimplements Executor{
      @Override
      public void execute(Runnable r){
        e.execute(Context.current().wrap(r));
      }
    }

    return new CurrentContextExecutor();
  }

这里的Executor执行execute时,会先调用Context.current()获取当前Context对象,然后使用Context.wrap(Runnable)方法wrap提交的Runnable。

public Runnablewrap(final Runnable r){
    return new Runnable() {
      @Override
      public void run(){
        Context previous = attach();
        try {
          r.run();
        } finally {
          detach(previous);
        }
      }
    };
  }

再看一下wrap方法,由于它是Context类的方法,所以new Runnable()这个匿名内部类的attach()调用实际是Context.this.attach()所以这个方法

对应到上面是Context.current这个提交任务的当前Context来执行attach(),这样就完成了Context从提交线程到实际执行线程的传递。

题外话,通用的ThreadLocal解决方案可以参考 transimitable-thread-local

可以解决一些tracing框架的例如上下文tracing信息丢失问题。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Software Engineering for Internet Applications

Software Engineering for Internet Applications

Eve Andersson、Philip Greenspun、Andrew Grumet / The MIT Press / 2006-03-06 / USD 35.00

After completing this self-contained course on server-based Internet applications software, students who start with only the knowledge of how to write and debug a computer program will have learned ho......一起来看看 《Software Engineering for Internet Applications》 这本书的介绍吧!

html转js在线工具
html转js在线工具

html转js在线工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具