内容简介:欢迎关注公众号:【如果有需要后台回复
欢迎关注公众号:【 爱编码 】
如果有需要后台回复 2019 赠送 1T的学习资料 哦!!
简介
所有的网路通信都涉及字节序列的移动,所以高效易用的数据结构明显是必不可少的。Netty的ByteBuf实现满足并超越了这些需求。
ByteBuf结构
ByteBuf维护了两个不同的索引: 一个是用于读取,一个用于写入 。当你从ByteBuf读取是,它的 readerIndex
将会被递增已经被读取的字节数。同样地,当你写入ByteBuf时,它的 witerIndex
也会被递增。
作为一个容器,源码中的如下。有三块区域
discardable bytes:无效空间(已经读取过的空间),可丢弃字节的区域,由 readerIndex指针 控制
readable bytes:内容空间,可读字节的区域, 由readerIndex和writerIndex指针控制控制
writable bytes:空闲空间,可写入字节的区域, 由writerIndex指针和capacity容量控制
* <pre> * +-------------------+------------------+------------------+ * | discardable bytes | readable bytes | writable bytes | * | | (CONTENT) | | * +-------------------+------------------+------------------+ * | | | | * 0 <= readerIndex <= writerIndex <= capacity * </pre>
ByteBuf使用模式
总体分类划分是可根据JVM堆内存来区分的。
1.堆内内存(JVM堆空间内)
2.堆外内存(本机直接内存)
3.复合缓冲区(以上2种缓冲区多个混合)
1.堆内内存
最常用的ByteBuf模式是将 数据存储在JVM的堆空间中 。它能在没有使用池化的情况下提供快速的分配和释放。
2.堆外内存
JDK允许JVM实现通过本地调用来分配内存。主要是为了避免每次调用本地I/O操作之前(或者之后)将缓冲区的内容复制到一个中间缓冲区(或者从中间缓冲区把内容复制到缓冲区)。
**最大的特点:它的内容将驻留在常规的会被垃圾回收的堆之外。
最大的缺点:相对于堆缓冲区,它的分配和释放都是较为昂贵的。**
3.复合缓冲区
常用类:CompositeByteBuf,它为多个ByteBuf提供一个聚合视图, 将多个缓冲区表示为单个合并缓冲区的虚拟表示。
比如:HTTP协议:头部和主体这两部分由应用程序的不同模块产生。这个时候把这两部分合并的话,选择CompositeByteBuf是比较好的。
ByteBuf分类
主要分为三大类
Pooled和Unpooled (池化)
unsafe和非unsafe ()
Heap和Direct (堆内和堆外)
Pooled和Unpooled
Pooled:每次都从预先分配好的内存中去取出一段连续内存封装成一个ByteBuf给应用程序使用
Unpooled:每次分配内存的时候,直接调用系统api,向操作系统申请一
块内存
Heap和Direct:
Head:是调用jvm的堆内存进行分配的,需要被gc进行管理
Direct:是调用jdk的api进行内存分配,不受jvm控制,不会参与到gc的过程
Unsafe和非Unsafe
jdk中有Unsafe对象可以直接拿到对象的内存地址,并且基于这个内存地址进行读写操作。那么对应的分类的区别就是是否可以拿到jdk底层的Unsafe进行读写操作了。
内存分配ByteBufAllocator
这个接口实现负责分配缓冲区并且是线程安全的。从下面的接口方法以及注释可以总结出主要是围绕上面的三种ByteBuf内存模式:堆内,堆外以及复合型的内存分配。
/** * Implementations are responsible to allocate buffers. Implementations of this interface are expected to be * thread-safe. */ public interface ByteBufAllocator { ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; /** * Allocate a {@link ByteBuf}. If it is a direct or heap buffer * depends on the actual implementation. */ ByteBuf buffer(); /** * Allocate a {@link ByteBuf} with the given initial capacity. * If it is a direct or heap buffer depends on the actual implementation. */ ByteBuf buffer(int initialCapacity); /** * Allocate a {@link ByteBuf} with the given initial capacity and the given * maximal capacity. If it is a direct or heap buffer depends on the actual * implementation. */ ByteBuf buffer(int initialCapacity, int maxCapacity); /** * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. */ ByteBuf ioBuffer(); /** * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. */ ByteBuf ioBuffer(int initialCapacity); /** * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. */ ByteBuf ioBuffer(int initialCapacity, int maxCapacity); /** * Allocate a heap {@link ByteBuf}. */ ByteBuf heapBuffer(); /** * Allocate a heap {@link ByteBuf} with the given initial capacity. */ ByteBuf heapBuffer(int initialCapacity); /** * Allocate a heap {@link ByteBuf} with the given initial capacity and the given * maximal capacity. */ ByteBuf heapBuffer(int initialCapacity, int maxCapacity); /** * Allocate a direct {@link ByteBuf}. */ ByteBuf directBuffer(); /** * Allocate a direct {@link ByteBuf} with the given initial capacity. */ ByteBuf directBuffer(int initialCapacity); /** * Allocate a direct {@link ByteBuf} with the given initial capacity and the given * maximal capacity. */ ByteBuf directBuffer(int initialCapacity, int maxCapacity); /** * Allocate a {@link CompositeByteBuf}. * If it is a direct or heap buffer depends on the actual implementation. */ CompositeByteBuf compositeBuffer(); /** * Allocate a {@link CompositeByteBuf} with the given maximum number of components that can be stored in it. * If it is a direct or heap buffer depends on the actual implementation. */ CompositeByteBuf compositeBuffer(int maxNumComponents); /** * Allocate a heap {@link CompositeByteBuf}. */ CompositeByteBuf compositeHeapBuffer(); /** * Allocate a heap {@link CompositeByteBuf} with the given maximum number of components that can be stored in it. */ CompositeByteBuf compositeHeapBuffer(int maxNumComponents); /** * Allocate a direct {@link CompositeByteBuf}. */ CompositeByteBuf compositeDirectBuffer(); /** * Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it. */ CompositeByteBuf compositeDirectBuffer(int maxNumComponents); /** * Returns {@code true} if direct {@link ByteBuf}'s are pooled */ boolean isDirectBufferPooled(); /** * Calculate the new capacity of a {@link ByteBuf} that is used when a {@link ByteBuf} needs to expand by the * {@code minNewCapacity} with {@code maxCapacity} as upper-bound. */ int calculateNewCapacity(int minNewCapacity, int maxCapacity); }
其中ByteBufAllocator 的具体实现可以查看其子类,如下图
下面来看看各自子类的功能以及区别
UnpooledByteBufAllocator
heap内存分配
入口 new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
:
发现分配 Unpooled、Unsafe、Heap
内存,其实是分配了一个byte数组,并保存在 UnpooledHeapByteBuf#array
成员变量中。该内存的初始值容量和最大可扩展容量可以指定。
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); checkNotNull(alloc, "alloc"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setArray(allocateArray(initialCapacity)); setIndex(0, 0); } protected byte[] allocateArray(int initialCapacity) { return new byte[initialCapacity]; }
查看 UnpooledHeapByteBuf#getByte()
方法,堆内存类型的ByteBuf获取的时候。直接通过下标获取byte数组中的byte
@Override public byte getByte(int index) { ensureAccessible(); return _getByte(index); } @Override protected byte _getByte(int index) { //该array为初始化的时候,实例化的byte[] return HeapByteBufUtil.getByte(array, index); } static byte getByte(byte[] memory, int index) { //直接拿到一个数组 return memory[index]; }
direct内存分配
入口 UnpooledByteBufAllocator#newDirectBuffer() --> UnpooledUnsafeDirectByteBuf
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } checkPositiveOrZero(initialCapacity, "initialCapacity"); checkPositiveOrZero(maxCapacity, "maxCapacity"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(allocateDirect(initialCapacity), false); } protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } final void setByteBuffer(ByteBuffer buffer, boolean tryFree) { if (tryFree) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { freeDirect(oldBuffer); } } } this.buffer = buffer; memoryAddress = PlatformDependent.directBufferAddress(buffer); tmpNioBuf = null; capacity = buffer.remaining(); }
可以发现,Unpooled、Direct类型得内存分配实际上是维护了一个底层jdk的一个DirectByteBuffer。分配内存的时候就创建它,并将他保存到buffer成员变量。
跟踪 iUnpooledHeapByteBuf#_getByte()
,就比较简单了,直接使用jdk的api获取
@Override protected byte _getByte(int index) { //使用buffer return buffer.get(index); }
更加详细的分析可以查看下面这篇文章
https://www.jianshu.com/p/158...PooledByteBufAllocator
入口 PooledByteBufAllocator#newHeapBuffer()
和 PooledByteBufAllocator#newDirectBuffer()
,堆内内存和堆外内存分配的模式都比较固定。
1.拿到线程局部缓存PoolThreadCache
2.拿到不同类型的rena
3.使用不同类型的arena进行内存分配
@Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { //拿到线程局部缓存 PoolThreadCache cache = threadCache.get(); //拿到heapArena PoolArena<byte[]> heapArena = cache.heapArena; final ByteBuf buf; if (heapArena != null) { //使用heapArena分配内存 buf = heapArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); } @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { //拿到线程局部缓存 PoolThreadCache cache = threadCache.get(); //拿到directArena PoolArena<ByteBuffer> directArena = cache.directArena; final ByteBuf buf; if (directArena != null) { //使用directArena分配内存 buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
跟踪 threadCache.get()
调用的是 FastThreadLocal#get()
方法。那么其实threadCache也是一个FastThreadLocal,可以看成是jdk的ThreadLocal,get方法调用了初始化方法 initializel
public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); Object v = threadLocalMap.indexedVariable(index); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } //调用初始化方法 V value = initialize(threadLocalMap); registerCleaner(threadLocalMap); return value; }
initialValue()
方法的逻辑如下
1.从预先准备好的 heapArenas
和 directArenas
中获取最少使用的 arena
2.使用获取到的 arean
为参数,实例化一个 PoolThreadCache
并返回
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { private final boolean useCacheForAllThreads; PoolThreadLocalCache(boolean useCacheForAllThreads) { this.useCacheForAllThreads = useCacheForAllThreads; } @Override protected synchronized PoolThreadCache initialValue() { /** * arena翻译成竞技场,关于内存非配的逻辑都在这个竞技场中进行分配 */ //获取heapArena:从heapArenas堆内竞技场中拿出使用最少的一个arena final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); //获取directArena:从directArena堆内竞技场中拿出使用最少的一个arena final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); Thread current = Thread.currentThread(); if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { //创建PoolThreadCache:该Cache最终被一个线程使用 //通过heapArena和directArena维护两大块内存:堆和堆外内存 //通过tinyCacheSize,smallCacheSize,normalCacheSize维护ByteBuf缓存列表维护反复使用的内存块 return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } // No caching so just use 0 as sizes. return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); } //省略代码...... }
查看 PoolThreadCache
其维护了两种类型的内存分配策略,一种是上述通过持有 heapArena
和 directArena
,另一种是通过维护 tiny,small,normal
对应的缓存列表来维护反复使用的内存。
final class PoolThreadCache { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class); //通过arena的方式维护内存 final PoolArena<byte[]> heapArena; final PoolArena<ByteBuffer> directArena; //维护了tiny, small, normal三种类型的缓存列表 // Hold the caches for the different size classes, which are tiny, small and normal. private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches; private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches; private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; private final MemoryRegionCache<byte[]>[] normalHeapCaches; private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; // Used for bitshifting when calculate the index of normal caches later private final int numShiftsNormalDirect; private final int numShiftsNormalHeap; private final int freeSweepAllocationThreshold; private final AtomicBoolean freed = new AtomicBoolean(); private int allocations; // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity"); this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; //通过持有heapArena和directArena,arena的方式管理内存分配 this.heapArena = heapArena; this.directArena = directArena; //通过tinyCacheSize,smallCacheSize,normalCacheSize创建不同类型的缓存列表并保存到成员变量 if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { // No directArea is configured so just null out all caches tinySubPageDirectCaches = null; smallSubPageDirectCaches = null; normalDirectCaches = null; numShiftsNormalDirect = -1; } if (heapArena != null) { // Create the caches for the heap allocations //创建规格化缓存队列 tinySubPageHeapCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); //创建规格化缓存队列 smallSubPageHeapCaches = createSubPageCaches( smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalHeap = log2(heapArena.pageSize); //创建规格化缓存队列 normalHeapCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, heapArena); heapArena.numThreadCaches.getAndIncrement(); } else { // No heapArea is configured so just null out all caches tinySubPageHeapCaches = null; smallSubPageHeapCaches = null; normalHeapCaches = null; numShiftsNormalHeap = -1; } // Only check if there are caches in use. if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) && freeSweepAllocationThreshold < 1) { throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)"); } } private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0 && numCaches > 0) { //MemoryRegionCache 维护缓存的一个对象 @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { // TODO: maybe use cacheSize / cache.length //每一种MemoryRegionCache(tiny,small,normal)都表示不同内存大小(不同规格)的一个队列 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } } private static <T> MemoryRegionCache<T>[] createNormalCaches( int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) { if (cacheSize > 0 && maxCachedBufferCapacity > 0) { int max = Math.min(area.chunkSize, maxCachedBufferCapacity); int arraySize = Math.max(1, log2(max / area.pageSize) + 1); //MemoryRegionCache 维护缓存的一个对象 @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize]; for (int i = 0; i < cache.length; i++) { //每一种MemoryRegionCache(tiny,small,normal)都表示不同内存(不同规格)大小的一个队列 cache[i] = new NormalMemoryRegionCache<T>(cacheSize); } return cache; } else { return null; } } ...... }
更加详细分析可参考以下文章
https://www.jianshu.com/p/1cd...directArena分配direct内存的流程
上一步拿到PoolThreadCache之后,获取对应的 Arena
。那么之后就是Arena具体分配内存的步骤。
入口 PooledByteBufAllocator#newDirectBuffer()
方法种有如下代码
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { //拿到PooledByteBuf对象,仅仅是一个对象 PooledByteBuf<T> buf = newByteBuf(maxCapacity); //从cache种分配内存,并初始化buf种内存地址相关的属性 allocate(cache, buf, reqCapacity); return buf; }
可以看到分配的过程如下:拿到PooledByteBuf对象从cache中分配内存,并重置相关属性.
1. newByteBuf(maxCapacity);
拿到PooledByteBuf对象
@Override protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { if (HAS_UNSAFE) { //获取一个PooledByteBuf return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); } else { return PooledDirectByteBuf.newInstance(maxCapacity); } } static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { //从带有回收特性的对象池RECYCLER获取一个PooledUnsafeDirectByteBuf PooledUnsafeDirectByteBuf buf = RECYCLER.get(); //buf可能是从回收站拿出来的,要进行复用 buf.reuse(maxCapacity); return buf; }
2.Recycler是一个基于线程本地堆栈的对象池。Recycler维护了一个ThreadLocal成员变量,用于返回一个stack给回收处理 DefaultHandle
,该处理器通过维护这个堆栈来维护 PooledUnsafeDirectByteBuf
缓存。
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() { @Override protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) { //Recycler负责用回收处理器handler维护PooledUnsafeDirectByteBuf //handler底层持有一个stack作为对象池,维护对象池,handle同时负责对象回收 //存储handler为成员变量,使用完该ByteBuf可以调用回收方法回收 return new PooledUnsafeDirectByteBuf(handle, 0); } };
//维护了一个`ThreadLocal`,`initialValue`方法返回一个堆栈。 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); } @Override protected void onRemoval(Stack<T> value) { // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead if (value.threadRef.get() == Thread.currentThread()) { if (DELAYED_RECYCLED.isSet()) { DELAYED_RECYCLED.get().remove(value); } } } };
3.再看Recycler#get()方法
public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } //获取对应的堆栈,相当一个回收站 Stack<T> stack = threadLocal.get(); //从栈顶拿出一个来DefaultHandle(回收处理器) //DefaultHandle持有一个value,其实是PooledUnsafeDirectByteBuf DefaultHandle<T> handle = stack.pop(); //没有回收处理器,说明没有闲置的ByteBuf if (handle == null) { //新增一个处理器 handle = stack.newHandle(); //回调,还记得么?该回调返回一个PooledUnsafeDirectByteBuf //让处理器持有一个新的PooledUnsafeDirectByteBuf handle.value = newObject(handle); } //如果有,则可直接重复使用 return (T) handle.value; } public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); Object v = threadLocalMap.indexedVariable(index); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } //回调initialize V value = initialize(threadLocalMap); registerCleaner(threadLocalMap); return value; } private V initialize(InternalThreadLocalMap threadLocalMap) { V v = null; try { //回调 v = initialValue(); } catch (Exception e) { PlatformDependent.throwException(e); } threadLocalMap.setIndexedVariable(index, v); addToVariablesToRemove(threadLocalMap, this); return v; } DefaultHandle<T> newHandle() { //实例化一个处理器并并且初四话成员变量,该成员变量stack从threalocal中初始化 return new DefaultHandle<T>(this); }
DefaultHandle用 stack
作为缓存池维护 PooledUnsafeDirectByteBuf
,同理 PooledDirectByteBuf
也是一样的。只不过实例化的对象的实现不一样而已。同时,处理器定义了回收的方法是将兑现存回栈内,使用的时候则是从栈顶取出。
static final class DefaultHandle<T> implements Handle<T> { private int lastRecycledId; private int recycleId; boolean hasBeenRecycled; //对象缓存池 private Stack<?> stack; private Object value; DefaultHandle(Stack<?> stack) { this.stack = stack; } /** * 定义回收方法,回收对象到stack * @param object */ @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } Stack<?> stack = this.stack; if (lastRecycledId != recycleId || stack == null) { throw new IllegalStateException("recycled already"); } //回收:将自己存进栈中缓存起来 stack.push(this); } }
到这我们刚刚看完第一步,到第二步重置缓存内指针的时候了 ,获取到PooledUnsafeDirectByteBuf的时候,有可能是从缓存中取出来的。因此需要复用.
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { //从带有回收特性的对象池RECYCLER获取一个PooledUnsafeDirectByteBuf PooledUnsafeDirectByteBuf buf = RECYCLER.get(); //buf可能是从回收站拿出来的,要进行复用 buf.reuse(maxCapacity); return buf; } final void reuse(int maxCapacity) { //重置最大容量 maxCapacity(maxCapacity); //设置引用 setRefCnt(1); //重置指针 setIndex0(0, 0); //重置标记值 discardMarks(); }
到这才刚刚完成分配内存的第一步(拿到PooledByteBuf对象),以上都是仅仅是获取并且用回收站和回收处理器管理这些对象,这些对象仍然只是一个对象,还没有分配实际的内存。
跟踪 PoolArena#allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity)
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); //不同的规格大小进行内存分配 /** * 分配整体逻辑(先判断tiny和small规格的,再判断normal规格的) * 1. 尝试从缓存上进行内存分配,成功则返回 * 2. 失败则再从内存堆中进行分配内存 */ if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage<T>[] table; boolean tiny = isTiny(normCapacity); //尝试tiny和small规格的缓存内存分配 if (tiny) { // < 512 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } final PoolSubpage<T> head = table[tableIdx]; /** * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and * {@link PoolChunk#free(long)} may modify the doubly linked list as well. */ synchronized (head) { final PoolSubpage<T> s = head.next; if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity); incTinySmallAllocation(tiny); return; } } //tiny和small规格的缓存内存分配尝试失败 //从内存堆中分配内存 synchronized (this) { allocateNormal(buf, reqCapacity, normCapacity); } incTinySmallAllocation(tiny); return; } //normal规格 //如果分配处出来的内存大于一个值(chunkSize),则执行allocateHuge if (normCapacity <= chunkSize) { //从缓存上进行内存分配 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } //缓存没有再从内存堆中分配内存 synchronized (this) { allocateNormal(buf, reqCapacity, normCapacity); ++allocationsNormal; } } else { // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, reqCapacity); } }
其整体分配内存的逻辑是根据不同规格大小的内存需要来的,显示 tiny
和 small
规格的,再是 normal
规格的。分配也是先尝试从缓存中进行内存分配,如果分配失败再从内存堆中进行内存分配。 当然,分配出来的内存回和第一步拿到的PooledByteBuf进行绑定起来。
总结
主要学习了ByteBuf 的 基本结构、使用模式、分类、基本的内存分配 。
下次再学习ByteBuf 的 命中逻辑以及内存回收 。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
架构真经
马丁L. 阿伯特(Martin L. Abbott)、迈克尔T.费舍尔(Michael T. Fisher) / 机械工业出版社 / 2017-4 / 79
前言 感谢你对本书第2版感兴趣!作为一本入门、进修和轻量级的参考手册,本书旨在帮助工程师、架构师和管理者研发及维护可扩展的互联网产品。本书给出了一系列规则,每个规则围绕着不同的主题展开讨论。大部分的规则聚焦在技术上,少数规则涉及一些关键的思维或流程问题,每个规则对构建可扩展的产品都是至关重要的。这些规则在深度和焦点上都有所不同。有些规则是高级的,例如定义一个可以应用于几乎任何可扩展性问题的模......一起来看看 《架构真经》 这本书的介绍吧!