ChannelHandler和ChannelPipeline
Channel生命周期 / 状态自动机
ChannelUnregisterd
=>ChannelRegistered
=> ChannelActive
=> ChannelInactive
=> ChannelUnregisterd
每一个状态转化都会产生相应事件。
ChannelUnregistered:刚创建;
ChannelRegistered: (创建以后),已注册到EventLoop;
ChannelActive: 已经连接到远程节点;
ChannelInactive: 没有连接到远程节点。
ChannelHeadler 生命周期
handlerAdded: ChannelHeadler添加到pipeline时调用;
handlerRemoved: 移除时;
exceptionCaught: 发生错误时。
ChannelInboundHandler接口 (入站事件)
省略别的常见事件:
ChannelWritabilityChanged: 可写状态发生改变事件
userEventTriggered: 调用ChannelInboundHandler.fireUserEventTriggered()时触发。用于用户自定义事件。
可写状态与高低水位:
high watermark
机制: 写太快时达到高水位线时,转变为不可写;
// is_full()是根据当前是否大于等于high water mark来判断,如果full会wait。low watermark
机制: 达到低水位线时,转变为可写。
// 其他地方low watermark
的含义: 设定最小时间戳,低于低水位线的数据不再接收。
高低水位设置:
1 | Channel.config.setWriteHighWaterMark(); |
ChannelConfig
默认的水位配置为低水位32K,高水位64K。
资源释放
第五章里提到了bufferBuf
的释放问题:pipeline
里最后一个Handler
要负责释放收到的数据:
1 | bufferBuf.release(); |
落实到入站事件中, 如果重写了ChannelRead()
事件,这个方法需要负责释放池化的ByteBuf
:
1 |
|
SimpleChannelInboundHandler的自动释放
如果不想每次在ChannelRead()
方法里释放消息,可以直接使用SimpleChannelInboundHandler
,它会自动释放收到的消息。
相对的,由于有了自动释放,后续就无法再访问到了,因此使用SimpleChannelInboundHandler
的时候消息引用会失效。
小结:
- 不使用SimpleChannelInboundHandler: 记得在一个ChannelRead释放消息数据;
- 使用SimpleChannelInboundHandler: 注意会被自动释放,引用会失效。
ChannelOutboundHandler接口(出站事件)
类似的,出站事件也有很多,省略一些常见,列出几个特别的:
1 | flush: 将数据冲刷到远程节点时被调用; |
与入站事件相对的,需要在write
方法中释放消息:
1 |
|
注意到write
方法比channelRead
多一个ChannelPromise
参数:
1 | ChannelPromise(子接口) -> ChannelFuture(父接口) |
ChannelPromise与ChannelFuture
设计模式
:
实际上出站事件基本都多了这个ChannelPromise
参数。
为了避免程序员写bug
,netty4用ChannelPromise
接口来更改任务完成状态,
而在那些只需要读/查询的场景,返回ChannelFuture
接口。
此外,ChannelFuture
中比jdk的普通Future
多了一些信息,状态有4种:
1 | Uncompleted => success/fail/cancelled |
每种状态的判定:(其实就是字面上的意思,猜也知道)
状态 | 判定条件 |
---|---|
Uncompleted | isDone():false,isSuccess():false,isCancelled():false,cause():null |
success | isDone():ture,isSuccess():ture |
fail | isDone():ture,isSuccess():false,cause():non-null |
cancelled | isDone():ture,isCancelled():true |
入站事件和出站事件的区别
再次强调一件事:
出站事件基本比入站事件都多了1个
ChannelPromise
参数。
本质上也就是出站事件要多一个通知机制:ChannelPromise
,ChannelFuture
与ChannelFutureListener
。
- 那么为什么两者要这样区别呢?
根本原因是出站中有write
,flush
这样的io
操作,比较费时而且依赖于复杂因素,需要设计成异步的。
而入站事件基本都是在自己的内存里搞定,同步就够用了。
理解了这一点,我们就能心平气和地接受出站事件的通知机制了。
通知机制
- 每个出站操作返回一个
ChannelFuture
,注册到它的ChannelFutureListener
将在操作完成的时候被通知成功还是失败;- 出站操作传入一个
ChannelPromise
,可以进行立即通知(更改状态):setSuccess
/setFailure
。
注册ChannelFutureListener
的两种姿势:
- 对channel进行操作,获取
ChannelFuture
,然后注册Listener
; // 可以用于某一次写的定制化操作; - 出站事件中,在传入的
ChannelPromise
上注册Listener
。 // 应用于某类型的所有操作。
相应的代码如下:
1 | // 方法1: |
ChannelHandler适配器
netty提供了handler
的基本实现:ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapter。
(入站和出站)
ChannelPipeline接口
1个Channel <=> 对应1个固定的ChannelPipeline
在一个ChannelHandler
中如何访问pipeline
?:
通过context获取到pipeline即可。
传播事件
测试下一个ChannelHandler
的类型是否与方向一致。
pipeline编排
1 | public static void modifyPipeline() { |
pipeline的事件API
主要用于触发下一个handler
的事件,触发下一个入站事件一般带个前缀fire
,触发出站事件则没有这个前缀。
例如:fireChannelRegistered
: 触发pipeline中下一个channelInboundHandler的channelRegistered
事件。(注意是Inbound
)connect
: 将channel连接到一个远程地址,将调用下一个channelOutboundHandler
的connect方法。(注意是outbound
)
ChannelHandlerContext接口
ChannelHandlerContext
记录channelHandler
和channel
的联系,类似于一个弱实体。
它也有很多事件API,含义与其他类的不同,是基于当前上下文的,也就是说:
从当前关联的ChannelHandler
开始,传播给下一个。
ChannelHandlerContext
部分API:
// Channel相关:alloc
: 返回Channel
的ByteBufAllocator
;executor
: 返回调度事件的EventExecutor
;
// handler相关:fireChannelRead
: 触发下一个InboundHanlder
的ChannelRead
方法;(入站)write
: 通过当前实例写入消息,并经过pipeline。
1 | public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker |
如上所示,这个接口其实继承了inbound和outbound,所以出站方法也有。
综上所述,如果要从某一个ChannelHandler
A开始传递事件,要先获得它的上一个handler的context。如上图所示,调用pipeline
或者channel
上的事件的话,事件就会从1号位置开始流动,调用channelContext
上的事件则会从2,3号位置(也就是下一个)开始流动。
优势:
- 减少事件传播开销;
- 避开一些
handler
的处理。
上述API可能的具体用途:
- 动态切换协议.
- 其他用途(暂时不知道还有啥)
Sharable
可以将一个ChannelHandler
绑定到多个pipeline
(此时会产生多个ChannelHandlerContext
)。这样做的场景: 比如需要收集跨越多个Channel
的统计信息时。
加上@Sharable
注解的ChannelHandler
(语法上)可以绑定到多个pipeline
上,但程序员需要注意解决线程安全的问题。// 要么无状态不可变,要么加锁,要么CAS,要么threadlocal。
异常处理
参考前文中出站事件的通知机制,因此出站事件中的异常也是封装在ChannelFuture
中的,而不是像入站事件用exceptionCaught
。
// 换言之, ChannelOutboundHandler
没有exceptionCaught
API。
入站事件异常:// 消费完异常才不会向尾端传播
1 |
|
出站事件异常:
1 |
|