/**
 * a class to throttle the data transfers.
 * This class is thread safe. It can be shared by multiple threads.
 * The parameter bandwidthPerSec specifies the total bandwidth shared by
 * threads.
 */
public class DataTransferThrottler {

The bandwidthPerSec value passed in acts as an upper limit: with the throttler in place, the average transfer rate is kept below it. The class defines the following fields:
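  private final long period;          // period over which bw is imposed
  private final long periodExtension; // Max period over which bw accumulates.
  private long bytesPerPeriod;        // total number of bytes can be sent in each period
  private long curPeriodStart;        // current period starting time
  private long curReserve;            // remaining bytes can be sent in the period
  private long bytesAlreadyUsed;

The core idea of DataTransferThrottler is to control the average transfer rate by capping the number of bytes that may be transferred in each time period. If I/O runs too fast and exceeds the byte budget for the current period, the caller waits until the next period, when bandwidth becomes available again. This is illustrated by the following diagram:

[Diagram: throttling by a per-period byte budget]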
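  /**
   * Constructor
   * @param period in milliseconds. Bandwidth is enforced over this
   *        period.
   * @param bandwidthPerSec bandwidth allowed in bytes per second.
   */
  public DataTransferThrottler(long period, long bandwidthPerSec) {
    this.curPeriodStart = monotonicNow();
    this.period = period;
    // Scale the per-second bandwidth to the length of one period
    this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
    this.periodExtension = period*3;
  }

Because the bandwidth is given in bytes per second while the period is in milliseconds, the product is divided by 1000. The curReserve field can be understood as the number of bytes still available for transfer; its initial value is one period's worth of bytes. The throttle method of DataTransferThrottler is the main method that enforces the bandwidth limit.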
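To make the unit conversion concrete, here is a minimal sketch of just the constructor arithmetic. PeriodBudget and its method are hypothetical names used for illustration only; they are not part of Hadoop.

```java
// Hypothetical helper that isolates the constructor arithmetic; not a Hadoop class.
final class PeriodBudget {

    /** Bytes allowed in one period, using the same integer arithmetic as the constructor. */
    static long bytesPerPeriod(long periodMs, long bandwidthPerSec) {
        return bandwidthPerSec * periodMs / 1000;
    }

    public static void main(String[] args) {
        long oneMiB = 1024L * 1024L;
        // With a 500 ms period, half of the per-second budget is available per period.
        System.out.println(bytesPerPeriod(500, oneMiB));  // 524288
        // With a 1000 ms period, the whole per-second budget is available per period.
        System.out.println(bytesPerPeriod(1000, oneMiB)); // 1048576
    }
}
```

Note that the division is integer division, so a very small bandwidth combined with a short period rounds the budget down.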
  public synchronized void throttle(long numOfBytes, Canceler canceler) {
    if ( numOfBytes <= 0 ) {
      return;
    }
    // Deduct the bytes being sent/received from the current reserve
    curReserve -= numOfBytes;
    // Track the bytes currently being accounted for
    bytesAlreadyUsed += numOfBytes;

    // curReserve <= 0 means the byte budget of the current period is used up
    while (curReserve <= 0) {
      // If a canceler was supplied and it has been cancelled, stop throttling and return
      if (canceler != null && canceler.isCancelled()) {
        return;
      }
      long now = monotonicNow();
      long curPeriodEnd = curPeriodStart + period;

      // Still inside the current period: wait for it to end so that
      // a new byte budget becomes available
      if ( now < curPeriodEnd ) {
        // Wait for next period so that curReserve can be increased.
        try {
          wait( curPeriodEnd - now );
        } catch (InterruptedException e) {
          // Abort throttle and reset interrupted status to make sure other
          // interrupt handling higher in the call stack executes.
          Thread.currentThread().interrupt();
          break;
        }
      } else if ( now < (curPeriodStart + periodExtension)) {
        // Past the end of the period but still within periodExtension:
        // add one period's budget and move the period start to the end
        // of the previous period
        curPeriodStart = curPeriodEnd;
        curReserve += bytesPerPeriod;
      } else {
        // Past curPeriodStart + periodExtension: the throttler has not been
        // used for a long time, so reset the period start
        // discard the prev period. Throttler might not have
        // been used for a long time.
        curPeriodStart = now;
        curReserve = bytesPerPeriod - bytesAlreadyUsed;
      }
    }

    // The transfer is done; remove its bytes from the in-use count
    bytesAlreadyUsed -= numOfBytes;
  }

One takeaway is that the average transfer rate depends not only on the bandwidth limit passed in but also on the period setting. For the same bandwidth, a shorter period leads to more wait calls, and the resulting average rate ends up lower. This point comes up again below.
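The three branches of the loop are easier to follow with a small model. The sketch below is a hypothetical, single-threaded simplification: SimulatedThrottler is not a Hadoop class, it has no Canceler or interrupt handling, and instead of calling wait() it advances a virtual clock and reports how many milliseconds the real method would have waited.

```java
// Hypothetical single-threaded model of the throttle loop; not the Hadoop class.
final class SimulatedThrottler {
    private final long period;
    private final long periodExtension;
    private final long bytesPerPeriod;
    private long curPeriodStart = 0;
    private long curReserve;
    private long now = 0; // virtual clock in milliseconds (assumption: replaces monotonicNow())

    SimulatedThrottler(long periodMs, long bandwidthPerSec) {
        this.period = periodMs;
        this.periodExtension = periodMs * 3;
        this.bytesPerPeriod = bandwidthPerSec * periodMs / 1000;
        this.curReserve = bytesPerPeriod;
    }

    /** Accounts for numOfBytes and returns the virtual milliseconds spent waiting. */
    long throttle(long numOfBytes) {
        if (numOfBytes <= 0) {
            return 0;
        }
        long waited = 0;
        curReserve -= numOfBytes;
        while (curReserve <= 0) {
            long curPeriodEnd = curPeriodStart + period;
            if (now < curPeriodEnd) {
                // Stands in for wait(curPeriodEnd - now): jump to the end of the period.
                waited += curPeriodEnd - now;
                now = curPeriodEnd;
            } else if (now < curPeriodStart + periodExtension) {
                // Start the next period and add its budget to the reserve.
                curPeriodStart = curPeriodEnd;
                curReserve += bytesPerPeriod;
            } else {
                // Idle for too long: restart the period at the current time.
                // With a single caller, bytesAlreadyUsed equals numOfBytes.
                curPeriodStart = now;
                curReserve = bytesPerPeriod - numOfBytes;
            }
        }
        return waited;
    }

    public static void main(String[] args) {
        SimulatedThrottler t = new SimulatedThrottler(500, 1000); // budget: 500 bytes per 500 ms
        System.out.println(t.throttle(200)); // 0   (300 bytes left)
        System.out.println(t.throttle(200)); // 0   (100 bytes left)
        System.out.println(t.throttle(200)); // 500 (budget exhausted, waits for the next period)
    }
}
```

The third call overdraws the reserve, so it waits until the end of the period and then carries the deficit into the next period's budget.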
    //set up parameter for cluster balancing
    this.balanceThrottler = new BlockBalanceThrottler(
        conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
            DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));

The balancer bandwidth property shown below is the value handed to this Throttler object.
  public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
  public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;

The default bandwidth is 1 MB per second (1024*1024 bytes). This Throttler object is used in DataXceiver.replaceBlock and DataXceiver.copyBlock.
  @Override
  public void copyBlock(final ExtendedBlock block,
      final Token<BlockTokenIdentifier> blockToken) throws IOException {
    ...
      long beginRead = Time.monotonicNow();
      // send block content to the target
      long read = blockSender.sendBlock(reply, baseStream,
          dataXceiverServer.balanceThrottler);
      long duration = Time.monotonicNow() - beginRead;
      datanode.metrics.incrBytesRead((int) read);
      datanode.metrics.incrBlocksRead();
      datanode.metrics.incrTotalReadTime(duration);
    ...
  @Override
  public void replaceBlock(final ExtendedBlock block,
      final StorageType storageType,
      final Token<BlockTokenIdentifier> blockToken,
      final String delHint,
      final DatanodeInfo proxySource) throws IOException {
    ...
        // receive a block
        blockReceiver.receiveBlock(null, null, replyOut, null,
            dataXceiverServer.balanceThrottler, null, true);

        // notify name node
        datanode.notifyNamenodeReceivedBlock(
            block, delHint, blockReceiver.getStorageUuid());

        LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
            + ", delHint=" + delHint);
      }
    ...

These calls eventually reach BlockSender.sendPacket and BlockReceiver.receivePacket. The throttle(bytes) method is invoked in the following two methods of those classes:
  private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
      boolean transferTo, DataTransferThrottler throttler) throws IOException {
    int dataLen = (int) Math.min(endOffset - offset,
        (chunkSize * (long) maxChunks));

    int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
    int checksumDataLen = numChunks * checksumSize;
    int packetLen = dataLen + checksumDataLen + 4;
    boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
    ...
    if (throttler != null) { // rebalancing so throttle
      throttler.throttle(packetLen);
    }

    return dataLen;
  }
  /**
   * Receives and processes a packet. It can contain many chunks.
   * returns the number of data bytes that the packet has.
   */
  private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in);
    ...
    if (throttler != null) { // throttle I/O
      throttler.throttle(len);
    }

    return lastPacketInBlock ? -1 : len;
  }

From this we can also infer that DataXceiver's copyBlock and replaceBlock methods are used for balancer-related work.
  @Override
  protected void doPut(final HttpServletRequest request,
      final HttpServletResponse response) throws ServletException, IOException {
    try {
      ServletContext context = getServletContext();
      final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
      final Configuration conf = (Configuration) getServletContext()
          .getAttribute(JspHelper.CURRENT_CONF);
      final PutImageParams parsedParams = new PutImageParams(request, response,
          conf);
      final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();

      validateRequest(context, conf, request, response, nnImage,
          parsedParams.getStorageInfoString());

      UserGroupInformation.getCurrentUser().doAs(
          new PrivilegedExceptionAction<Void>() {

            @Override
            public Void run() throws Exception {
              ...
              InputStream stream = request.getInputStream();
              try {
                long start = monotonicNow();
                MD5Hash downloadImageDigest = TransferFsImage
                    .handleUploadImageRequest(request, txid,
                        nnImage.getStorage(), stream,
                        parsedParams.getFileSize(), getThrottler(conf));
              ...

Here the getThrottler method builds the throttler instance from the relevant configuration property:
  /**
   * Construct a throttler from conf
   * @param conf configuration
   * @return a data transfer throttler
   */
  public final static DataTransferThrottler getThrottler(Configuration conf) {
    long transferBandwidth =
        conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
            DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
    DataTransferThrottler throttler = null;
    if (transferBandwidth > 0) {
      throttler = new DataTransferThrottler(transferBandwidth);
    }
    return throttler;
  }

By default the returned throttler is null, because the default bandwidth limit is 0:
  public static final String DFS_IMAGE_TRANSFER_RATE_KEY =
      "dfs.image.transfer.bandwidthPerSec";
  public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling

The throttle method is finally called in the receiveFile method:
  private static MD5Hash receiveFile(String url, List<File> localPaths,
      Storage dstStorage, boolean getChecksum, long advertisedSize,
      MD5Hash advertisedDigest, String fsImageName, InputStream stream,
      DataTransferThrottler throttler) throws IOException {
      ...
      int num = 1;
      byte[] buf = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
      while (num > 0) {
        num = stream.read(buf);
        if (num > 0) {
          received += num;
          for (FileOutputStream fos : outputStreams) {
            fos.write(buf, 0, num);
          }
          if (throttler != null) {
            throttler.throttle(num);
          }
        }
      }

This throttler is disabled by default.
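The pattern in receiveFile (copy a chunk, then report its size to an optional throttler) can be sketched in isolation. In the example below, ThrottledCopy is a hypothetical class, and a java.util.function.LongConsumer stands in for DataTransferThrottler so that the code runs without Hadoop on the classpath; it only records the byte counts and does not actually delay anything.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.LongConsumer;

// Hypothetical sketch of the "copy, then throttle" loop; not Hadoop code.
final class ThrottledCopy {

    /**
     * Copies in to out, reporting each chunk size to the throttler.
     * A null throttler means no throttling, mirroring the null check in receiveFile.
     */
    static long copy(InputStream in, OutputStream out, LongConsumer throttler)
            throws IOException {
        byte[] buf = new byte[4096];
        long received = 0;
        int num;
        while ((num = in.read(buf)) > 0) {
            received += num;
            out.write(buf, 0, num);
            if (throttler != null) {
                throttler.accept(num);
            }
        }
        return received;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10_000];
        long[] reported = {0};
        long copied = copy(new ByteArrayInputStream(data),
                new ByteArrayOutputStream(), n -> reported[0] += n);
        System.out.println(copied + " bytes copied, "
                + reported[0] + " bytes reported to the throttler");
        // With a null throttler the throttling step is simply skipped.
        copy(new ByteArrayInputStream(data), new ByteArrayOutputStream(), null);
    }
}
```

Because the throttler is consulted after each write, a single chunk can exceed the remaining budget; the overdraft is then paid back by waiting before the next chunk.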
  /**
   * Scan a block.
   *
   * @param cblock               The block to scan.
   * @param bytesPerSec          The bytes per second to scan at.
   *
   * @return                     The length of the block that was scanned, or
   *                               -1 if the block could not be scanned.
   */
  private long scanBlock(ExtendedBlock cblock, long bytesPerSec) {
    ...
    BlockSender blockSender = null;
    try {
      blockSender = new BlockSender(block, 0, -1,
          false, true, true, datanode, null,
          CachingStrategy.newDropBehind());
      throttler.setBandwidth(bytesPerSec);
      long bytesRead = blockSender.sendBlock(nullStream, null, throttler);
      resultHandler.handle(block, null);
      return bytesRead;
    //...

bytesPerSec is set in the following constructor:
    @SuppressWarnings("unchecked")
    Conf(Configuration conf) {
      this.targetBytesPerSec = Math.max(0L, conf.getLong(
          DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
          DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT));
      ...
  public static final String  DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
  public static final long    DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;

The default is 1 MB per second (1048576 bytes).
  /** Constructor
   * @param bandwidthPerSec bandwidth allowed in bytes per second.
   */
  public DataTransferThrottler(long bandwidthPerSec) {
    this(500, bandwidthPerSec);  // by default throttling period is 500ms
  }

The suggested improvement is to make this property configurable. I already did this while studying the code: I opened an issue and submitted it to the open-source community as HDFS-9756.
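2. The throttling scenarios above have one thing in common: they all operate outside the job level, and ordinary read and write block operations are not throttled. As a result, a job's data transfers can use up all of the available bandwidth. In my view a limit could be added here as well, made configurable and disabled by default for normal reads and writes, working much like the balancer's copyBlock and replaceBlock operations. This would make readBlock and writeBlock more flexible. Currently the throttler passed in readBlock is null: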
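      read = blockSender.sendBlock(out, baseStream, null); // send data

The benefit is that the total bandwidth rate can be limited according to the bandwidth resources of each machine. Interested readers can try this for themselves.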
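The Throttler is one of the ways Hadoop limits resource usage. Hadoop has other, similar mechanisms for preventing resource abuse, such as quotas. In HDFS, a quota can be set on each directory to limit its storage space usage (space count) and its namespace usage (namespace count), which can be understood as the number of files under it. Quotas keep a directory from accumulating too many files or too much data; if a quota is exceeded, an exception is thrown. The relevant code is defined as follows: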
/**
 * Counters for namespace, storage space and storage type space quota and usage.
 */
public class QuotaCounts {
  // Name space and storage space counts (HDFS-7775 refactors the original disk
  // space count to storage space counts)
  private EnumCounters<Quota> nsSsCounts;
  // Storage type space counts
  private EnumCounters<StorageType> tsCounts;

This is only a brief overview. Readers who want the details can read the relevant source code.
Issue link: https://issues.apache.org/jira/browse/HDFS-9756
GitHub patch link: https://github.com/linyiqun/open-source-patch/tree/master/hdfs/HDFS-9756
