内容简介:本文主要研究一下flink的MemoryPoolflink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.javaflink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
序
本文主要研究一下flink的MemoryPool
MemoryPool
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
abstract static class MemoryPool {
abstract int getNumberOfAvailableMemorySegments();
abstract MemorySegment allocateNewSegment(Object owner);
abstract MemorySegment requestSegmentFromPool(Object owner);
abstract void returnSegmentToPool(MemorySegment segment);
abstract void clear();
}
- MemoryPool定义了getNumberOfAvailableMemorySegments、allocateNewSegment、requestSegmentFromPool、returnSegmentToPool、clear这几个抽象方法;它有HybridHeapMemoryPool、HybridOffHeapMemoryPool这两个子类
HybridHeapMemoryPool
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
static final class HybridHeapMemoryPool extends MemoryPool {
/** The collection of available memory segments. */
private final ArrayDeque<byte[]> availableMemory;
private final int segmentSize;
HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
this.availableMemory = new ArrayDeque<>(numInitialSegments);
this.segmentSize = segmentSize;
for (int i = 0; i < numInitialSegments; i++) {
this.availableMemory.add(new byte[segmentSize]);
}
}
@Override
MemorySegment allocateNewSegment(Object owner) {
return MemorySegmentFactory.allocateUnpooledSegment(segmentSize, owner);
}
@Override
MemorySegment requestSegmentFromPool(Object owner) {
byte[] buf = availableMemory.remove();
return MemorySegmentFactory.wrapPooledHeapMemory(buf, owner);
}
@Override
void returnSegmentToPool(MemorySegment segment) {
if (segment.getClass() == HybridMemorySegment.class) {
HybridMemorySegment heapSegment = (HybridMemorySegment) segment;
availableMemory.add(heapSegment.getArray());
heapSegment.free();
}
else {
throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
}
}
@Override
protected int getNumberOfAvailableMemorySegments() {
return availableMemory.size();
}
@Override
void clear() {
availableMemory.clear();
}
}
- HybridHeapMemoryPool继承了MemoryPool,它使用的是jvm的heap内存;构造器接收numInitialSegments、segmentSize两个参数用于初始化availableMemory这个ArrayDeque,该queue的元素类型为byte[]
- allocateNewSegment方法调用的是MemorySegmentFactory.allocateUnpooledSegment,用于分配unpooled memory;requestSegmentFromPool方法调用的是availableMemory.remove(),然后调用MemorySegmentFactory.wrapPooledHeapMemory包装为MemorySegment,这个方法没有判断ArrayDeque的大小就直接remove,需要注意
- returnSegmentToPool方法只对HybridMemorySegment类型进行处理,首先将它的byte[]归还到availableMemory,之后调用heapSegment.free()释放;getNumberOfAvailableMemorySegments方法返回的是availableMemory.size();clear方法调用的是availableMemory.clear()
HybridOffHeapMemoryPool
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
static final class HybridOffHeapMemoryPool extends MemoryPool {
/** The collection of available memory segments. */
private final ArrayDeque<ByteBuffer> availableMemory;
private final int segmentSize;
HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
this.availableMemory = new ArrayDeque<>(numInitialSegments);
this.segmentSize = segmentSize;
for (int i = 0; i < numInitialSegments; i++) {
this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
}
}
@Override
MemorySegment allocateNewSegment(Object owner) {
return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
}
@Override
MemorySegment requestSegmentFromPool(Object owner) {
ByteBuffer buf = availableMemory.remove();
return MemorySegmentFactory.wrapPooledOffHeapMemory(buf, owner);
}
@Override
void returnSegmentToPool(MemorySegment segment) {
if (segment.getClass() == HybridMemorySegment.class) {
HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
ByteBuffer buf = hybridSegment.getOffHeapBuffer();
availableMemory.add(buf);
hybridSegment.free();
}
else {
throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
}
}
@Override
protected int getNumberOfAvailableMemorySegments() {
return availableMemory.size();
}
@Override
void clear() {
availableMemory.clear();
}
}
- HybridOffHeapMemoryPool继承了MemoryPool,它使用的是OffHeap;构造器接收numInitialSegments、segmentSize两个参数用于初始化availableMemory这个ArrayDeque,该queue的元素类型为ByteBuffer
- allocateNewSegment方法调用的是MemorySegmentFactory.allocateUnpooledOffHeapMemory,用于分配unpooled off-heap memory;requestSegmentFromPool方法调用的是availableMemory.remove(),然后调用MemorySegmentFactory.wrapPooledOffHeapMemory包装为MemorySegment,这个方法没有判断ArrayDeque的大小就直接remove,需要注意
- returnSegmentToPool方法只对HybridMemorySegment类型进行处理,首先将它的ByteBuffer归还到availableMemory,之后调用heapSegment.free()释放;getNumberOfAvailableMemorySegments方法返回的是availableMemory.size();clear方法调用的是availableMemory.clear()
小结
- MemoryPool定义了getNumberOfAvailableMemorySegments、allocateNewSegment、requestSegmentFromPool、returnSegmentToPool、clear这几个抽象方法;它有HybridHeapMemoryPool、HybridOffHeapMemoryPool这两个子类
- HybridHeapMemoryPool继承了MemoryPool,它使用的是jvm的heap内存;构造器接收numInitialSegments、segmentSize两个参数用于初始化availableMemory这个ArrayDeque,该queue的元素类型为byte[];allocateNewSegment方法调用的是MemorySegmentFactory.allocateUnpooledSegment,用于分配unpooled memory;requestSegmentFromPool方法调用的是availableMemory.remove(),然后调用MemorySegmentFactory.wrapPooledHeapMemory包装为MemorySegment,这个方法没有判断ArrayDeque的大小就直接remove,需要注意;returnSegmentToPool方法只对HybridMemorySegment类型进行处理,首先将它的byte[]归还到availableMemory,之后调用heapSegment.free()释放;getNumberOfAvailableMemorySegments方法返回的是availableMemory.size();clear方法调用的是availableMemory.clear()
- HybridOffHeapMemoryPool继承了MemoryPool,它使用的是OffHeap;构造器接收numInitialSegments、segmentSize两个参数用于初始化availableMemory这个ArrayDeque,该queue的元素类型为ByteBuffer;allocateNewSegment方法调用的是MemorySegmentFactory.allocateUnpooledOffHeapMemory,用于分配unpooled off-heap memory;requestSegmentFromPool方法调用的是availableMemory.remove(),然后调用MemorySegmentFactory.wrapPooledOffHeapMemory包装为MemorySegment,这个方法没有判断ArrayDeque的大小就直接remove,需要注意;returnSegmentToPool方法只对HybridMemorySegment类型进行处理,首先将它的ByteBuffer归还到availableMemory,之后调用heapSegment.free()释放;getNumberOfAvailableMemorySegments方法返回的是availableMemory.size();clear方法调用的是availableMemory.clear()
doc
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
超简单!一学就懂的互联网金融
视觉图文 / 人民邮电出版社 / 2015-2-1 / 45.00元
零基础、全图解,通过130多个精辟的知识点、220多张通俗易懂的逻辑图表,让您一书在手,即可彻底看懂、玩转互联网金融从菜鸟成为达人,从新手成为互联网金融高手! 本书主要特色:最简洁的版式+最直观的图解+最实用的内容。 本书细节特色:10章专题内容详解+80多个特别提醒奉献+130多个知识点讲解+220多张图片全程图解,深度剖析互联网金融的精华之处,帮助读者在最短的时间内掌握互联网金融知......一起来看看 《超简单!一学就懂的互联网金融》 这本书的介绍吧!