pRPC-Day2

Netty简介

Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。
Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke’s Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。

摘自《Essential Netty In Action》

Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。 它极大地简化和简化了诸如TCP和UDP套接字服务器之类的网络编程。

“快速简便”并不意味着最终的应用程序将遭受可维护性或性能问题的困扰。 Netty经过精心设计,结合了许多协议(例如FTP,SMTP,HTTP以及各种基于二进制和文本的旧式协议)的实施经验。 结果,Netty成功地找到了一种无需妥协即可轻松实现开发,性能,稳定性和灵活性的方法。

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高,两张图让你了解BIO和NIO的区别:

img


img

使用 Netty(基于 NIO)替代 BIO 实现网络传输

Netty 使用不同的事件来通知状态的改变或者是操作的状态。事件可能包括:
- 连接已被激活或者连接失活
- 数据读取;
- 用户事件;
- 错误事件。
- 打开或者关闭到远程节点的连接;
- 将数据写到或者冲刷到套接字。

每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法。这是将事件驱动范式直接转换为应用程序逻辑处理比较理想的位置。

对每个事件可以进行,记录日志,数据转换,应用程序逻辑处理等操作,

Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议(如 HTTP 和 SSL/TLS)的 ChannelHandler。

在项目中,定制 ChannelHandler服务器处理客户端发送的数据

为什么不继承SimpleChannelInboundHandler

如果继承自 SimpleChannelInboundHandler 的话就不要考虑 ByteBuf 的释放 ,内部的channelRead 方法会替你释放 ByteBuf ,避免可能导致的内存泄露问题。

详见《Netty进阶之路 跟着案例学 Netty》

项目中使用的Netty调度实现

项目中使用了 Netty 的调度模块 EventLoopGroup,使用了默认提供的 NioEventLoopGroup 实现,具体可查阅JavaDoop对其的详解

Netty的ByteBuf

ByteBuf就是Netty提供使用的缓冲区,类似JDK的Buffer,但是比它的功能更加强大,例如:

1,ByteBuffer长度固定,需要我们编码进行控制;

2,ByteBuffer只有一个标识位置的指针position,读写操作的时候需要手工调用flip()、clear()、rewind()等来进行操作;

3,ByteBuffer的API功能有限,一些高级和实用的特性支持不够好,需要自己实现,而Netty提供的ByteBuf则进行在JDK ByteBuffer的基础上完善了这些功能。

一、ByteBuffer的工作原理,核心概念:

byte[] buff:buff即内部用于缓存的数组;

position:当前读取的位置;

mark:为某一读过的位置做标记,便于某些时候回退到该位置

capacity:初始化时候的容量;

limit:读写的上限,limit<=capacity。

二、Netty的ByteBuf的工作原理,它和ByteBuffer的不同主要体现在position,改为了writeIndex和readIndex,而且它的put方法做了自动扩容功能。

三、ByteBuf的常用的功能

1,顺序读操作(read),就是从ByteBuf中进行读数据,包括不同返回的数据类型,不同索引,不同长度的读取等功能非常丰富。

2,顺序写操作(write),就是向ByteBuf进行写数据,包括写入不同类型boolean,int,long,ByteBuf,byte[]等,以及写入后writerIndex的操作,还有在指定的索引写入指定length的数据,功能也是非常丰富。

3,readerIndex和writerIndex上边讲原理的应该已经和清楚了,它就是将byteBuffer中的position分开了,还是很容易理解的。

4,Discardable byte,相比于其他的Java对象,缓冲区的分配和释放是个耗时的操作,因此有了Discardable bytes 即可废弃空间,通过discardReadBytes操作,可以进行原理图中的释放空间,进行重复利用缓存。但是此操作,可以看出对字节数组进行了内存复制,也是消耗性能,是一种牺牲性能换取更多可用内存的操作。

5,Readable bytes和Writable bytes,即可读空间、可写空间,Readable bytes为实际存储数据的区域,Writable bytes为尚未被使用的空闲空间。写入数据如果空间不够会进行自动扩容。下篇看源码看吧

6,Clear操作,和Jdk ByteBuffer的clear操作一样,他并不会清空缓存区内容本事,主要是用来还原操作位置指针 。原理图很明显的。

7,Mark和Reset,当对缓存进行读操作,由于某种原因,可能需要对之前的操作进行回滚。ByteBuf提供了:a,markReaderIndex:将当前的readerIndex备份到markedReaderIndex中;b,resetReaderIndex:将当前的readerIndex设置为markedReaderIndex;c,markWriterIndex:将当前的writerIndex备份到markedWriterIndex;d,resetWriterIndex:将当前的writerIndex设置为markedWriterIndex。

8,查找操作:例如indexOf、bytesBefore、forEachByte等各种方法,来帮助我们查找一些常用值,例如回车换行符、分隔符等。而Netty也为我们在ByteBufProcessor中抽象了这些常用值。

9,Derived buffers,类似数据库的视图,ByteBuf提供了多个接口用户创建某个ByteBuf的视图或者进行复制,例如:duplicate、copy、slice等。

10,转换成标准的ByteBuffer,由于底层JDK进行网络读写的时候都是使用的ByteBuffer,所以ByteBuf提供(也必须)两者的各种相互转换,例如:nioBuffer(),还有有参方法。

11,随机读写(set和get),除了顺序读写之外,ByteBuf还支持随机读写,即可以随机指定读写的索引位置。

Netty中的粘包与拆包

TCP粘包和拆包

TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
  2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
  3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
  4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

项目中通过重新设计协议解决

客户端与服务端通信协议(数据包结构)重新设计 ,可以将原有的 RpcRequestRpcReuqest 对象作为消息体,然后增加如下字段(可以参考:《Netty 入门实战小册》和 Dubbo 框架对这块的设计)

Netty实现长连接-心跳机制

何为心跳

顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.

为什么需要心跳

因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.

如何实现心跳

我们可以通过两种方式实现心跳机制:

  • 使用 TCP 协议层面的 keepalive 机制.
  • 在应用层上实现自定义的心跳机制.

虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:

  1. 它不是 TCP 的标准协议, 并且是默认关闭的.
  2. TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.
  3. TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.

虽然使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 人们大多数都是选择在应用层上实现自定义的心跳.
既然如此, 那么我们就来大致看看在在 Netty 中是怎么实现心跳的吧. 在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件.

项目中的简单实现

通过开启ServerBootstrap()中的参数即可开启心跳机制

// 是否开启 TCP 底层心跳机制                    .childOption(ChannelOption.SO_KEEPALIVE, true)

参数设置

p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!