Netty内存池之PoolThreadCache详解


声明:本文转载自https://my.oschina.net/zhangxufeng/blog/3040834,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

       PoolThreadCahche是Netty内存管理中能够实现高效内存申请和释放的一个重要原因,Netty会为每一个线程都维护一个PoolThreadCache对象,当进行内存申请时,首先会尝试从PoolThreadCache中申请,如果无法从中申请到,则会尝试从Netty的公共内存池中申请。本文首先会对PoolThreadCache的数据结构进行讲解,然后会介绍Netty是如何初始化PoolThreadCache的,最后会介绍如何在PoolThreadCache中申请内存和如何将内存释放到PoolThreadCache中。

1. PoolThreadCache数据结构

       PoolThreadCache的数据结构与PoolArena的主要属性结构非常相似,但细微位置有很大的不同。在PoolThreadCache中,其维护了三个数组(我们以直接内存的缓存方式为例进行讲解),如下所示:

// 存储tiny类型的内存缓存,该数组长度为32,其中只有下标为1~31的元素缓存了有效数据,第0号位空置。
// 这里内存大小的存储方式也与PoolSubpage类似,数组的每一号元素都存储了不同等级的内存块,每个等级的
// 内存块的内存大小差值为16byte,比如第1号位维护了大小为16byte的内存块,第二号为维护了大小为32byte的
// 内存块,依次类推,第31号位维护了大小为496byte的内存块。
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
// 存储small类型的内存缓存,该数组长度为4,数组中每个元素中维护的内存块大小也是成等级递增的,并且这里
// 的递增方式是按照2的指数次幂进行的,比如第0号为维护的是大小为512byte的内存块,第1号位维护的是大小为
// 1024byte的内存块,第2号位维护的是大小为2048byte的内存块,第3号位维护的是大小为4096byte的内存块
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
// 存储normal类型的内存缓存。需要注意的是,这里虽说是维护的normal类型的缓存,但是其只维护2<<13,2<<14
// 和2<<15三个大小的内存块,而该数组的大小也正好为3,因而这三个大小的内存块将被依次放置在该数组中。
// 如果申请的目标内存大于2<<15,那么Netty会将申请动作交由PoolArena进行。
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

       这三个数组分别保存了tiny,small和normal类型的缓存数据,不同于PoolArena的使用PoolSubpage和PoolChunk进行内存的维护,这里都是使用MemoryRegionCache进行的。另外,在MemoryRegionCache中保存了一个有界队列,对于tiny类型的缓存,该队列的长度为512,对于small类型的缓存,该队列的长度为256,对于normal类型的缓存,该队列的长度为64。在进行内存释放的时候,如果队列已经满了,那么就会将该内存块释放回PoolArena中。这里需要说明的是,这里的队列中的元素统一使用的是Entry这种数据结构,该结构的主要属性如下:

static final class Entry<T> {
  // 用于循环利用当前Entry对象的处理器,该处理器的实现原理,我们后续将进行讲解
  final Handle<Entry<?>> recyclerHandle;
  // 记录了当前内存块是从哪一个PoolChunk中申请得来的
  PoolChunk<T> chunk;
  // 如果是直接内存,该属性记录了当前内存块所在的ByteBuffer对象
  ByteBuffer nioBuffer;
  // 由于当前申请的内存块在PoolChunk以及PoolSubpage中的位置是可以通过一个长整型参数来表示的,
  // 这个长整型参数就是这里的handle,因而这里直接将其记录下来,以便后续需要将当前内存块释放到
  // PoolArena中时,能够快速获取其所在的位置
  long handle = -1;
}

       PoolThreadCache中维护每一个内存块最终都是使用的一个Entry对象来进行的,从上面的属性可以看出,记录该内存块最重要的属性是chunk和handle,chunk记录了当前内存块所在的PoolChunk对象,而handle则记录了当前内存块是在PoolChunk和PoolSubpage中的哪个位置(关于PoolChunk,PoolSubpage和PoolArena的实现原理,建议读者阅读一下前面的文章,这样有助于读者快速理解相关原理)。如此,对于Netty使用的PoolThreadCache的存储结构我们就有了一个比较清晰的认识。下面我们通过一幅图来对PoolThreadCache的数据结构进行一个整体的演示:

image.png

       如上图所示展示的就是PoolThreadCache的结构示意图。从图中可以看出在一个PoolThreadCache中,主要有三个MemoryRegionCache数组用于存储tiny,small和normal类型的内存块。每个MemoryRegionCache中有一个队列,队列中的元素类型为Entry。Entry的作用就是存储缓存的内存块的,其存储的方式主要是通过记录当前内存块所在的PoolChunk和标志其在PoolChunk中位置的handle参数。对于不同类型的数组,队列的长度是不一样的,tiny类型的是512,small类型的是256,normal类型的则是64。

2. PoolThreadCache初始化

       对于PoolThreadCache的初始化,这里单独拿出来讲解的原因是,其初始化过程是与PoolThreadLocalCache所绑定的。PoolThreadLocalCache的作用与Java中的ThreadLocal的作用非常类似,其有一个initialValue()方法,用于在无法从PoolThreadLocalCache中获取数据时,通过调用该方法初始化一个。另外其提供了一个get()方法和和remove()方法,分别用于从PoolThreadLocalCache中将当前绑定的数据给清除。这里我们首先看看获取PoolThreadCache的入口代码:

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  // 从PoolThreadLocalCache中尝试获取一个PoolThreadCache对象,
  // 如果不存在,则自行初始化一个返回
  PoolThreadCache cache = threadCache.get();
  // 由于当前方法是需要返回一个direct buffer,因而这里直接使用cache中的directArena
  PoolArena<ByteBuffer> directArena = cache.directArena;

  final ByteBuf buf;
  if (directArena != null) {
    // 如果directArena不为空,则直接调用其allocate()方法申请内存
    buf = directArena.allocate(cache, initialCapacity, maxCapacity);
  } else {
    // 如果当前缓存中由于某种原因无法获取到directArena,则直接创建一个存有直接内存的ByteBuf,
    // 一般情况下不会走到这一步
    buf = PlatformDependent.hasUnsafe() ?
      UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  }

  // 为ByteBuf设置内存泄露检测功能
  return toLeakAwareBuffer(buf);
}

       从上面的代码中可以看出,在最开始的时候,就会通过PoolThreadLocalCache尝试获取一个PoolThreadCache对象,如果不存在,其会自行初始化一个。这里我们直接看其是如何初始化的,如下是PoolThreadLocalCache.initialValue()方法的源码:

@Override
protected synchronized PoolThreadCache initialValue() {
  // 这里leastUsedArena()就是获取对应的PoolArena数组中最少被使用的那个Arena,将其返回。
  // 这里的判断方式是通过比较PoolArena.numThreadCaches属性来进行的,该属性记录了当前PoolArena被
  // 多少个线程所占用了。这里采用的思想就是,找到最少被使用的那个PoolArena,将其存入新的线程缓存中
  final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
  final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

  Thread current = Thread.currentThread();
  // 只有在指定了为每个线程使用缓存,或者当前线程是FastThreadLocalThread的子类型时,才会使用线程缓存
  if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
    return new PoolThreadCache(
      heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
      DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
  }
  // 如果指定了不使用缓存,或者线程换粗对象不是FastThreadLocalThread类型的,则创建一个PoolThreadCache
  // 对象,该对象中是不做任何缓存的,因为初始化数据都是0
  return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}

private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
  if (arenas == null || arenas.length == 0) {
    return null;
  }

  // 在PoolArena数组中找到被最少线程占用的对象,将其返回。这样做的目的是,由于内存池是多个线程都可以
  // 访问的公共区域,因而当这里就需要对内存池进行划分,以减少线程之间的竞争。
  PoolArena<T> minArena = arenas[0];
  for (int i = 1; i < arenas.length; i++) {
    PoolArena<T> arena = arenas[i];
    if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
      minArena = arena;
    }
  }

  return minArena;
}

       从上述代码可以看出,对于PoolThreadCache的初始化,其首先会查找PoolArena数组中被最少线程占用的那个arena,然后将其封装到一个新建的PoolThreadCache中。

3. 内存申请

       需要注意的是,PoolThreadCache申请内存并不是说其会创建一块内存,或者说其会到PoolArena中申请内存,而是指,其本身已经缓存有内存块,而当前申请的内存块大小正好与其一致,就会将该内存块返回;PoolThreadCache中的内存块都是在当前线程使用完创建的ByteBuf对象后,通过调用其release()方法释放内存时直接缓存到当前PoolThreadCache中的,其并不会直接将内存块返回给PoolArena。这里我们直接看一下其allocate()方法是如何实现的:

// 申请tiny类型的内存块
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
  return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

// 申请small类型的内存块
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, 
     int reqCapacity, int normCapacity) {
  return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}

// 申请normal类型的内存块
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, 
     int reqCapacity, int normCapacity) {
  return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}

// 从MemoryRegionCache中申请内存
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
  if (cache == null) {
    return false;
  }

  // 从MemoryRegionCache中申请内存,本质上就是从其队列中申请,如果存在,则初始化申请到的内存块
  boolean allocated = cache.allocate(buf, reqCapacity);
  // 这里是如果当前PoolThreadCache中申请内存的次数达到了8192次,则对内存块进行一次trim()操作,
  // 对使用较少的内存块,将其返还给PoolArena,以供给其他线程使用
  if (++allocations >= freeSweepAllocationThreshold) {
    allocations = 0;
    trim();
  }
  return allocated;
}

       这里对于内存块的申请,我们可以看到,PoolThreadCache是将其分为tiny,small和normal三种不同的方法来调用的,而具体大小的区分其实是在PoolArena中进行区分的(读者可以阅读本人前面的关于PoolArena介绍的文章)。在对应的内存数组中找到MemoryRegionCache对象之后,通过调用allocate()方法来申请内存,申请完之后还会检查当前缓存申请次数是否达到了8192次,达到了则对缓存中使用的内存块进行检测,将较少使用的内存块返还给PoolArena。这里我们首先看一下获取MemoryRegionCache的代码是如何实现的,也即cacheForTiny(),cacheForSmall()和cacheForNormal()的代码:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
  // 计算当前数组下标索引,由于tiny类型的内存块每一层级相差16byte,因而这里的计算方式就是
  // 将目标内存大小除以16
  int idx = PoolArena.tinyIdx(normCapacity);
  // 返回tiny类型的数组中对应位置的MemoryRegionCache
  if (area.isDirect()) {
    return cache(tinySubPageDirectCaches, idx);
  }
  return cache(tinySubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
  // 计算当前数组下标的索引,由于small类型的内存块大小都是2的指数次幂,因而这里就是将目标内存大小
  // 除以1024之后计算其偏移量
  int idx = PoolArena.smallIdx(normCapacity);
  // 返回small类型的数组中对应位置的MemoryRegionCache
  if (area.isDirect()) {
    return cache(smallSubPageDirectCaches, idx);
  }
  return cache(smallSubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
  // 对于normal类型的缓存,这里也是首先将其向右位移13位,也就是8192,然后取2的对数,这样就
  // 可以得到其在数组中的位置,然后返回normal类型的数组中对应位置的MemoryRegionCache
  if (area.isDirect()) {
    int idx = log2(normCapacity >> numShiftsNormalDirect);
    return cache(normalDirectCaches, idx);
  }
  int idx = log2(normCapacity >> numShiftsNormalHeap);
  return cache(normalHeapCaches, idx);
}

       这里对于数组位置的计算,主要是根据各个数组数据存储方式的不同而进行的,而它们最终都是通过一个MemoryRegionCache存储的,因而只需要返回该缓存对象即可。下面我们继续看一下MemoryRegionCache.allocate()方法是如何申请内存的:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
  // 尝试从队列中获取,如果队列中不存在,说明没有对应的内存块,则返回false,表示申请失败
  Entry<T> entry = queue.poll();
  if (entry == null) {
    return false;
  }
  
  // 走到这里说明队列中存在对应的内存块,那么通过其存储的Entry对象来初始化ByteBuf对象,
  // 如此即表示申请内存成功
  initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
  // 对entry对象进行循环利用
  entry.recycle();

  // 更新当前已经申请的内存数量
  ++allocations;
  return true;
}

       可以看到,MemoryRegionCache申请内存的方式主要是从队列中取,如果取到了,则使用该内存块初始化一个ByteBuf对象。

       前面我们讲到,PoolThreadCache会对其内存块使用次数进行计数,这么做的目的在于,如果一个ThreadPoolCache所缓存的内存块使用较少,那么就可以将其释放到PoolArena中,以便于其他线程可以申请使用。PoolThreadCache会在其内存总的申请次数达到8192时遍历其所有的MemoryRegionCache,然后调用其trim()方法进行内存释放,如下是该方法的源码:

public final void trim() {
  // size表示当前MemoryRegionCache中队列的最大可存储容量,allocations表示当前MemoryRegionCache
  // 的内存申请次数,size-allocations的含义就是判断当前申请的次数是否连队列的容量都没达到
  int free = size - allocations;
  allocations = 0;

  // 如果申请的次数连队列的容量都没达到,则释放该内存块
  if (free > 0) {
    free(free);
  }
}

private int free(int max) {
  int numFreed = 0;
  // 依次从队列中取出Entry数据,调用freeEntry()方法释放该Entry
  for (; numFreed < max; numFreed++) {
    Entry<T> entry = queue.poll();
    if (entry != null) {
      freeEntry(entry);
    } else {
      return numFreed;
    }
  }
  return numFreed;
}

private void freeEntry(Entry entry) {
  // 通过当前Entry中保存的PoolChunk和handle等数据释放当前内存块
  PoolChunk chunk = entry.chunk;
  long handle = entry.handle;
  ByteBuffer nioBuffer = entry.nioBuffer;
  entry.recycle();
  chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer);
}

4. 内存释放

       对于内存的释放,其原理比较简单,一般的释放内存的入口在ByteBuf对象中。当调用ByteBuf.release()方法的时候,其首先会将释放动作委托给PoolChunk的free()方法,PoolChunk则会判断当前是否是池化的ByteBuf,如果是池化的ByteBuf,则调用PoolThreadCache.add()方法将其添加到PoolThreadCache中,也就是说在释放内存时,其实际上是释放到当前线程的PoolThreadCache中的。如下是add()方法的源码:

boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
    long handle, int normCapacity, SizeClass sizeClass) {
  // 通过当前释放的内存块的大小计算其应该放到哪个等级的MemoryRegionCache中
  MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
  if (cache == null) {
    return false;
  }
  
  // 将内存块释放到目标MemoryRegionCache中
  return cache.add(chunk, nioBuffer, handle);
}

public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
  // 这里会尝试从缓存中获取一个Entry对象,如果没获取到则创建一个
  Entry<T> entry = newEntry(chunk, nioBuffer, handle);
  // 将实例化的Entry对象放到队列里
  boolean queued = queue.offer(entry);
  if (!queued) {
    entry.recycle();
  }

  return queued;
}

5. 小结

       本文首先详细讲解了PoolThreadCache的数据结构,并且说明了其中需要注意的点,然后介绍了PoolThreadCache的实例化方式,接着从申请和释放内存两个角度介绍了PoolThreadCache源码的实现方式。

本文发表于2019年04月23日 11:00
(c)注:本文转载自https://my.oschina.net/zhangxufeng/blog/3040834,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1736 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1