运维开发网

使用SocketChannel读取数据过程的网络分布式字节缓冲分析

运维开发网 https://www.qedev.com 2022-04-26 14:55 出处:网络
这篇文章主要为大家介绍了Netty源码分析ByteBuf使用SocketChannel读取数据过程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

这篇文章主要为大家介绍了Netty源码分析ByteBuf使用SocketChannel读取数据过程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

第三章分析了客户端访问的过程。本节带您分析客户端发送数据和服务器读取数据的过程:

首先温馨提醒,本节与第三章第一节和第二节内容高度耦合,很多知识在此不再赘述。如果对之前的知识印象不深,建议先把第三章第一节和第二节的内容补上,再学这一节。

门户网站:

初始化NioSockectChannelConfig

创建句柄来处理访问事件。


Server读取数据的流程


我们首先看NioEventLoop的processSelectedKey方法private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //获取到channel中的unsafe final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //如果这个key不是合法的, 说明这个channel可能有问题 if (!k.isValid()) { //代码省略 } try { //如果是合法的, 拿到key的io事件 int readyOps = k.readyOps(); //链接事件 if ((readyOps amp; SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops amp;= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //写事件 if ((readyOps amp; SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //读事件和接受链接事件 //如果当前NioEventLoop是work线程的话, 这里就是op_read事件 //如果是当前NioEventLoop是boss线程的话, 这里就是op_accept事件 if ((readyOps amp; (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }}

if((ready ops amp;(选择键。OP_READ |选择键。OP_ACCEPT))!= 0 || readyOps == 0)

这里的判断表明轮询的大事件是op_read或op_accept事件。

前面章节分析过,如果当前NioEventLoop是工作线程,那么这里就是op_read事件,也就是read事件,表示客户端发送数据流。

这里调用unsafe的redis()方法进行读取。

如果是工作线程,那么这里的通道是NioServerSocketChannel,其绑定的unsafe是NioByteUnsafe。在这里,您将进入NioByteUnsafe的read()方法:

public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() lt;= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() lt; 0; break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending amp;amp; !config.isAutoRead()) { removeReadOp(); } } }}

首先,获取SocketChannel的配置、管道和其他相关属性。

finalbytebufollocator allocator = config . get allocator();这一步是获取一个ByteBuf的内存分配器,用来分配ByteBuf。


这里会走到DefaultChannelConfig的getAllocator方法中public ByteBufAllocator getAllocator() { return allocator;}

这里返回了DefualtChannelConfig的成员变量,我们来看看这个成员变量:

private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

在这里,调用ByteBufAllocator的属性DEFAULT,并遵循:

ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

我们看到ByteBufUtil的静态属性DEFAULT_ALLOCATOR在这里又被调用了,然后跟进去:

static final ByteBufAllocator DEFAULT_ALLOCATOR;

属性DEFAULT_ALLOCATOR在静态块中初始化。


我们跟到static块中static { String allocType = SystemPropertyUtil.get( "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim(); ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { alloc = UnpooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else if ("pooled".equals(allocType)) { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); } DEFAULT_ALLOCATOR = alloc; //代码省略}

首先判断运行环境是否是Android,如果不是,返回quot池化quot;字符串保存在allocType中

然后通过if判断,将局部变量alloc = pooledbytebufolcator . DEFAULT,最后将alloc赋给成员变量DEFAULT_ALLOCATOR。

我们遵循PooledByteBufAllocator的默认属性:

public static final PooledByteBufAllocator DEFAULT = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

我们在这里可以看到,PooledByteBufAllocator对象是由新方法直接创建的,即基于申请一个连续内存来进行缓冲区分配的缓冲区分配器。

缓冲区分配器的知识在上一节已经详细分析过了,这里不再赘述。


回到NioByteUnsafe的read()方法中public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() lt;= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() lt; 0; break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending amp;amp; !config.isAutoRead()) { removeReadOp(); } } }}

bytebufallocator allocator = config . get allocator()中的分配器是PooledByteBufAllocator。

FinRecvByteBufAllocator。HandleallocHandle = recvbufallochandle()是创建一个句柄。正如我们在前面章节中所说,句柄是实际操作recvbytebuallocator的对象。


我们跟进recvBufAllocHandlepublic RecvByteBufAllocator.Handle recvBufAllocHandle() { //如果不存在, 则创建一个handle的实例 if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle;}

这里是我们之前分析过的逻辑。如果它不存在,创建一个句柄实例。具体的创作过程可以复习第三章第二节,这里就不赘述了。

类似地,allocHandle.reset(config)重置配置,这也在第3章的第二小节中分析。

配置复位后,执行do-while循环。循环终止条件allocHandle.continueReading()在上一节已经详细分析过了,这里不再赘述。

在do-while循环中,首先看步骤byte buf = alloch handle . allocate(allocator)。在这里,刚刚创建的allocate对象被传入,即PooledByteBufAllocator:

这将运行到defaultmaxmessagesrecvbyteballocator类的allocate方法:

public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(guess());}

在这里,的guess方法将调用AdaptiveRecvByteBufAllocator的guess方法:

public int guess() { return nextReceiveBufferSize;}

这里会返回AdaptiveRecvByteBufAllocator的成员变量nextReceiveBufferSize,即下次分配的缓冲区大小。根据我们之前了解到的情况,初始大小会在第一次分配时分配,也就是1024字节。

返回到defaultmaxmessagesrecvbyteballocator类的allocate方法:

这样,alloc.ioBuffer(guess())就会分配一个PooledByteBuf。

让我们遵循AbstractByteBufAllocator的ioBuffer方法:

public ByteBuf ioBuffer(int initialCapacity) { if (PlatformDependent.hasUnsafe()) { return directBuffer(initialCapacity); } return heapBuffer(initialCapacity);}

这里先判断是否可以获取jdk的不安全对象,默认为真,所以会去找directBuffer(initialCapacity),最后这里会分配一个PooledUnsafeDirectByteBuf对象。我们在上一节中详细分析了具体的分配过程。

回到NioByteUnsafe的read()方法:

分配ByteBuf后,看这一步:AllocHandle。最后读取的字节数(do read bytes (bytebuf)):

首先看参数doReadBytes(byteBuf)方法。这一步是将通道中的数据读入我们刚刚分配的ByteBuf,并返回读取的字节数。

这里将调用NioSocketChannel的doReadBytes方法:

protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());}

首先,获取channel中绑定的handler,因为我们已经创建了handler,所以这里直接获取。

看看alloc手柄。尝试读取的字节数(byteBuf。可写字节())步骤。ByteBuf。writeable bytes()返回bytebuf的可写字节数,即可以从通道读取和写入bytebuf的最大字节数。Allocate的attemptedBytesRead将可写字节数设置为defaultmaxmessagesrecvbyteballocator类的attemptedBytesRead属性。

转到defaultmaxmessagesrecvbyteballocator中的attemptedBytesRead,我们会看到:

public void attemptedBytesRead(int bytes) { attemptedBytesRead = bytes;}


继续看doReadBytes方法

最后通过bytebuf.write bytes (Java channel()、alloch handle . attempted bytes read())将jdk底层通道中的数据写入我们创建的ByteBuf中,返回实际写入的字节数。

回到NioByteUnsafe的read()方法:

看看alloc手柄。最后读取的字节(读取字节(字节缓冲))。

我们刚刚分析过,doReadBytes(byteBuf)返回世界上写入byteBuf的字节数。

查看lastBytesRead方法,并按照它执行defaultmaxmessagesreecvbytebuallocator的lastBytesRead方法:

public final void lastBytesRead(int bytes) { lastBytesRead = bytes; totalBytesRead += bytes; if (totalBytesRead lt; 0) { totalBytesRead = Integer.MAX_VALUE; }}

这里会赋两个属性,lastBytesRead代表最后读取的字节数,这里赋的值是我们刚刚写入ByteBuf的字节数,totalBytesRead代表读取的字节总数,这里写入的字节数是追加的。

继续看NioByteUnsafe的read()方法:

如果最后读取的数据为0,则表示通道中的所有数据都已被读取,新创建的ByteBuf将被释放回收,并跳出循环。

Allohandle。Incmessagesread (1)这一步是为了增加读取的消息数,因为我们最多循环16次,所以当消息数增加到16时,循环就会结束。

读取后,channelRead事件将通过管道传递。FireChannel读取(字节缓冲)。我们还在第4章详细分析了channelRead事件。

读者在这里会有疑惑。如果在一次读取后传递了channelRead事件,则服务器收到的数据可能不完整。事实上,netty也相应地处理了这一点。我们将在接下来的章节中详细分析netty的半包处理机制。

在循环结束时,这个步骤将被执行到allocHandle.readComplete()。

我们知道第一次分配的ByteBuf初始容量是1024,但是初始容量不一定能满足所有的业务场景。在netty中,记录每次读取数据的字节数,然后在下一次分配ByteBuf时,根据业务场景的需要,容量会尽可能大。具体实现方法体现在readComplete()的步骤中。

我们遵循AdaptiveRecvByteBufAllocator的readComplete()方法:

public void readComplete() { record(totalBytesRead());}

这里调用record方法,传入这次读取的总字节数。


跟到record方法中private void record(int actualReadBytes) { if (actualReadBytes lt;= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { index = Math.max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes gt;= nextReceiveBufferSize) { index = Math.min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; }}

看判断条件if(actualReadBytes lt;= SIZE_TABLE[Math.max(0,index - INDEX_DECREMENT - 1)]

这里,index是当前分配的缓冲区大小所在的SIZE_TABLE中的索引。缩进这个索引,然后根据缩进的值找出SIZE_TABLE中存储的内存值,再判断是否大于等于本次读取的最大字节数。如果满足条件,说明分配的内存太大,需要进行大小缩减操作。让我们看看与IF块中尺寸减小相关的逻辑。

第一,if(现在减)会决定是否立即缩水。通常第一次不会缩水。然后,它会将reduce now设置为true,也就是说下次直接收缩。

假设需要立即进行收缩操作,我们来看看收缩操作的相关逻辑:

index = math . max(index-index _ decrement,minindex)此步骤将进一步收缩索引,但它不能小于最小索引值。

然后通过nextReceiveBufferSize = size _ table[index]得到设置索引后的内存,赋给nextReceiveBufferSize,这是下次要分配的大小。ByteBuf下次会按照这个大小分配,从而实现收缩操作。

看看elseamp;if(actualReadBytes gt;= nextReceiveBufferSize)

如果判断本次读取的字节总量大于上次分配的大小,则进行扩容操作。

扩展操作也很简单。索引进行步进,然后获取步进索引对应的内存值作为下次分配的大小。

在NioByteUnsafe的read()方法中:

收缩或扩展容量后,ChannelReadComplete()事件通过pipeline . firecannelreadcomplete()传播。

以上是读取客户端消息的相关流程。


章节总结

本章主要分析ByteBuf的基本操作以及缓冲区分配等相关知识。

缓冲区分配可以分为调用jdk的api和分配一个连续内存。

其中,在通过分配连续内存的缓冲区分配中,还引入了页级分配逻辑和子页级分配逻辑。

页面级分配通过操作内存二叉树来记录分配情况。

子页级分配是用位图记录分配情况。

最后介绍了NioSocketChannel处理read事件的相关逻辑。

总的来说,这一章的内容比较难,希望同学们通过课后的多次调试能够熟练掌握。

更多关于ByteBuf使用SocketChannel读取数据的过程,请关注源码搜索网其他相关文章!


0

精彩评论

暂无评论...
验证码 换一张
取 消