Netty学习之旅—-ByteBuf源码解读之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf

首先,我们再看一下 ByteBuf 的类设计图,从中更进一步了解ByteBuf。

Netty学习之旅—-ByteBuf源码解读之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf

ByteBuf 继承自 ReferenceCounted,引用计数,也就是说 ByteBuf 的内存回收使用的是引用计数器来实现。

UnpooledHeapByteBuf 是非池化的堆内存实现,而 UnpooledDirectByteBuf 是非池化的堆外内存(直接内存)。非池化的ByteBuf 就是利用完之后就需要销毁,无法重用。

1、UnpooledHeapByteBuf 详解

其继承链:UnpooledHeapByteBuf --> AbstractReferenceCountedByteBuf --> AbstractByteBuf。

1.1 AbstractByteBuf 源码分析

AbstractByteBuf 定义 ByteBuf 的基本属性,诸如 readerIndex, writerIndex, markedReaderIndex, markedWriterIndex, maxCapacity, 我们知道 ByteBuf 的容量是可以自动扩容的。

AbstractByteBuf 的这两个属性,应该引起我们的注意:

  1. SwappedByteBuf swappedBuf
    这个是大端序列与小端序列的转换。
  2. ResourceLeakDetector leakDetector = new ResourceLeakDetector(ByteBuf.class);
    Netty 用来解决内存泄漏检测机制,下一篇会详细介绍。

这里截取一下 SwappedByteBuf 的源码,采用了典型的装饰模式来设计。

  1. public class SwappedByteBuf extends ByteBuf {
  2. private final ByteBuf buf;
  3. private final ByteOrder order;
  4. public SwappedByteBuf(ByteBuf buf) {
  5. if (buf == null) {
  6. throw new NullPointerException("buf");
  7. }
  8. this.buf = buf;
  9. if (buf.order() == ByteOrder.BIG_ENDIAN) {
  10. order = ByteOrder.LITTLE_ENDIAN;
  11. } else {
  12. order = ByteOrder.BIG_ENDIAN;
  13. }
  14. }
  15. @Override
  16. public ByteOrder order() {
  17. return order;
  18. }
  19. @Override
  20. public ByteBuf order(ByteOrder endianness) {
  21. if (endianness == null) {
  22. throw new NullPointerException("endianness");
  23. }
  24. if (endianness == order) {
  25. return this;
  26. }
  27. return buf;
  28. }
  29. }

关于其他 AbstractByteBuf, 该类设计使用了典型的模板模式,对 ByteBuf 提供的类,实现时提供一种模板,然后再提供一个钩子方法,供子类实现,比如_getLong方法,_setLong等方法,由于该类的实现原理不复杂,就不做进一步的源码解读。

1.2 AbstractReferenceCountedByteBuf

该类主要是实现引用计算的常规方法,充分利用 voliate 内存可见性与 CAS 操作完成 refCnt 变量的维护。

其源码实现如下:

  1. package io.netty.buffer;
  2. import io.netty.util.IllegalReferenceCountException;
  3. import io.netty.util.internal.PlatformDependent;
  4. import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
  5. /**
  6. * Abstract base class for {@link ByteBuf} implementations that count references.
  7. */
  8. public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
  9. private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
  10. static {
  11. AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
  12. PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
  13. if (updater == null) {
  14. updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
  15. }
  16. refCntUpdater = updater;
  17. }
  18. private volatile int refCnt = 1;
  19. protected AbstractReferenceCountedByteBuf(int maxCapacity) {
  20. super(maxCapacity);
  21. }
  22. @Override
  23. public final int refCnt() {
  24. return refCnt;
  25. }
  26. /**
  27. * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
  28. */
  29. protected final void setRefCnt(int refCnt) {
  30. this.refCnt = refCnt;
  31. }
  32. @Override
  33. public ByteBuf retain() {
  34. for (;;) {
  35. int refCnt = this.refCnt;
  36. if (refCnt == 0) {
  37. throw new IllegalReferenceCountException(0, 1);
  38. }
  39. if (refCnt == Integer.MAX_VALUE) {
  40. throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
  41. }
  42. if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
  43. break;
  44. }
  45. }
  46. return this;
  47. }
  48. @Override
  49. public ByteBuf retain(int increment) {
  50. if (increment <= 0) {
  51. throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");
  52. }
  53. for (;;) {
  54. int refCnt = this.refCnt;
  55. if (refCnt == 0) {
  56. throw new IllegalReferenceCountException(0, increment);
  57. }
  58. if (refCnt > Integer.MAX_VALUE - increment) {
  59. throw new IllegalReferenceCountException(refCnt, increment);
  60. }
  61. if (refCntUpdater.compareAndSet(this, refCnt, refCnt + increment)) {
  62. break;
  63. }
  64. }
  65. return this;
  66. }
  67. @Override
  68. public ByteBuf touch() {
  69. return this;
  70. }
  71. @Override
  72. public ByteBuf touch(Object hint) {
  73. return this;
  74. }
  75. @Override
  76. public final boolean release() {
  77. for (;;) {
  78. int refCnt = this.refCnt;
  79. if (refCnt == 0) {
  80. throw new IllegalReferenceCountException(0, -1);
  81. }
  82. if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
  83. if (refCnt == 1) {
  84. deallocate();
  85. return true;
  86. }
  87. return false;
  88. }
  89. }
  90. }
  91. @Override
  92. public final boolean release(int decrement) {
  93. if (decrement <= 0) {
  94. throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
  95. }
  96. for (;;) {
  97. int refCnt = this.refCnt;
  98. if (refCnt < decrement) {
  99. throw new IllegalReferenceCountException(refCnt, -decrement);
  100. }
  101. if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
  102. if (refCnt == decrement) {
  103. deallocate();
  104. return true;
  105. }
  106. return false;
  107. }
  108. }
  109. }
  110. /**
  111. * Called once {@link #refCnt()} is equals 0.
  112. */
  113. protected abstract void deallocate();
  114. }

该类,我们只需要了解,当一个ByteBuf 被引用的次数为 0 时,dealocate() 方法将被调用,该方法就是具体回收 ByteBuf 的操作,由具体的子类去实现。

1.3 UnpooledHeapByteBuf 与 UnpooledDirectByteBuf

首先该类的内部结构如下:

Netty学习之旅—-ByteBuf源码解读之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf

Netty学习之旅—-ByteBuf源码解读之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf

对于非池化的 UnpooledByteBuf,内部就是使用 array 来存储数据,相对简单,所以源码分析,我还是侧重于UnpooledDirectByteBuf。重点关注如下两个方面:

  1. 容量的扩容
  2. 内存的分配

1.3.1 capacity(int newCapacity)

  1. public ByteBuf capacity(int newCapacity) {
  2. ensureAccessible(); // @1
  3. if (newCapacity < 0 || newCapacity > maxCapacity()) {
  4. throw new IllegalArgumentException("newCapacity: " + newCapacity);
  5. }
  6. int readerIndex = readerIndex();
  7. int writerIndex = writerIndex();
  8. int oldCapacity = capacity;
  9. if (newCapacity > oldCapacity) { // @2
  10. ByteBuffer oldBuffer = buffer;
  11. ByteBuffer newBuffer = allocateDirect(newCapacity); //@21
  12. oldBuffer.position(0).limit(oldBuffer.capacity()); //@22
  13. newBuffer.position(0).limit(oldBuffer.capacity()); //@23
  14. newBuffer.put(oldBuffer); //@24
  15. newBuffer.clear(); //@25
  16. setByteBuffer(newBuffer); //@26
  17. } else if (newCapacity < oldCapacity) { //@3
  18. ByteBuffer oldBuffer = buffer;
  19. ByteBuffer newBuffer = allocateDirect(newCapacity);
  20. if (readerIndex < newCapacity) {
  21. if (writerIndex > newCapacity) {
  22. writerIndex(writerIndex = newCapacity);
  23. }
  24. oldBuffer.position(readerIndex).limit(writerIndex);
  25. newBuffer.position(readerIndex).limit(writerIndex);
  26. newBuffer.put(oldBuffer);
  27. newBuffer.clear();
  28. } else {
  29. setIndex(newCapacity, newCapacity);
  30. }
  31. setByteBuffer(newBuffer);
  32. }
  33. return this;
  34. }

代码@1,检测一下访问性,可达性,就是引用数必须大于0,否则该 ByteBuf 的内部空间已经被回收了(堆外内存)。

代码@2,扩容操作,思路新建一个缓存区,然后将原先缓存区的数据全部写入到新的缓存区,然后释放旧的缓存区。

代码@21、22,申请一个直接缓存区,然后将原缓冲区的 postion 设置为0,将 limit 设置为 capacity, 处于释放状态(从缓存区读)。

代码@23,将新缓存区的 postion,limit 属性设置为0,老缓存区 limit。

代码@24,将原缓冲区写入到新的缓存区,然后将缓存区置的 position 设置为0,limt 设置为 capacity,其实这里设置position,capacity 的意义不大,因为 ByteBuf 并不会利用内部的 ByteBuffe r的 limit,postion 属性,而是使用readerIndex, wriateIndex。

代码@26,关联新的 ByteBuffer, 并释放原缓存区的空间。

代码@3,压缩缓存区。实现思路是新建一个缓存区,如果 readerIndex 大于新建的 ByteBuffer 的 capacity,则无需将旧的缓存区内容写入到新的缓存区中。如果 readerIndex 小于新 capacity,那需要将 readerIndex 至( Math.min(writerIndex, newCapacity) )直接的内容写入到新的缓存,然后释放旧的缓存区。

我们在重点关注一下 setByteBuffer(newBuffer) 方法,该方法还负责销毁原先的 ByteBuffer。

  1. private void setByteBuffer(ByteBuffer buffer) {
  2. ByteBuffer oldBuffer = this.buffer;
  3. if (oldBuffer != null) {
  4. if (doNotFree) {
  5. doNotFree = false;
  6. } else {
  7. freeDirect(oldBuffer);
  8. }
  9. }
  10. this.buffer = buffer;
  11. tmpNioBuf = null;
  12. capacity = buffer.remaining();
  13. }

释放原先的内存。

1.3.2 内存分配

Netty 在为内存的分配,单独封装,相关类图:

Netty学习之旅—-ByteBuf源码解读之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf

目前,先关注 UnpooledByteBufAllocator,对象池的ByteBuf在后续章节中重点关注。

结合原代码,有如下两个方法引起了我的注意:

  1. 容量扩容规则(容量增长规则)calculateNewCapacity 方法。
  2. 直接内存的分配。newDirectBuffer 方法。

1.3.2.1 calculateNewCapacity

  1. public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
  2. if (minNewCapacity < 0) {
  3. throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
  4. }
  5. if (minNewCapacity > maxCapacity) {
  6. throw new IllegalArgumentException(String.format(
  7. "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
  8. minNewCapacity, maxCapacity));
  9. }
  10. final int threshold = 1048576 * 4; // 4 MiB page
  11. if (minNewCapacity == threshold) {
  12. return threshold;
  13. }
  14. // If over threshold, do not double but just increase by threshold.
  15. if (minNewCapacity > threshold) { //@1
  16. int newCapacity = minNewCapacity / threshold * threshold;
  17. if (newCapacity > maxCapacity - threshold) {
  18. newCapacity = maxCapacity;
  19. } else {
  20. newCapacity += threshold;
  21. }
  22. return newCapacity;
  23. }
  24. // Not over threshold. Double up to 4 MiB, starting from 64.
  25. int newCapacity = 64;c // @2
  26. while (newCapacity < minNewCapacity) {
  27. newCapacity <<= 1;
  28. }
  29. return Math.min(newCapacity, maxCapacity);
  30. }
  • minNewCapacity:本次需要申请的最小内存。
  • macCapacity:最大总内存申请值。

代码@1,如果最小需要的内存超过设置的 threshold(阔值的话),则循环,每次增加threshold,然后看是否达到本次申请目标。

代码@2,如果需要申请的内存小于阔值,则以64个字节以2的幂增长。这里体现了内存扩容时的一个优化点。

1.3.2.2 newDirectBuffer 方法

  1. @Override
  2. protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  3. ByteBuf buf;
  4. if (PlatformDependent.hasUnsafe()) {
  5. buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
  6. } else {
  7. buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  8. }
  9. return toLeakAwareBuffer(buf);
  10. }

该方法中,除了见到申请一个直接内存外,还将该 buf 变成一个可感知的对象。toLeakAwareBuffer 方法,用于该对象被引用的情况,因为 UnpooledDirectByteBuf 是一个聚合对象,内部维护了一个 java.nio.ByteBuffer 的直接对外内存空间,在什么是释放UnpooledDirectByteBuf 中的堆外内存呢?在 UnpooledDirectByteBuf 被java垃圾回收的时候,应该于此同时需要释放指向的堆外内存,但堆外内存不受JVM GC的管理,所以我们只有感知到UnpooledDirectByteBuf被JVM虚拟机回收后,手动去释放堆外内存,大家想想都知道,我们可以通过JAVA提供的引用机制,来实现跟踪垃圾回收器的收集工作,虚引用的作用来了,下一篇,我将会以这个为入口点,重点分析 Netty 堆外内存如何管理,也就是内存泄露检测等方面的课题。


来源:https://blog.csdn.net/prestigeding/article/details/53977445

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。
netty

# Netty学习之旅—-源码分析Netty内存泄漏检测

2021-5-12 17:15:50

javanetty

ByteBuf 篇之 ByteBuf 内部结构与 API 学习

2021-5-12 17:16:40

0 条回复 A文章作者 M管理员
欢迎您,新朋友,感谢参与互动!
    暂无讨论,说说你的看法吧