推广 热搜: page  数据  小红  红书  考试  论文  数据分析  关键词  哪些  搜索 

环形缓冲区-Hadoop Shuffle过程中的利器

   日期:2024-12-23     移动:https://sicmodule.kub2b.com/mobile/quote/10685.html

Hadoop在shuffle过程中使用了一个数据结构-。

环形队列是在实际编程极为有用的数据结构,它是一个首尾相连的FIFO的数据结构,采用数组的线性空间,数据组织简单。能很快知道队列是否满为空。能以很快速度的来存取数据。 因为有简单高效的原因,甚至在硬件都实现了环形队列。

环形队列广泛用于网络数据收发,和不同程序间数据交换(比如内核与应用程序大量交换数据,从硬件接收大量数据)均使用了环形队列。

环形缓冲区数据结构
Map过程中环形缓冲区是指数据被map处理之后会先放入内存,内存中的这片区域就是环形缓冲区。

环形缓冲区是在MapTask.MapOutputBuffer中定义的,相关的属性如下

 

环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,key/value的元数据存储的格式是int类型,每个key/value对应一个元数据,元数据由4个int组成,第一个int存放value的起始位置,第二个存放key的起始位置,第三个存放partition,最后一个存放value的长度。

key/value序列化的数据和元数据在环形缓冲区中的存储是由equator分隔的,key/value按照索引递增的方向存储,meta则按照索引递减的方向存储,将其数组抽象为一个环形结构之后,以equator为界,key/value顺时针存储,meta逆时针存储。

初始化
环形缓冲区的结构在中创建。

 

init是对环形缓冲区进行初始化构造,由mapreduce.task.io.sort.mb决定map中环形缓冲区的大小sortmb,默认是100M。

此缓冲区也用于存放meta,一个meta占用metaSIZE(16byte),则其中用于存放数据的大小是maxMemUsage -= sortmb << 20 % metaSIZE(由此可知最好设置sortmb转换为byte之后是16的整数倍),然后用maxMemUsage初始化kvbuffer字节数组和kvmeta整形数组,最后设置数组的一些标识信息。利用设置kvbuffer和kvmeta的分界线,初始化的时候以0为分界线,kvindex为aligned - metaSIZE + kvbuffer.length,其位置在环形数组中相当于按照逆时针方向减去metaSIZE,由kvindex设置kvstart = kvend = kvindex,由equator设置bufstart = bufend = bufindex = equator,还得设置bufvoid = kvbuffer.length,bufvoid用于标识用于存放数据的最大位置。

为了提高效率,当buffer占用达到阈值之后,会进行spill,这个阈值是由bufferRemaining进行检查的,bufferRemaining由进行初始化赋值,这里需要注意的是softLimit并不是sortmbspillper,而是kvbuffer.length * spillper,当sortmb << 20是16的整数倍时,才可以认为softLimit是sortmbspillper。

 

buffer初始化之后的抽象数据结构如下图所示

写入buffer
Map通过方法调用中写入数据,数据写入之前已在中对要写入的数据进行逐条分区,下面看下collect

 

每次写入数据时,执行之后,检查bufferRemaining

如果大于0,直接将key/value序列化对和对应的meta写入buffer中,key/value是序列化之后写入的,key/value经过一些列的方法调用将数据写入kvbuffer中,write方法如下

 

write方法将key/value写入kvbuffer中,如果bufindex+len超过了bufvoid,则将写入的内容分开存储,将一部分写入之间,然后重置bufindex,将剩余的部分写入,这里不区分key和value,写入key之后会在collect中判断bufindex < keystart,当bufindex小时,则key被分开存储,执行,value则直接写入,不用判断是否被分开存储,key不能分开存储是因为要对key进行排序。

这里需要注意的是要写入的数据太长,并且kvinde==kvend,则抛出MapBufferTooSmallException异常,在collect中捕获,将此数据直接spill到磁盘,也就是当单条记录过长时,不写buffer,直接写入磁盘。

下面看下bb.shiftBufferedKey()代码

 

shiftBufferedKey时,判断首部是否有足够的空间存放key,有没有足够的空间,则先将首部的部分key写入keytmp中,然后分两次写入,再次调用Buffer.write,如果有足够的空间,分两次copy,先将首部的部分key复制到headbytelen的位置,然后将末尾的部分key复制到首部,移动bufindex,重置bufferRemaining的值。

key/value写入之后,继续写入元数据信息并重置kvindex的值。
spill
一次写入buffer结束,当写入数据比较多,bufferRemaining小于等于0时,准备进行spill,首次spill,spillInProgress为false,此时查看bUsed = distanceTo(kvbidx, bufindex),此时bUsed >= softLimit 并且,则进行spill,调用startSpill

 

startSpill唤醒spill线程之后,进程spill操作,但此时map向buffer的写入操作并没有阻塞,需要重新边界equator和bufferRemaining的值,先来看下equator和bufferRemaining值的设定

 

因为equator是kvbuffer和kvmeta的分界线,为了更多的空间存储kv,则最多拿出distkvi的一半来存储meta,并且利用avgRec估算distkvi能存放多少个record和meta对,根据record和meta对的个数估算meta所占空间的大小,从distkvi/2和meta所占空间的大小中取最小值,又因为distkvi中最少得存放一个meta,所占空间为metaSIZE,在选取kvindex时需要求aligned,aligned最多为metaSIZE-1,总和上述因素,最终选取 。equator选取之后,设置bufmark = bufindex = newPos和kvindex,但此时并不设置bufstart、bufend和kvstart、kvend,因为这几个值要用来表示spill数据的边界。

spill之后,可用的空间减少了,则控制spill的bufferRemaining也应该重新设置,bufferRemaining取三个值的最小值减去2metaSIZE,三个值分别是meta可用占用的空间,kv可用空间和softLimit。**这里为什么要减去2metaSIZE,一个是spill之前kvend到kvindex的距离,另一个是当时的kvindex空间?**?此时,已有一个record要写入buffer,需要从bufferRemaining中减去当前record的元数据占用的空间,即减去metaSIZE,另一个metaSIZE是在计算equator时,没有包括kvindex到kvend(spill之前)的这段metaSIZE,所以要减去这个metaSIZE。

接下来解析下SpillThread线程,查看其run方法

 

run中主要是

 

sortAndSpill中,有mstart和mend得到一共有多少条record需要spill到磁盘,调用sorter.sort对meta进行排序,先对partition进行排序,然后按key排序,排序的结果只调整meta的顺序。

排序之后,判断是否有combiner,没有则直接将record写入磁盘,写入时是一个partition一个IndexRecord,如果有combiner,则将该partition的record写入kvIter,然后调用combinerRunner.combine执行combiner。

写入磁盘之后,将spillx.out对应的spillRec放入内存indexCacheList.add(spillRec),如果所占内存totalIndexCacheMemory超过了indexCacheMemoryLimit,则创建index文件,将此次及以后的spillRec写入index文件存入磁盘。

最后spill次数递增。sortAndSpill结束之后,回到run方法中,执行finally中的代码,对kvstart和bufstart赋值,设置spillInProgress的状态为false。

在spill的同时,map往buffer的写操作并没有停止,依然在调用collect,再次回到collect方法中

 

有新的record需要写入buffer时,判断bufferRemaining -= metaSIZE,此时的bufferRemaining是在开始spill时被重置过的(此时的bufferRemaining应该比初始的softLimit要小),当bufferRemaining小于等最后一个metaSIZE是当前record进入collect之后bufferRemaining减去的那个metaSIZE。

于0时,进入if,此时spillInProgress的状态为false,进入if (!spillInProgress),startSpill时对kvend和bufend进行了重置,则此时,调用resetSpill(),将kvstart、kvend和bufstart、bufend设置为上次startSpill时的位置。此时buffer已将一部分内容写入磁盘,有大量空余的空间,则对bufferRemaining进行重置,此次不spill。

bufferRemaining取值为

本文地址:https://sicmodule.kub2b.com/quote/10685.html     企库往 https://sicmodule.kub2b.com/ , 查看更多

特别提示:本信息由相关用户自行提供,真实性未证实,仅供参考。请谨慎采用,风险自负。


0相关评论
相关最新动态
推荐最新动态
点击排行
网站首页  |  关于我们  |  联系方式  |  使用协议  |  版权隐私  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报  |  鄂ICP备2020018471号