内容简介:本文通过更改配置及数据结构改造,快速解决HDFS Decommission缓慢问题。
本文通过更改配置及数据结构改造,快速解决HDFS Decommission缓慢问题。
上篇文章回顾: 记一次Open-Falcon-Graph频繁OOM问题排查
//
背景
//
c3prc-xiami有大量raid单副本文件,decommission单个datanode速度很慢,观察监控指标,发现:
-
网卡流量始终保持低速,60~80mb/s
-
磁盘io util也是单个磁盘100%在某一个时刻,也即是同一时间只有一个磁盘在工作
这样下线速度就十分缓慢,一个datanode一天只能下4w个block。而一个datanode平均有20w个block,这个速度明显不符合要求。
最开始认为是配置问题,我们分析了几种配置,有一定的效果,提高到了6w个block每天,但还是慢。
最后我们从代码层面着手,改变相关数据结构,使下线速度明显提升,一个datanode下线平均1到2天就能下完。
//
分析配置
//
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
blockmanager.maxReplicationStreams这个变量有两个作用:
(1)在选择从哪里把块复制出去的时候chooseSourceDatanode () ,如果某个dn上已经有>maxReplicationStreams的块在被复制则不会再选中它作为源了。
(2)HeartbeatManager每次会想dn发送DNA_TRANSFER命令,会从DatanodeDescriptor.replicateBlocks取一定数量的block进行传输,而每个DN能够启动的DataTransfer线程数最大不能超过maxReplicationStreams。
blockmanage.replicationStreamsHardLimit同上一个变量类似,只是在chooseSourceDatanode(),如果block的优先级最高,这个dn还能再多复制2个(默认值分别是2,4),但是不能>replicationStreamsHardLimit。
所以在同一时刻最多replicationStreamsHardLimit被选出,而且是在同一个dn中。但是单纯调整hardLimit并没有多少效果。
真正控制一个dn往外拷贝数据还是maxReplicationStreams,dn通过心跳向NN报告正在进行Transfer的线程数,而后NN向dn发送maxTransfer个DNA_transfer CMD:
//get datanode commandsfinal int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress; xmitsInProgress=正在传输数量
对于每个dn:从待复制blocks队列取出maxTransfers
public List<BlockTargetPair> getReplicationCommand(int maxTransfers) { return replicateBlocks.poll(maxTransfers); }
结论1:
通过调大上述2个参数,从2到4,再调整到8,效果还是比较明显,dn中的日志也反映出同一个时刻传输线程数有所增加。
但当调整为12或者更大时,就没有多少效果了。整体网速也没有上来。
目前调整为12是比较合理的。和单个dn磁盘数量对应。
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY ="dfs.namenode.replication.max-streams";
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY ="dfs.namenode.replication.max-streams-hard-limit";
public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
namenode中的blockmanager. ReplicationMonitor每3秒会computeDatanodeWork并且取一批block,然后告诉dn去复制这些blocks取的数量:
final int blocksToProcess = numlive
* this.blocksReplWorkMultiplier;
开始我认为调大能加快下线速度,但这个参数影响小,它的作用仅仅是把取出来的block放入DatanodeDescriptor的等待队列中(replicateBlocks)
同过观察NN日志发现如下问题:
-
ReplicationMonitor每次迭代会打印日志:
askdn_ip to replicate blk_xxx to another_dn.
并且数量为blocksToProcess。在同一时刻,这个dn_ip都是一样的。
-
每次迭代(在一段时间内循环)多次要求同一个dn拷贝blocksToProcess(c3prc-xiaomi=800个)blocks,而dn每次最多拷贝maxReplicationStreams
-
也就是说NN做了很多无效工作,取的blocks都是同一个dn,当取到轮到下一个dn时,又是同样的问题。并不是每个dn都在同时工作。观察监控发现dn间歇性往外拷贝数据。
ReplicationMonitor每次取blocksToProcess个blocks的时候,这些blocks可能是同一个dn上,甚至同一个dn的同一个磁盘上。
因此,要分析每次的取法。目的是能取出不连续的blocks,能让不同dn,不同磁盘同时工作。
//
分析数据结构
//
和下线主要有关的代码集中在blockmanager,以及UnderReplicatedBlocks
/**
* Store set of Blocks that need to be replicated 1 or more times.
* We also store pending replication-orders.
*/public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
所有需要做replicate的block都会放在blockmanager.neededReplications中。
UnderReplicatedBlocks是一个复合结构,保存者5个(LEVEL=5)带有优先级的队列:
private final List<LightWeightHashSet<Block>> priorityQueues
对于只有一个副本的block,或者replica都在decommision节点上,它在优先级最高的队列中,raid副本下线就是这种情况。
对于每一个优先级的队列,实现是LightWeightLinkedSet,它是一个有序的hashset,元素收尾相连。
UnderReplicatedBlocks的实现是保证每个队列的元素都会被取到,同时,每个队列中的元素按顺序依次被取出,不会让某些block永远没机会被取出。
具体做法是为每一个队列保存一个偏移:
private finalList<LightWeightHashSet<Block>> priorityQueues
如果取到最后一个队列(LEVEL-1)末尾了,就重置所有队列的偏移=0,从头再取。
这5个队列都是先进先被选,队尾进,而且优先级Level=0的更容易被取到。具体取的算法是在UnderReplicatedBlocks.chooseUnderReplicatedBlocks()中。
在进行decommission操作的时候,可能整个dn的块都是要加入neededReplication队列(raid集群如此,如果有3副本,那一个block有3个source.单副本的source dn只有一个)。这时候,加入某一个优先级队列(LightWeightLinkedSet)的blocks是有序的,而且连续上w个blocks属于同一个dn,甚至连续在同一个磁盘上。由于从队列是从头到尾顺序取,所以会有问题,尤其是对单副本的情况。
因此,我们想要随机从优先级队列中取出block.但又要保证每个block被取到,所以还是要有序的。
//
数据结构改造-ShuffleAddSet
//
实际上,这么做是合理的,先进入队列可以优先被取到。但对于我们这种场景,并不要求取出的顺序性和放入顺序一致。如果能打乱顺序,再取出就能使一次迭代取出的blocks尽可能在不同dn或者不同磁盘上。
LightWeightLinkedSet: UnderReplicatedBlocks默认采用的优先级队列的实现。本身是一个hashset继承LightWeightHashSet,同时元素双向连接,带有Head tail
LightWeightHashSet: 轻量级hashSet
ShuffleAddSet: Look like LightWeightLinkedSet
我们实现了一种ShuffleAddSet继承LightWeightHashSet,尽可能表现和LightWeightLinkedSet一致,这样对外UnderReplicatedBlocks不需要做过多修改。
ShuffleAddSet中有两个队列,而对外表现为一个优先级队列。
第一个队列, 同时也是Set和LightWeightLinkedSet是一样的,双向有序,外部调用方法取元素也是从这个Set取。
第二个队列, 缓存队列cachedAddList,一开始用ArrayList、LinkedList,由于性能问题不使用了。现在也是用LightWeightHashSet,HashSet具有天然无序性质。
每当有新的元素加入,首先会放入cachedAddList中,随后当第一个队列数据空或者取到末尾,立即将cachedAddList数据shuffle,并拷贝到第一个队列中,然后清空自己,继续接收新元素。由于HashSet本身无序,因此少一步shuffle操作,直接从cachedAddList拷贝至第一个队列即可。
需要注意的是取数据(调用迭代器取)和add操作必须是同步的,因为取的时候第一个队列到达末尾或着空,会触发shuffle and add操作,清空cachedlist。
综上,第一个队列只有为空或者取到末尾的时候,会从第二个队里加入数据,如果都为空说明整个优先级队列空。每从cachedlist加入一批,这一批就是随机顺序,虽然第一个队列不是整个队列都随机打乱,总体上,第一个队列还是是乱序的。
这样做的问题:
(1)外部调用这个优先级队列的add操作,先进入队列的不一定是先被调度,后加入cachedList的元素,也可能排在第一个队列的前面先调度。
(2)极端情况,如果每次加入1个,然后再取1个元素,少量的元素,或者取出数量和频率远大于add数量,(比如cachedlist加入一个元素,第一个队列立刻到达末尾了)实际上没有达到随机效果。
好在我们的场景不要求FIFO,而且每次Decommision初始加入UnderReplicatedBlocks的block数量很大,ReplicationMonitor每次取/处理的数量blockToProcess,相对而言(下线8台节点,UnderReplicatedBlocks会达到180w)较小。发生shuffle and add不是很频繁,也不是性能瓶颈。观测到最长时间是200ms。
同时,我们将这个功能ShuffleAddSet作为一种可配置项目,UnderReplicatedBlocks可以在初始化时候选择用ShuffleAddSet或者LightWeightLinkedSet
dfs.namenode.blockmanagement.queues.shuffle
如下:
/** the queues themselves */private final List<LightWeightHashSet<Block>> priorityQueues
= new ArrayList<LightWeightHashSet<Block>>();public static final String NAMENODE_BLOCKMANAGEMENT_QUEUES_SHUFFLE = "dfs.namenode.blockmanagement.queues.shuffle";/** Stores the replication index for each priority */private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);/** Create an object. */UnderReplicatedBlocks() {
Configuration conf = new HdfsConfiguration(); boolean useShuffle = conf.getBoolean(NAMENODE_BLOCKMANAGEMENT_QUEUES_SHUFFLE, false); for (int i = 0; i < LEVEL; i++) { if (useShuffle) {
priorityQueues.add(new ShuffleAddSet<Block>());
} else {
priorityQueues.add(new LightWeightLinkedSet<Block>());
}
priorityToReplIdx.put(i, 0);
}
}
//
性能问题
//
我们发现使用ShuffleAddSet时候,开始下线8台dn时会卡住,主要是卡在DecommissionManager$Monitor,每次要检查此dn上全部的blocks是否 underReplicated,这样blocks很多拿写锁的时间会很长。
也会检查ShuffleAddSet.contains(blocks),由于有两个队列,所以contain开销会比之前大。
2019-04-12,12:09:35,876 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Long read lock is held at 1555042175235. And released after 641 milliseconds.Call stack is:
java.lang.Thread.getStackTrace(Thread.java:1479)
org.apache.hadoop.util.StringUtils.getStackTrace(StringUtils.java:914)
org.apache.hadoop.hdfs.server.namenode.FSNamesystemLock.checkAndLogLongReadLockDuration(FSNamesystemLock.java:104)
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.writeUnlock(FSNamesystem.java:1492)
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.isReplicationInProgress(BlockManager.java:3322)
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.checkDecommissionState(DatanodeManager.java:751)
org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager$Monitor.check(DecommissionManager.java:93)
org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager$Monitor.run(DecommissionManager.java:70)
java.lang.Thread.run(Thread.java:662)
通过修改isReplicationInProgress方法,类似处理blockreport,每隔一定数量放一次锁的方式,缓解写锁时间太长导致其他rpc请求没有响应。
++processed;// Release lock per 5w blocks processed and has too many underReplicatedBlocks.if (processed == numReportBlocksPerIteration &&
namesystem.hasWriteLock() && underReplicatedBlocks > numReportBlocksPerIteration) {
namesystem.writeUnlock();
processed = 0;
namesystem.writeLock();
}
//
结论
//
对于单副本较多的集群,可采用如下方式下线:
dfs.namenode.blockmanagement.queues.shuffle= truedfs.namenode.replication.max-streams= 12 默认是2,限制一个datanode复制数量
dfs.namenode.replication.max-streams-hard-limit=12 默认是4dfs.namenode.replication.work.multiplier.per.iteration= 4 默认2 / namenode一次调度的数量=该值×datanodes数量
开启shuffle and add,并调整单个dn最大复制数为物理磁盘数量,对于小集群可以调大work.multiplier一次处理4倍LiveDatanode数量block.使下线速度最大化。
注意的问题:
每次操作下线2台节点(refreshNodes),隔10分钟再下2台,一直到8台。同时下线的dn最好不要超过8台。不然DecommissionManager的开销会很大,影响NN正常服务。
END
我知道你会再看一下的
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- XMLDecoder解析流程分析
- dubbo注册服务IP解析异常及IP解析源码分析
- 技术问题分析-报文解析(10.23)
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- Colly源码解析——结合例子分析底层实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。