标签


Hadoop 数据节点之写数据

2014年07月19日

简介

流式接口的数据读、写是非常重要的接口。与读数据相比,写数据实现起来更加复杂。

客户端写数据的操作码为80.

准备

之前在Hadoop流式接口中介绍过写数据的请求报文:

 +------------------------------------------------------------------------------+
 | TRANSFER_VERSION   |    80         |      blockId     |    generationStamp   |
 +------------------------------------------------------------------------------+
 |   pipelineSize     |  isRecovery   |      client      |    hasSrcDataNode    |
 +------------------------------------------------------------------------------+
 |   srcDatanode...   |   numTargets  |  targets...      |      tocken          |   
 +------------------------------------------------------------------------------+  
 |  dataChecksum      |
 +--------------------+  

与读数据类似,当从输入流中读取的操作码为80时,调用DataXceiver.writeBlock()处理请求。

操作步骤:

  • 读取协议字段,包括blockId、generationStamp、pipeline、isRecovery、client 和targets等数据
  • 参数校验
  • 根据请求信息构建BlockReceiver数据块接收器接收数据
  • 清理

DataXceiver.writeBlock()代码如下

private void writeBlock(DataInputStream in) throws IOException {
	// 读取协议接口数据
	replyOut = new DataOutputStream( NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
	...
	short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
	try {
	  // open a block receiver and check if the block does not exist
	  blockReceiver = new BlockReceiver(block, in,  s.getRemoteSocketAddress().toString(),
	      s.getLocalSocketAddress().toString(), isRecovery, client, srcDataNode, datanode);
	  if (targets.length > 0) {
	  	...
	    try {
	      ... // 包含往下一个节点进行交互的socket和流
	      blockReceiver.writeChecksumHeader(mirrorOut);
	      ...
	    } 
	    ...
	  }
	  ...
	  blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,  mirrorAddr, null, targets.length);
	  ...  
	} 
	...
}

DataXceiver.writeBlock()方法首先读取数据报文基本字段,然后构建BlockReceiver,BlockReceiver的构造函数为写数据块和校验数据块文件打开输出数据流。

BlockReceiver(Block block, DataInputStream in, String inAddr,
            String myAddr, boolean isRecovery, String clientName, 
            DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
try{
  this.block = block;
  ... 
  streams = datanode.data.writeToBlock(block, isRecovery, clientName == null || clientName.length() == 0);
  this.finalized = false;
  if (streams != null) {
    this.out = streams.dataOut;
    this.cout = streams.checksumOut;
    if (out instanceof FileOutputStream) {
      this.outFd = ((FileOutputStream) out).getFD();
    } 
    ...
    this.checksumOut = new DataOutputStream(new BufferedOutputStream(
                             streams.checksumOut, SMALL_BUFFER_SIZE));
    }
  }
} 
...
}

DataXceiver委托BlockReceiver.receiveBlock()方法处理写数据的数据包,然后调用DataNode.notifyNamenodeReceiveBlock()通知名字节点,最后writeBlock()将关闭上下流的输入/输出流,完成一次写数据请求。

与BlockSender类似,receiveBlock()方法将数据的接收工作主要由receivePacket()处理。receivePacket()至少读入一个请求数据包。

void receiveBlock(
  DataOutputStream mirrOut, // output to next datanode
  DataInputStream mirrIn,   // input from next datanode
  DataOutputStream replyOut,  // output to previous datanode
  String mirrAddr, DataTransferThrottler throttlerArg,
  int numTargets) throws IOException {
  	...
	try {
	  // write data chunk header
	  if (!finalized) {
	    BlockMetadataHeader.writeHeader(checksumOut, checksum);
	  }
	  if (clientName.length() > 0) {
	    responder = new Daemon(datanode.threadGroup,  new PacketResponder(this, block, mirrIn, 
	                                               replyOut, numTargets, Thread.currentThread()));
	    responder.start(); // start thread to processes reponses
	  }

	  // Receive until packet length is zero.
	  while (receivePacket() > 0) {}

	  // flush the mirror out
	  if (mirrorOut != null) {
	    try {
	      mirrorOut.writeInt(0); // mark the end of the block
	      mirrorOut.flush();
	    } catch (IOException e) {
	      handleMirrorOutError(e);
	    }
	  }

	  if (responder != null) {
	    ((PacketResponder)responder.getRunnable()).close();
	  }
	  ...

	} catch (IOException ioe) {
	  ...
	} finally {
	  ...
	  responder.join();		    
	}
}

receivePacket()首先把数据包发送到下游数据节点,通过此种方式,写数据的数据包通过管道达到所有数据节点上面。

private int receivePacket() throws IOException {
    ...	    
    // First write the packet to the mirror:
    if (mirrorOut != null && !mirrorError) {
      try {
        mirrorOut.write(buf.array(), buf.position(), buf.remaining());
        mirrorOut.flush();
      } catch (IOException e) {
        handleMirrorOutError(e);
      }
    }
    ...
}

receivePacket()后续工作将请求包的数据和数据校验和写入对应的文件,只有最后一个数据节点需要通过verifyChunks()方法对数据进行校验。当发现有错误时,需要上报名字节点。

private int receivePacket() throws IOException {
  ...
  if (mirrorOut == null || clientName.length() == 0) {
    verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
  }

  try {
    if (!finalized) {
        //finally write to the disk :
        out.write(pktBuf, dataOff, len);

        ...
        partialCrc.update(pktBuf, dataOff, len);
        byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
        checksumOut.write(buf);
        ...
    }
  } catch (IOException iex) {
    datanode.checkDiskError(iex);
    throw iex;
  }
}

数据块接收器引入PacketResponder线程用来接受下流数据块节点数据。BlockReceiver用来接受来自上游节点的数据。

PacketResponder线程从下游节点接受数据,并在合适的时候,向上游节点发送确认包,需满足两个条件:

  • 当前数据节点已经顺利处理完该数据包
  • 当前数据节点(处于管道中间时)收到下游数据节点的数据包确认

当前节点由BlockReceiver线程处理数据包,BlockerReceiver.receiver()方法每处理一个数据包,就通过PacketResponder.enqueue将对应的信息(BlockReceiver.Packet)放入ackQueue中,由PacketResponder线程不断循环处理,这是一个典型的生产者-消费者线程。

class PacketResponder implements Runnable, FSConstants {
	private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
	...

	public void run() {
      ...
      while (running && datanode.shouldRun && !lastPacketInBlock) {
        try {
          ...
          try { 
            Packet pkt = null;
            synchronized (this) {
              // wait for a packet to arrive
              while (running && datanode.shouldRun && ackQueue.size() == 0) {
                  ...
                  wait();
                }
                if (!running || !datanode.shouldRun) {
                  break;
                }
                pkt = ackQueue.removeFirst();
                expected = pkt.seqno;
                notifyAll();
              }
              // receive an ack if DN is not the last one in the pipeline
              if (numTargets > 0 && !localMirrorError) {
                // read an ack from downstream datanode
                ack.readFields(mirrorIn);
                seqno = ack.getSeqno();
                ...
              }
              lastPacketInBlock = pkt.lastPacketInBlock;
            } 
            ...
            // If this is the last packet in block, then close block
            // file and finalize the block before responding success
            if (lastPacketInBlock && !receiver.finalized) {
              receiver.close();
              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
              block.setNumBytes(receiver.offsetInBlock);
              datanode.data.finalizeBlock(block);
              datanode.myMetrics.incrBlocksWritten();
              datanode.notifyNamenodeReceivedBlock(block, 
                  DataNode.EMPTY_DEL_HINT);
              ...
            }

            // construct my ack message
            ...
            PipelineAck replyAck = new PipelineAck(expected, replies);
            // send my ack back to upstream datanode
            replyAck.write(replyOut);
            replyOut.flush();
           
        } 
        ...
      }
    }
}

PacketResponder.run()的处理分为两个部分,第一部分等待之前描述的两个条件满足,第二部分为条件满足后的处理。

通过wait()等待ackQueue中的数据,如果当前节点处于数据流管道的中间,在流mirrorIn上读取下游的确定。

如果处理的是整个写请求最后一个数据包的确认,则关闭PacketResponder所属的数据块接收器对象,设置数据块长度,使用FSDataset.finalizeBlock()方法提交数据,最后通过DataNode.notifyNamenodeReceivedBlock()通知名字节点,完成一个数据块的接收。

方法的最后向上游节点发送确认包,同时携带下游节点的确认包。