内容简介:HDFS 是一个分布式文件系统,在 HDFS 上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在 HDFS 文件系统上创建并写一个文件,流程如下图(来自《Hadoop:The Definitive Guide》一书)所示:具体过程描述如下:下面代码使用 Hadoop 的 API 来实现向 HDFS 的文件写入数据,同样也包括创建一个文件和写数据两个主要过程,代码如下所示:
HDFS 是一个分布式文件系统,在 HDFS 上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在 HDFS 文件系统上创建并写一个文件,流程如下图(来自《Hadoop:The Definitive Guide》一书)所示:
具体过程描述如下:
- Client 调用 DistributedFileSystem 对象的 create 方法,创建一个文件输出流(FSDataOutputStream)对象
- 通过 DistributedFileSystem 对象与 Hadoop 集群的 NameNode 进行一次 RPC 远程调用,在 HDFS 的 Namespace 中创建一个文件条目(Entry),该条目没有任何的 Block
- 通过 FSDataOutputStream 对象,向 DataNode 写入数据,数据首先被写入 FSDataOutputStream 对象内部的 Buffer 中,然后数据被分割成一个个 Packet 数据包
- 以 Packet 最小单位,基于 Socket 连接发送到按特定算法选择的 HDFS 集群中一组 DataNode(正常是 3 个,可能大于等于 1)中的一个节点上,在这组 DataNode 组成的 Pipeline 上依次传输 Packet
- 这组 DataNode 组成的 Pipeline 反方向上,发送 ack,最终由 Pipeline 中第一个 DataNode 节点将 Pipeline ack 发送给 Client
- 完成向文件写入数据,Client 在文件输出流(FSDataOutputStream)对象上调用 close 方法,关闭流
- 调用 DistributedFileSystem 对象的 complete 方法,通知 NameNode 文件写入成功
下面代码使用 Hadoop 的 API 来实现向 HDFS 的文件写入数据,同样也包括创建一个文件和写数据两个主要过程,代码如下所示:
static String[] contents = new String[] { "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc", "dddddddddddddddddddddddddddddddd", "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", }; public static void main(String[] args) { String file = "hdfs://h1:8020/data/test/test.log"; Path path = new Path(file); Configuration conf = new Configuration(); FileSystem fs = null; FSDataOutputStream output = null; try { fs = path.getFileSystem(conf); output = fs.create(path); // 创建文件 for(String line : contents) { // 写入数据 output.write(line.getBytes("UTF-8")); output.flush(); } } catch (IOException e) { e.printStackTrace(); } finally { try { output.close(); } catch (IOException e) { e.printStackTrace(); } } } 复制代码
结合上面的示例代码,我们先从 fs.create(path); 开始,可以看到 FileSystem 的实现 DistributedFileSystem 中给出了最终返回 FSDataOutputStream 对象的抽象逻辑,代码如下所示:
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { statistics.incrementWriteOps(1); return new FSDataOutputStream (dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics); } 复制代码
上面,DFSClient dfs 的 create 方法中创建了一个 OutputStream 对象,在 DFSClient 的 create 方法:
public OutputStream create(String src, FsPermission permission, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize ) throws IOException { ... ... } 复制代码
创建了一个 DFSOutputStream 对象,如下所示:
final DFSOutputStream result = new DFSOutputStream(src, masked, overwrite, createParent, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512)); 复制代码
下面,我们从 DFSOutputStream 类开始,说明其内部实现原理。
DFSOutputStream 内部原理
打开一个 DFSOutputStream 流,Client 会写数据到流内部的一个缓冲区中,然后数据被分解成多个 Packet,每个 Packet 大小为 64k 字节,每个 Packet 又由一组 chunk 和这组 chunk 对应的 checksum 数据组成,默认 chunk 大小为 512 字节,每个 checksum 是对 512 字节数据计算的校验和数据。 当 Client 写入的字节流数据达到一个 Packet 的长度,这个 Packet 会被构建出来,然后会被放到队列 dataQueue 中,接着 DataStreamer 线程会不断地从 dataQueue 队列中取出 Packet,发送到复制 Pipeline 中的第一个 DataNode 上,并将该 Packet 从 dataQueue 队列中移到 ackQueue 队列中。ResponseProcessor 线程接收从 Datanode 发送过来的 ack,如果是一个成功的 ack,表示复制 Pipeline 中的所有 Datanode 都已经接收到这个 Packet,ResponseProcessor 线程将 packet 从队列 ackQueue 中删除。 在发送过程中,如果发生错误,所有未完成的 Packet 都会从 ackQueue 队列中移除掉,然后重新创建一个新的 Pipeline,排除掉出错的那些 DataNode 节点,接着 DataStreamer 线程继续从 dataQueue 队列中发送 Packet。 下面是 DFSOutputStream 的结构及其原理,如图所示:
我们从下面 3 个方面来描述内部流程:
- 创建 Packet
Client 写数据时,会将字节流数据缓存到内部的缓冲区中,当长度满足一个 Chunk 大小(512B)时,便会创建一个 Packet 对象,然后向该 Packet 对象中写 Chunk Checksum 校验和数据,以及实际数据块 Chunk Data,校验和数据是基于实际数据块计算得到的。每次满足一个 Chunk 大小时,都会向 Packet 中写上述数据内容,直到达到一个 Packet 对象大小(64K),就会将该 Packet 对象放入到 dataQueue 队列中,等待 DataStreamer 线程取出并发送到 DataNode 节点。
- 发送 Packet
DataStreamer 线程从 dataQueue 队列中取出 Packet 对象,放到 ackQueue 队列中,然后向 DataNode 节点发送这个 Packet 对象所对应的数据。
- 接收 ack
发送一个 Packet 数据包以后,会有一个用来接收 ack 的 ResponseProcessor 线程,如果收到成功的 ack,则表示一个 Packet 发送成功。如果成功,则 ResponseProcessor 线程会将 ackQueue 队列中对应的 Packet 删除。
DFSOutputStream 初始化
首先看一下,DFSOutputStream 的初始化过程,构造方法如下所示:
DFSOutputStream(String src, FsPermission masked, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, int bytesPerChecksum) throws IOException { this(src, blockSize, progress, bytesPerChecksum, replication); computePacketChunkSize(writePacketSize, bytesPerChecksum); // 默认 writePacketSize=64*1024(即64K),bytesPerChecksum=512(没512个字节计算一个校验和), try { if (createParent) { // createParent为true表示,如果待创建的文件的父级目录不存在,则自动创建 namenode.create(src, masked, clientName, overwrite, replication, blockSize); } else { namenode.create(src, masked, clientName, overwrite, false, replication, blockSize); } } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileAlreadyExistsException.class, FileNotFoundException.class, NSQuotaExceededException.class, DSQuotaExceededException.class); } streamer.start(); // 启动一个DataStreamer线程,用来将写入的字节流打包成packet,然后发送到对应的Datanode节点上 } 上面computePacketChunkSize方法计算了一个packet的相关参数,我们结合代码来查看,如下所示: int chunkSize = csize + checksum.getChecksumSize(); int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1); packetSize = n + chunkSize*chunksPerPacket; 复制代码
我们用默认的参数值替换上面的参数,得到:
int chunkSize = 512 + 4; int n = 21 + 4; chunksPerPacket = Math.max((64*1024 - 25 + 516-1)/516, 1); // 127 packetSize = 25 + 516*127; 复制代码
上面对应的参数,说明如下表所示:
参数名称 | 参数值 | 参数含义 |
---|---|---|
chunkSize | 512+4=516 | 每个 chunk 的字节数(数据 + 校验和) |
csize | 512 | 每个 chunk 数据的字节数 |
psize | 64*1024 | 每个 packet 的最大字节数(不包含 header) |
DataNode.PKT_HEADER_LEN | 21 | 每个 packet 的 header 的字节数 |
chunksPerPacket | 127 | 组成每个 packet 的 chunk 的个数 |
packetSize | 25+516*127=65557 | 每个 packet 的字节数(一个 header + 一组 chunk) |
在计算好一个 packet 相关的参数以后,调用 create 方法与 Namenode 进行 RPC 请求,请求创建文件:
if (createParent) { // createParent为true表示,如果待创建的文件的父级目录不存在,则自动创建 namenode.create(src, masked, clientName, overwrite, replication, blockSize); } else { namenode.create(src, masked, clientName, overwrite, false, replication, blockSize); } 复制代码
远程调用上面方法,会在 FSNamesystem 中创建对应的文件路径,并初始化与该创建的文件相关的一些信息,如租约(向 Datanode 节点写数据的凭据)。文件在 FSNamesystem 中创建成功,就要初始化并启动一个 DataStreamer 线程,用来向 Datanode 写数据,后面我们详细说明具体处理逻辑。
Packet 结构与定义
Client 向 HDFS 写数据,数据会被组装成 Packet,然后发送到 Datanode 节点。Packet 分为两类,一类是实际数据包,另一类是 heatbeat 包。一个 Packet 数据包的组成结构,如图所示:
上图中,一个 Packet 是由 Header 和 Data 两部分组成,其中 Header 部分包含了一个 Packet 的概要属性信息,如下表所示:
字段名称 | 字段类型 | 字段长度 | 字段含义 |
---|---|---|---|
pktLen | int | 4 | 4 + dataLen + checksumLen |
offsetInBlock | long | 8 | Packet 在 Block 中偏移量 |
seqNo | long | 8 | Packet 序列号,在同一个 Block 唯一 |
lastPacketInBlock | boolean | 1 | 是否是一个 Block 的最后一个 Packet |
dataLen | int | 4 | dataPos – dataStart,不包含 Header 和 Checksum 的长度 |
Data 部分是一个 Packet 的实际数据部分,主要包括一个 4 字节校验和(Checksum)与一个 Chunk 部分,Chunk 部分最大为 512 字节。 在构建一个 Packet 的过程中,首先将字节流数据写入一个 buffer 缓冲区中,也就是从偏移量为 25 的位置(checksumStart)开始写 Packet 数据的 Chunk Checksum 部分,从偏移量为 533 的位置(dataStart)开始写 Packet 数据的 Chunk Data 部分,直到一个 Packet 创建完成为止。如果一个 Packet 的大小未能达到最大长度,也就是上图对应的缓冲区中,Chunk Checksum 与 Chunk Data 之间还保留了一段未被写过的缓冲区位置,这种情况说明,已经在写一个文件的最后一个 Block 的最后一个 Packet。在发送这个 Packet 之前,会检查 Chunksum 与 Chunk Data 之间的缓冲区是否为空白缓冲区(gap),如果有则将 Chunk Data 部分向前移动,使得 Chunk Data 1 与 Chunk Checksum N 相邻,然后才会被发送到 DataNode 节点。 我们看一下 Packet 对应的 Packet 类定义,定义了如下一些字段:
ByteBuffer buffer; // only one of buf and buffer is non-null byte[] buf; long seqno; // sequencenumber of buffer in block long offsetInBlock; // 该packet在block中的偏移量 boolean lastPacketInBlock; // is this the last packet in block? int numChunks; // number of chunks currently in packet int maxChunks; // 一个packet中包含的chunk的个数 int dataStart; int dataPos; int checksumStart; int checksumPos; 复制代码
Packet 类有一个默认的没有参数的构造方法,它是用来做 heatbeat 的,如下所示:
Packet() { this.lastPacketInBlock = false; this.numChunks = 0; this.offsetInBlock = 0; this.seqno = HEART_BEAT_SEQNO; // 值为-1 buffer = null; int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // 21+4=25 buf = new byte[packetSize]; checksumStart = dataStart = packetSize; checksumPos = checksumStart; dataPos = dataStart; maxChunks = 0; } 复制代码
通过代码可以看到,一个 heatbeat 的内容,实际上只有一个长度为 25 字节的 header 数据。通过 this.seqno = HEART_BEAT_SEQNO; 的值可以判断一个 packet 是否是 heatbeat 包,如果 seqno 为 - 1 表示这是一个 heatbeat 包。
Client 发送 Packet 数据
可以 DFSClient 类中看到,发送一个 Packet 之前,首先需要向选定的 DataNode 发送一个 Header 数据包,表明要向 DataNode 写数据,该 Header 的数据结构,如图所示:
上图显示的是 Client 发送 Packet 到第一个 DataNode 节点的 Header 数据结构,主要包括待发送的 Packet 所在的 Block(先向 NameNode 分配 Block ID 等信息)的相关信息、Pipeline 中另外 2 个 DataNode 的信息、访问令牌(Access Token)和校验和信息,Header 中各个字段及其类型,详见下表:
字段名称 | 字段类型 | 字段长度 | 字段含义 |
---|---|---|---|
Transfer Version | short | 2 | Client 与 DataNode 之间数据传输版本号,由常量 DataTransferProtocol.DATA_TRANSFER_VERSION 定义,值为 17 |
OP | int | 4 | 操作类型,由常量 DataTransferProtocol.OP_WRITE_BLOCK 定义,值为 80 |
blkId | long | 8 | Block 的 ID 值,由 NameNode 分配 |
GS | long | 8 | 时间戳(Generation Stamp),NameNode 分配 blkId 的时候生成的时间戳 |
DNCnt | int | 4 | DataNode 复制 Pipeline 中 DataNode 节点的数量 |
Recovery Flag | boolean | 1 | Recover 标志 |
Client | Text | Client 主机的名称,在使用 Text 进行序列化的时候,实际包含长度 len 与主机名称字符串 ClientHost | |
srcNode | boolean | 1 | 是否发送 src node 的信息,默认值为 false,不发送 src node 的信息 |
nonSrcDNCnt | int | 4 | 由 Client 写的该 Header 数据,该数不包含 Pipeline 中第一个节点(即为 DNCnt-1) |
DN2 | DatanodeInfo | DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState | |
DN3 | DatanodeInfo | DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState | |
Access Token | Token | 访问令牌信息,包括 IdentifierLength、Identifier、PwdLength、Pwd、KindLength、Kind、ServiceLength、Service | |
CheckSum Header | DataChecksum | 1+4 | 校验和 Header 信息,包括 type、bytesPerChecksum |
Header 数据包发送成功,Client 会收到一个成功响应码(DataTransferProtocol.OP_STATUS_SUCCESS = 0),接着将 Packet 数据发送到 Pipeline 中第一个 DataNode 上,如下所示:
Packet one = null; one = dataQueue.getFirst(); // regular data packet ByteBuffer buf = one.getBuffer(); // write out data to remote datanode blockStream.write(buf.array(), buf.position(), buf.remaining()); if (one.lastPacketInBlock) { // 如果是Block中的最后一个Packet,还要写入一个0标识该Block已经写入完成 blockStream.writeInt(0); // indicate end-of-block } 复制代码
否则,如果失败,则会与 NameNode 进行 RPC 调用,删除该 Block,并把该 Pipeline 中第一个 DataNode 加入到 excludedNodes 列表中,代码如下所示:
if (!success) { LOG.info("Abandoning " + block); namenode.abandonBlock(block, src, clientName); if (errorIndex < nodes.length) { LOG.info("Excluding datanode " + nodes[errorIndex]); excludedNodes.add(nodes[errorIndex]); } // Connection failed. Let's wait a little bit and retry retry = true; } 复制代码
DataNode 端服务组件
数据最终会发送到 DataNode 节点上,在一个 DataNode 上,数据在各个组件之间流动,流程如下图所示:
DataNode 服务中创建一个后台线程 DataXceiverServer,它是一个 SocketServer,用来接收来自 Client(或者 DataNode Pipeline 中的非最后一个 DataNode 节点)的写数据请求,然后在 DataXceiverServer 中将连接过来的 Socket 直接派发给一个独立的后台线程 DataXceiver 进行处理。所以,Client 写数据时连接一个 DataNode Pipeline 的结构,实际流程如图所示:
每个 DataNode 服务中的 DataXceiver 后台线程接收到来自前一个节点(Client/DataNode)的 Socket 连接,首先读取 Header 数据:
Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong()); LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress); int pipelineSize = in.readInt(); // num of datanodes in entire pipeline boolean isRecovery = in.readBoolean(); // is this part of recovery? String client = Text.readString(in); // working on behalf of this client boolean hasSrcDataNode = in.readBoolean(); // is src node info present if (hasSrcDataNode) { srcDataNode = new DatanodeInfo(); srcDataNode.readFields(in); } int numTargets = in.readInt(); if (numTargets < 0) { throw new IOException("Mislabelled incoming datastream."); } DatanodeInfo targets[] = new DatanodeInfo[numTargets]; for (int i = 0; i < targets.length; i++) { DatanodeInfo tmp = new DatanodeInfo(); tmp.readFields(in); targets[i] = tmp; } Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>(); accessToken.readFields(in); 复制代码
上面代码中,读取 Header 的数据,与前一个 Client/DataNode 写入 Header 字段的顺序相对应,不再累述。在完成读取 Header 数据后,当前 DataNode 会首先将 Header 数据再发送到 Pipeline 中下一个 DataNode 结点,当然该 DataNode 肯定不是 Pipeline 中最后一个 DataNode 节点。接着,该 DataNode 会接收来自前一个 Client/DataNode 节点发送的 Packet 数据,接收 Packet 数据的逻辑实际上在 BlockReceiver 中完成,包括将来自前一个 Client/DataNode 节点发送的 Packet 数据写入本地磁盘。在 BlockReceiver 中,首先会将接收到的 Packet 数据发送写入到 Pipeline 中下一个 DataNode 节点,然后再将接收到的数据写入到本地磁盘的 Block 文件中。
DataNode 持久化 Packet 数据
在 DataNode 节点的 BlockReceiver 中进行 Packet 数据的持久化,一个 Packet 是一个 Block 中一个数据分组,我们首先看一下,一个 Block 在持久化到磁盘上的物理存储结构,如下图所示:
每个 Block 文件(如上图中 blk_1084013198 文件)都对应一个 meta 文件(如上图中 blk_1084013198_10273532.meta 文件),Block 文件是一个一个 Chunk 的二进制数据(每个 Chunk 的大小是 512 字节),而 meta 文件是与每一个 Chunk 对应的 Checksum 数据,是序列化形式存储。
写文件过程中 Client/DataNode 与 NameNode 进行 RPC 调用
Client 在 HDFS 文件系统中写文件过程中,会发生多次与 NameNode 节点进行 RPC 调用来完成写数据相关操作,主要是在如下时机进行 RPC 调用:
- 写文件开始时创建文件:Client 调用 create 在 NameNode 节点的 Namespace 中创建一个标识该文件的条目
- 在 Client 连接 Pipeline 中第一个 DataNode 节点之前,Client 调用 addBlock 分配一个 Block(blkId+DataNode 列表 + 租约)
- 如果与 Pipeline 中第一个 DataNode 节点连接失败,Client 调用 abandonBlock 放弃一个已经分配的 Block
- 一个 Block 已经写入到 DataNode 节点磁盘,Client 调用 fsync 让 NameNode 持久化 Block 的位置信息数据
- 文件写完以后,Client 调用 complete 方法通知 NameNode 写入文件成功
- DataNode 节点接收到并成功持久化一个 Block 的数据后,DataNode 调用 blockReceived 方法通知 NameNode 已经接收到 Block
具体 RPC 调用的详细过程,可以参考源码。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
与孩子一起学编程
[美] 桑德Warren Sande、Carter Sande / 苏金国、姚曜 等 / 人民邮电出版社 / 2010-11 / 65.00元
一本老少咸宜的编程入门奇书!一册在手,你完全可以带着自己的孩子,跟随Sande父子组合在轻松的氛围中熟悉那些编程概念,如内存、循环、输入和输出、数据结构和图形用户界面等。这些知识一点儿也不高深,听起来备感亲切,书中言语幽默风趣而不失真义,让学习过程充满乐趣。细心的作者还配上了孩子们都喜欢的可爱漫画和经过运行测试的程序示例,教你用最易编写和最易理解的Python语言,写出你梦想中的游戏程序。 ......一起来看看 《与孩子一起学编程》 这本书的介绍吧!