标签


Hadoop 数据节点之读数据

2014年07月14日

简介

在HDFS中,文件由一个或者多个数据块组成,数据块以本地文件的形式存储在数据节点上,并对外提供文件数据的访问功能。

客户端读写文件时,先通过与名字节点交互获得数据块与数据节点的对应关系,然后进一步与数据节点进行交互 在HDFS中,名字节点和数据节点为主从架构,数据节点接收名字节点的管理,执行名字节点下达的指令。

流式接口

为了提高数据吞吐量,HDFS对文件的读写采用基于TCP流的数据访问接口,而不是IPC接口。

流式接口的实现是基于Java Socket、NIO和多线程,提供了一个高效率、稳定的访问接口。

存储节点

数据存储相关的类有StorageInfo、Storage和DataSorage。其中继承关系为:

StorageInfo <-- Storage <-- DataSorage
                        <-- FSImage

StorageInfo、Storage和DataSorage的基本属性:

public class StorageInfo {
  public int   layoutVersion;  // Version read from the stored file.
  public int   namespaceID;    // namespace id of the storage
  public long  cTime;          // creation timestamp
  ...
}

public abstract class Storage extends StorageInfo {
  private NodeType storageType;    // Type of the node using this storage 
  protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
  ...
}

public class DataStorage extends Storage { 
  private String storageID;
  ...
}

数据节点存储DataStorage继承Storage(抽象类),FSImage同样继承Storage,用于组织名字节点的磁盘数据。 StorageInfo为数据节点和名字节点提供通用的存储服务,Storage可以管理多个存储目录StorageDirectory(内部类)

  • layoutVersion:存储系统信息结构的版本号
  • namespaceID:存储系统名字空间标志
  • cTime:存储系统创建时间
  • storageType:存储类别,在数据节点中的值为DATA_NODE
  • storageID:存储ID

DataStorage用于数据节点存储空间的生存期管理,而与数据节点存储逻辑相关的操作,如创建数据块文件、校验信息文件则在FSDataset中定义。

FSDataset继承FSDatasetInterface,FSDataset接口方法大致分为两类:

  • 数据块相关,如创建数据块,打开输入/输出流
  • 校验文件相关,如维护数据块文件与校验文件的相关性,打开输入/输出流

FSDataset将它管理的存储空间分为3个级别:FSDir、FSVolume和FSVolumeSet。

  • FSDir对象表示了”current”目录下面的子目录,其中”current”子目录保存着已经写入HDFS文件系统的数据块
  • FSVolume是数据目录配置项${dfs.data.dir}中的一项,数据节点可以管理一个或者多个数据目录
  • FSVolumeSet管理1个或者多个FSVolume对象

流式接口实现

数据节点通过数据节点存储DataStorage和文件系统数据集FSDataset,将物理存储抽象成基于两者的服务,流式接口构建在此服务上。 数据节点的流式接口包含读数据、写数据、数据块替换、数据块复制和数据校验。

DataXceiverServer & DataXceiver

DataNode.startDataNode()中,数据节点创建ServerSocket对象并绑定到监听地址的监听端口上,并设置Socket接收的缓冲区大小,合理的缓冲区可以提高数据节点的吞吐量。

接下来,将创建流式服务线程组,创建DataXceiverServer服务器,并设置为守护线程。

void startDataNode(Configuration conf,  AbstractList<File> dataDirs, SecureResources resources ) throws IOException {
    ...
    // connect to name node
    this.namenode = (DatanodeProtocol) RPC.waitForProxy(DatanodeProtocol.class,
                       DatanodeProtocol.versionID, nameNodeAddr,  conf);
    // get version and id info from the name-node
    NamespaceInfo nsInfo = handshake();

    // find free port or use privileged port provide
    ServerSocket ss;
    if(secureResources == null) {
      ss = (socketWriteTimeout > 0) ? 
        ServerSocketChannel.open().socket() : new ServerSocket();
      Server.bind(ss, socAddr, 0);
    } else {
      ss = resources.getStreamingSocket();
    }
    ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 

    this.threadGroup = new ThreadGroup("dataXceiverServer");
    this.dataXceiverServer = new Daemon(threadGroup, 
        new DataXceiverServer(ss, conf, this));
    this.threadGroup.setDaemon(true); // auto destroy when empty
    ...
}

在DataNode.startDataNode()中创建了DataXceiverServer对象,DataXceiverServer通过ServerSocket.accept()方法不断接收来自客户端的连接,并创建相应的DataXceiver,处理客户的请求。每一个请求对应一个DataXceiver线程,即一客户一线程模式。

DataXceiverServer与DataXceiver可可以相当于主从模式,与Memcached的线程模式类似。

class DataXceiverServer implements Runnable, FSConstants {  
  ServerSocket ss;
  DataNode datanode;
  // Record all sockets opend for data transfer
  Map<Socket, Socket> childSockets = Collections.synchronizedMap( new HashMap<Socket, Socket>());
  static final int MAX_XCEIVER_COUNT = 256;
  int maxXceiverCount = MAX_XCEIVER_COUNT;  

  ...
  public void run() {
    while (datanode.shouldRun) {
      try {
        Socket s = ss.accept();
        s.setTcpNoDelay(true);
        new Daemon(datanode.threadGroup,  new DataXceiver(s, datanode, this)).start();
      } catch (AsynchronousCloseException ace) {
          LOG.warn(datanode.dnRegistration + ":DataXceiveServer:" + StringUtils.stringifyException(ace));
          datanode.shouldRun = false;
      } catch (...) {
      	...
      }
    }
    try {
      ss.close();
    } 
    ...
  }
}
  • childSockets包含了所有打开的用于数据传输的Socket,这些socket被DataXceiver所用
  • maxXceiverCount是数据节点流式接口能够支持的最大客户数,默认256

DataXceiverServer只处理客户端的连接请求,实际的请求处理和数据交换都通过DataXceiver处理。

// Thread for processing incoming/outgoing data stream.
class DataXceiver implements Runnable, FSConstants {
  Socket s;
  final String remoteAddress; // address of remote side
  final String localAddress;  // local address of this daemon
  DataNode datanode;
  DataXceiverServer dataXceiverServer;
  private boolean connectToDnViaHostname;
  
  // Read/write data from/to the DataXceiveServer.
  public void run() {
    DataInputStream in=null; 
    try {
      in = new DataInputStream( new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE));
      short version = in.readShort();
      if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
        throw new IOException( "Version Mismatch" );
      }
      boolean local = s.getInetAddress().equals(s.getLocalAddress());
      byte op = in.readByte();
      // Make sure the xciver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
        throw new IOException("xceiverCount " + curXceiverCount
                              + " exceeds the limit of concurrent xcievers "
                              + dataXceiverServer.maxXceiverCount);
      }
      long startTime = DataNode.now();
      switch ( op ) {
      case DataTransferProtocol.OP_READ_BLOCK:
        readBlock( in );
        ...
        break;
      case DataTransferProtocol.OP_WRITE_BLOCK:
        writeBlock( in );
        datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.incrWritesFromLocalClient();
        else
          datanode.myMetrics.incrWritesFromRemoteClient();
        break;
      case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
        replaceBlock(in);
        datanode.myMetrics.addReplaceBlockOp(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_COPY_BLOCK:
            // for balancing purpose; send to a proxy source
        copyBlock(in);
        datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
        getBlockChecksum(in);
        datanode.myMetrics.addBlockChecksumOp(DataNode.now() - startTime);
        break;
      default:
        throw new IOException("Unknown opcode " + op + " in data stream");
      }
    } catch (Throwable t) {
      LOG.error(datanode.dnRegistration + ":DataXceiver",t);
    } finally {
      LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
                               + datanode.getXceiverCount());
      IOUtils.closeStream(in);
      IOUtils.closeSocket(s);
      dataXceiverServer.childSockets.remove(s);
    }
  }
  ...
}

从DataXceiver.run()可以看到,首先DataXceiver的工作流程为:

  • 创建数据输入流,读取协议版本号,并进行版本检测。这些字段读取的顺序在之前介绍的协议中已经说明。
  • 检查连接请求数是否超过预定的阈值,用来保证服务质量。
  • 根据读取的操作码,进行响应的服务,如读数据、写数据、数据块替换、数据块复制和校验和检查等。

DataXceiverServer和DataXceiver两者协作,采用一客户/一线程模式,处理来自客户端的流式请求服务,有效的提高分布式文件系统的吞吐量。

读数据

读数据操作由准备阶段、数据传输阶段、数据发送阶段和速度控制等组成

在Hadoop 流式接口的读数据报文中介绍过,读数据的操作码为81。当从输入流中读取的操作码为81时,调用DataXceiver.readBlock()处理请求。

操作步骤:

  • 读取协议字段,包括blockId、generationStamp、startOffset、length、clientName和accessToken
  • 参数校验
  • 根据请求信息构建BlockSender数据块发送器发送数据
  • 清理

DataXceiver.readBlock()代码如下

private void readBlock(DataInputStream in) throws IOException {
	long blockId = in.readLong();          
	Block block = new Block( blockId, 0 , in.readLong());

	long startOffset = in.readLong();
	long length = in.readLong();
	String clientName = Text.readString(in);
	Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
	accessToken.readFields(in);
	OutputStream baseStream = NetUtils.getOutputStream(s, datanode.socketWriteTimeout);
	DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));

	...
	// send the block
	BlockSender blockSender = null;
	try {
	  try {
	    blockSender = new BlockSender(block, startOffset, length,
	        true, true, false, datanode, clientTraceFmt);
	  } catch(IOException e) {
	    out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
	    throw e;
	  }

	  out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
	  long read = blockSender.sendBlock(out, baseStream, null); // send data

	  if (blockSender.isBlockReadFully()) {
	    try {
	      if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK  && 
	          datanode.blockScanner != null) {
	        datanode.blockScanner.verifiedByClient(block);
	      }
	    } catch (IOException ignored) {}
	  }
	} catch ( ... ) {
	  ...
	} finally {
	  IOUtils.closeStream(out);
	  IOUtils.closeStream(blockSender);
	}
}

DataXceiver.readBlock()中对数据块检验中包含了一块性能优化的代码。 客户端成功读取并通过校验,会发送一个OP_STATUS_CHECKSUM_OK的操作码通知数据节点,如果数据节点发送了一个完整的数据块,则通知数据块扫描器BlockScanner将该数据块标记为校验成功,减少数据块扫描器的工作量,降低资源的消耗。

数据块发送

读请求的数据发送由BlockSender负责,包括准备、发送读请求应答头、发送应答数据包和清理工作等。

BlockSender(Block block, long startOffset, long length,
          boolean corruptChecksumOk, boolean chunkOffsetOK,
          boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
  throws IOException {
try {
  this.block = block;
  ...
  // version check & init checksum
  
  bytesPerChecksum = checksum.getBytesPerChecksum();
  if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
    checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
                               Math.max((int)blockLength, 10*1024*1024));
    bytesPerChecksum = checksum.getBytesPerChecksum();        
  }
  checksumSize = checksum.getChecksumSize();

  if (length < 0) {
    length = blockLength;
  }

  endOffset = blockLength;
  if (startOffset < 0 || startOffset > endOffset
      || (length + startOffset) > endOffset) {
    String msg = " Offset " + startOffset + " and length " + length
    + " don't match " + block + " ( blockLen " + endOffset + " )";
    LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
    throw new IOException(msg);
  }

  
  offset = (startOffset - (startOffset % bytesPerChecksum));
  if (length >= 0) {
    // Make sure endOffset points to end of a checksumed chunk.
    long tmpLen = startOffset + length;
    if (tmpLen % bytesPerChecksum != 0) {
      tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
    }
    if (tmpLen < endOffset) {
      endOffset = tmpLen;
    }
  }

  // seek to the right offsets
  if (offset > 0) {
    long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
    // note blockInStream is  seeked when created below
    if (checksumSkip > 0) {
      // Should we use seek() for checksum file as well?
      IOUtils.skipFully(checksumIn, checksumSkip);
    }
  }
  seqno = 0;

  blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
  if (blockIn instanceof FileInputStream) {
    blockInFd = ((FileInputStream) blockIn).getFD();
  } else {
    blockInFd = null;
  }
  memoizedBlock = new MemoizedBlock(blockIn, blockLength, datanode.data, block);
} catch (IOException ioe) {
  IOUtils.closeStream(this);
  IOUtils.closeStream(blockIn);
  throw ioe;
}
}	

准备工作由BlockSender构造方法完成,主要包含以下几个步骤:

  • 根据参数初始化成员属性
  • 协议版本检查和checksum初始化
  • 计算数据块的偏移位置和长度,包括offset、endOffset、length
  • 打开数据块文件输入流

构造方法决定了需要从数据文件和校验文件中读取哪些数据,需考虑到块的边界问题。因为校验信息是按块组织的,为了让客户端能够进行数据校验,需要返回用户读取数据的所有块。

读请求的应答包括应答头和一个或者多个应答数据包,具体逻辑由BlockSender.sendBlock()和BlockSender.sendChunks()实现。 BlockSender.sendBlock()用来读取一个数据块的数据和校验数据发送到客户端或者数据节点。

long sendBlock(DataOutputStream out, OutputStream baseStream, 
             DataTransferThrottler throttler) throws IOException {
...
try {
  ...
  int maxChunksPerPacket;
  int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
  if (transferToAllowed && !verifyChecksum &&  baseStream instanceof SocketOutputStream && 
      blockIn instanceof FileInputStream) {
    ...
    maxChunksPerPacket = (Math.max(BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO)
                          + bytesPerChecksum - 1)/bytesPerChecksum;
    pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
  } else {
    maxChunksPerPacket = Math.max(1, (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
    pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
  }

  ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);

  while (endOffset > offset) {
    manageOsCache();
    long len = sendChunks(pktBuf, maxChunksPerPacket,  streamForSendChunks);
    offset += len;
    totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum* checksumSize);
    seqno++;
  }
  try {
    out.writeInt(0); // mark the end of block        
    out.flush();
  } catch (IOException e) { //socket error
    throw ioeToSocketException(e);
  }
}
...
finally {
  ...
  close();
}
blockReadFully = (initialOffset == 0 && offset >= blockLength);
return totalRead;
} 

读数据的应答头包括数据校验类型、块大小和可选偏移量。BlockSender既可以用于客户端数据读也可以用于数据块复制,若为数据块复制,则不需要提供偏移量。

BlockSender.sendBlock()的主要逻辑如下:

  • 根据配置计算缓冲区大小,缓冲区可一次发送多个数据块
  • 循环调用BlockSender.sendChunks()发送数据
  • 发送所有数据完毕后,往客户端输出流中写入0,以结束一次读操作。

应答数据包包头包括:packageLen(包长度)、offset、seqno、tail(是否最后一个应答包)和length(数据长度)

应答数据和校验数据通过BlockSend.sendBlock()方法循环调用Blocksend.sendChunks()发送。

private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
                     throws IOException {
// Sends multiple chunks in one packet with a single write().

int len = (int) Math.min(endOffset - offset, (((long) bytesPerChecksum) * ((long) maxChunks)));
if (len > bytesPerChecksum && len % bytesPerChecksum != 0) {
  len -= len % bytesPerChecksum;
}
if (len == 0) {
  return 0;
}

int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
int packetLen = len + numChunks*checksumSize + 4;
pkt.clear();

// write packet header
pkt.putInt(packetLen);
pkt.putLong(offset);
pkt.putLong(seqno);
pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
pkt.putInt(len);

int checksumOff = pkt.position();
int checksumLen = numChunks * checksumSize;
byte[] buf = pkt.array();

if (checksumSize > 0 && checksumIn != null) {
  try {
    checksumIn.readFully(buf, checksumOff, checksumLen);
  } 
  ...
}

int dataOff = checksumOff + checksumLen;

if (blockInPosition < 0) {
  //normal transfer
  IOUtils.readFully(blockIn, buf, dataOff, len);

  if (verifyChecksum) {
    int dOff = dataOff;
    int cOff = checksumOff;
    int dLeft = len;

    for (int i=0; i<numChunks; i++) {
      checksum.reset();
      int dLen = Math.min(dLeft, bytesPerChecksum);
      checksum.update(buf, dOff, dLen);
      if (!checksum.compare(buf, cOff)) {
        throw new ChecksumException("Checksum failed at " + 
                                    (offset + len - dLeft), len);
      }
      dLeft -= dLen;
      dOff += dLen;
      cOff += checksumSize;
    }
  }
  
  // only recompute checksum if we can't trust the meta data due to 
  // concurrent writes
  if (memoizedBlock.hasBlockChanged(len)) {
    ChecksumUtil.updateChunkChecksum(
      buf, checksumOff, dataOff, len, checksum
    );
  }
  
  try {
    out.write(buf, 0, dataOff + len);
  } catch (IOException e) {
    throw ioeToSocketException(e);
  }
} else {
  try {
    //use transferTo(). Checks on out and blockIn are already done. 
    SocketOutputStream sockOut = (SocketOutputStream) out;
    FileChannel fileChannel = ((FileInputStream) blockIn).getChannel();

    if (memoizedBlock.hasBlockChanged(len)) {
      fileChannel.position(blockInPosition);
      IOUtils.readFileChannelFully( fileChannel, buf, dataOff, len );
      
      ChecksumUtil.updateChunkChecksum( buf, checksumOff, dataOff, len, checksum );          
      sockOut.write(buf, 0, dataOff + len);
    } else {
      //first write the packet
      sockOut.write(buf, 0, dataOff);
      // no need to flush. since we know out is not a buffered stream.
      sockOut.transferToFully(fileChannel, blockInPosition, len);
    }

    blockInPosition += len;

  } catch (IOException e) {
    throw ioeToSocketException(e);
  }
}

if (throttler != null) { // rebalancing so throttle
  throttler.throttle(packetLen);
}

return len;
}

数据传输共有两种方式,一种是正常的发送,一种是通过NIO的transferTo()方法,”零拷贝”进行数据高效传输,是的数据开的数据不经过数据节点,即数据不需要经过用户空间缓冲区的中转,直接发送,缺点是不能进行数据校验。 普通数据发送的过程中,需要进行校验。

memoizedBlock.hasBlockChanged()判断数据块读写是否有竞争。

BlockTransferThrottler用于控制数据发送的速度。通过简单的计算,得到每秒能够发送的数据,然后通过时间控制行数据的发送。