Netty 是一个基于 JAVA NIO 类库的
异步通信框架
。用于创建异步非阻塞、基于事件驱动、高性能、高可靠性和高可定制性的
网络客户端和服务器端
。
Netty
主要针对在TCP协议下
,面向Clients端的高并发应用
- 或者
Peer-to-Peer
场景下的大量数据持续传输
的应用Netty本质是一个
NIO框架
,适用于服务器通讯
相关的多种应用场景。
原生NIO存在的问题
NIO 的类库和 API 繁杂,使用麻烦。
要熟悉 Java 多线程编程
- 因为 NIO 编程涉及到
Reactor
模式
- 必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
开发工作量和难度都非常大:
- 例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
JDK NIO 的 Bug:
- 例如臭名昭著的
Epoll Bug
:它会导致Selector
空轮询,最终导致 CPU 100%。
- 直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。
应用场景
互联网行业:
- 在 分布式系统中,各个节点之间需要 远程服务调用,高性能的
RPC
框架必不可少- Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些
RPC
框架使用典型的应用有:
- 阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信
- Dubbo 协议默认使用
Netty
作为基础通信组件,用于实现各进程节点之间的内部通信游戏行业
Netty 作为高性能的基础通信组件,提供了
TCP/UDP
和HTTP
协议栈
- 方便定制和开发私有协议栈,账号登录服务器
地图服务器之间可以方便的通过
Netty
进行高性能的通信
Netty案例
搭建 Server 端服务器:
- 创建
ServerBootstrap
对象。
ServerBootstrap
配置eventLoopGroup
:
- BossGroup : 处理连接请求,用于创建 Channel 转发给 WorkerGroup。
- workerGroup : 处理已建立的连接。
- 配置服务器处理的
Channel
连接的类型 ,包括 NIO 类型 , EPOLL 类型等。- 配置整个处理流程的
ChannelHandler
,用于特定的处理。- Server 端
bind()
一个端口 , 同时阻塞等待绑定完成。
// 第一步 :准备后续处理业务的 Handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接会触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("S2 : 服务端建立连接,触发 Active .....");
}
/**
* 客户端发消息会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器收到消息: " + msg.toString());
// 接收到消息后,返回一条消息给客户端
ctx.write(msg + "World!");
ctx.flush();
}
/**
* 发生异常触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
// 第二步 :创建 Initializer 用于初始化 Channel
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加编解码
socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
// 此处将 Handler 绑定到 ChannelPipeline 中
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}
// 第三步 : 准备好 Netty 客户端
public class NettyServer extends Thread {
public void run() {
// S1 : 准备 Boss 管理线程组 和 Worker 工作线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(200);
// S2 : 绑定 Group , 绑定渠道 , 绑定 Handler
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer());
try {
InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8090);
// S3 : 绑定对应端口和地址,用于后续阻塞监听
ChannelFuture future = bootstrap.bind(socketAddress).sync();
System.out.println("S1 : 服务端构建完成 .....");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//关闭主线程组
bossGroup.shutdownGracefully();
//关闭工作线程组
workGroup.shutdownGracefully();
}
}
}
搭建客户端:
- 不同于 Server ,Client 端只需要一个
EventLoopGroup
。- 创建 Bootstrap ,绑定
Group / channel / handler
。- 通过
writeAndFlush
发送消息。
// 第一步 :准备客户端的 Handler 接收消息
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("S2 : 客户端建立连接,触发 Active .....");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("客户端收到消息: " + msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
// 第二步 :创建 Initializer 用于初始化 Channel
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("decoder", new StringDecoder());
socketChannel.pipeline().addLast("encoder", new StringEncoder());
// 此处将 Handler 绑定到 ChannelPipeline 中
socketChannel.pipeline().addLast(new NettyClientHandler());
}
}
// 第三步 : 准备启动类
public class NettyClient extends Thread {
public void run() {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new NettyClientInitializer());
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 8090).sync();
System.out.println("S2 : 客户端构建完成 .....");
for (int i = 0; i < 10; i++) {
String message = "第" + i + "条消息 , " + "Hello ";
future.channel().writeAndFlush(message);
Thread.sleep(1000); // 暂停一秒钟
}
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
发起请求:
- 通过2个线程模拟消息的发送和接收。
public class DemoService {
public static void main(String[] args) throws InterruptedException {
System.out.println("Start :开始整个 Netty 搭建流程");
// S1 : 分别开2个线程模拟 Server 和 Client 的开启
NettyServer server = new NettyServer();
server.start();
NettyClient client = new NettyClient();
client.start();
}
}
Reactor
核心组成
Reactor:
Reactor
在一个单独的线程中运行,负责监听和分发事件
- 分发给适当的处理程序来对 IO 事件做出反应。
Handlers:
- 处理程序执行
I/O
事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。
Reactor
通过调度适当的处理程序来响应I/O
事件,处理程序执行非阻塞操作。
模式分类
根据
Reactor
的数量和处理资源池线程的数量不同,有 3 种典型的实现:
- 单 Reactor 单线程
- 单 Reactor 多线程
- 主从 Reactor 多线程
单 Reactor 单线程
Select 实现应用程序通过一个阻塞对象监听多路连接请求
Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发
如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求
- 然后创建一个Handler 对象处理连接完成后的后续业务处理
如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应
Handler 会完成 Read ==> 业务处理 ==> Send 的完整业务流程
服务器端用 一个线程 通过 多路复用 搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了
- 但是如果客户端连接数量较多,将无法支撑,前面的NIO案例就属于这种模型。
优点:
模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
缺点:
性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。
Handler
在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈可靠性问题,线程意外终止,或者进入死循环
- 会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
使用场景:
- 客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度
O(1)
的情况。
单 Reactor 多线程
Reactor 对象通过 select 监控客户端请求事件, 收到事件后,通过dispatch进行分发
如果建立连接请求, 则右 Acceptor 通过
accept
处理连接请求
- 然后创建一个
Handler
对象处理完成连接后的各种事件如果不是连接请求,则由reactor分发调用连接对应的
handler
来处理handler 只负责响应事件,不做具体的业务处理,通过read 读取数据后
- 会分发给后面的worker线程池的某个线程处理业务
worker 线程池 会分配独立线程完成真正的业务,并将结果返回给handler
handler收到响应后,通过send 将结果返回给 client
优点:
- 可以充分的利用多核cpu 的处理能力。
缺点:
- 多线程数据共享和访问比较复杂,
reactor
处理所有的事件的监听和响应- 在单线程运行, 在高并发场景容易出现性能瓶颈。
主从 Reactor 多线程
针对单 Reactor 多线程模型中,Reactor 在单线程中运行
- 高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行。
Reactor主线程 MainReactor 对象通过 select 监听连接事件
- 收到事件后,通过 Acceptor 处理连接事件
当 Acceptor 处理连接事件后,MainReactor 将连接分配给SubReactor
SubReactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理
当有新事件发生时,subreactor 就会调用对应的 handler 处理
handler 通过 read 读取数据,分发给后面的worker 线程处理
worker 线程池分配独立的 worker 线程进行业务处理,并返回结果
优点
父线程与子线程的数据交互简单职责明确
- 父线程只需要接收新连接,子线程完成后续的业务处理。
父线程与子线程的数据交互简单
- Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
缺点:
- 编程复杂度较高
结合实例:
这种模型在许多项目中广泛使用:
- 包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持
线程模型
Netty 主要基于 主从
Reactors
多线程模型 做了一定的 改进:
- 其中主从 Reactor 多线程模型有多个 Reactor。
Netty 抽象出两组线程池:BossGroup 和 WorkerGroup
BossGroup 专门负责接收客户端的连接
WorkerGroup 专门负责网络的读写
BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
- NioEventLoopGroup 相当于一个 事件循环组
- 这个组中 含有多个事件循环 ,每一个事件循环是 NioEventLoop
NioEventLoop 表示一个不断循环的执行处理任务的线程
- 每个NioEventLoop 都有一个selector,用于监听绑定在其上的socket的网络通讯。
NioEventLoopGroup 可以有多个线程,即可以含有多个NioEventLoop
每个Boss Group 中的 NioEventLoop 循环执行的步骤:
轮询accept 事件
处理accept 事件,与client建立连接,生成 NioScocketChannel
- 并将其注册 Worker Group 上的某个 NIOEventLoop 上的 selector
处理任务队列的任务,即 runAllTasks
每个 Worker Group 中的 NIOEventLoop 循环执行的步骤:
轮询 read/write 事件
处理 I/O 事件,即 read/write 事件,在对应的 NioScocketChannel 上处理
处理任务队列的任务,即 runAllTasks
每个Worker NIOEventLoop 处理业务时,会使用 pipeline(管道)。
- pipline中包含了 channel,即通过pipline可以获取到对应的 channel
- 并且pipline维护了很多的 handler(处理器)来对我们的数据进行一系列的处理。
handler(处理器) 有Netty内置的,我们也可以自己定义。
编解码
半包和粘包
半包问题:
指一个完整的应用层消息被分成多个
TCP
数据包发送,接收端在一次读取操作中只接收到消息的一部分
例如,发送端发送了一条 100 字节的消息,但由于网络原因,这条消息被拆分成了两个
TCP
数据包
- 一个 60 字节,另一个 40 字节
接收端可能在第一次读取时只接收到前 60 字节的数据,剩下的 40 字节需要在后续的读取操作中才能接收到
粘包问题:
粘包问题是指多个应用层消息在传输过程中被粘在一起,接收端在一次读取操作中接收到大于 1个 消息的情况
- 例如,发送端发送了两条消息,每条 50 字节,但接收端在一次读取操作中收到了 80 字节的数据,超过了 1条 消息的内容
解码器
固定长度解码器:FixedLengthFrameDecoder:
直接通过构造函数设置固定长度的大小
frameLength
无论接收方一次获取多大的数据,都会严格按照
frameLength
进行解码如果累积读取到长度大小为
frameLength
的消息,那么解码器认为已经获取到了一个完整的消息如果消息长度小于 frameLength:
FixedLengthFrameDecoder
解码器会一直等后续数据包的到达,直至获得完整的消息
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
this.frameLength = frameLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= frameLength) {
ByteBuf buf = in.readBytes(frameLength);
out.add(buf);
}
}
}
特殊分隔符解码器:
DelimiterBasedFrameDecoder
- 基于换行符解码器和自定义分隔符解码器(比如 特殊字符)来划分消息边界,从而解决半包和粘包问题
- 使用者可以根据自己的需求灵活确定分隔符
长度域解码器:
LengthFieldBasedFrameDecoder
- 基于长度字段的解码器是指在消息头部添加长度字段,指示消息的总长度
基本架构
逻辑架构
Netty的逻辑处理架构共分为网络通信层、事件调度层、服务编排层,每一层各司其职。
网络通信层:
网络通信层的职责是执行网络 I/O 的操作。
- 它支持多种网络协议和 I/O 模型的连接操作。
当网络数据读取到内核缓冲区后,会触发各种网络事件,这些网络事件会分发给事件调度层进行处理。
网络通信层的包含BootStrap、ServerBootStrap、Channel三个组件。
BootStrap & ServerBootStrap
- Bootstrap:
- 主要负责整个 Netty 程序的启动、初始化、服务器连接等过程。
Netty中的引导器共分为两种类型:
- 用于客户端引导的 Bootstrap。
- 用于服务端引导的 ServerBootStrap,它们都继承自抽象类 AbstractBootstrap。
事件调度层:
通过 Reactor 线程模型对各类事件进行聚合处理
通过 Selector 主循环线程集成多种事件( I/O 事件、信号事件、定时事件等)
- 实际的业务处理逻辑是交由服务编排层中相关的
Handler
完成。事件调度层的核心组件包括 EventLoopGroup、EventLoop。
- EventLoopGroup & EventLoop
EventLoopGroup 本质是一个线程池,主要负责接收 I/O 请求,并分配线程执行处理请求。
EventLoopGroup、EventLoop、Channel 的关系:
- 一个 EventLoopGroup 往往包含一个或者多个 EventLoop。
- EventLoop 用于处理 Channel 生命周期内的所有 I/O 事件
- 如 accept、connect、read、write 等 I/O 事件。
- EventLoop 同一时间会与一个线程绑定,每个 EventLoop 负责处理多个 Channel。
- 每新建一个 Channel,EventLoopGroup 会选择一个 EventLoop 与其绑定。
- 该 Channel 在生命周期内都可以对 EventLoop 进行多次绑定和解绑。
EventLoopGroup 的实现类是 NioEventLoopGroup。
可以把 NioEventLoopGroup 理解为一个线程池
- 每个线程负责处理多个 Channel,而同一个 Channel 只会对应一个线程。
Netty 通过创建不同的 EventLoopGroup 参数配置,可以支持 Reactor 的三种线程模型:
- 单线程模型:
- EventLoopGroup 只包含一个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup。
- 多线程模型:
- EventLoopGroup 包含多个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup。
- 主从多线程模型:
- EventLoopGroup 包含多个 EventLoop,Boss 是主 Reactor,Worker 是从 Reactor
- 它们分别使用不同的 EventLoopGroup
- 主 Reactor 负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor。
服务编排层:
服务编排层的职责是负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。
服务编排层的核心组件包括 ChannelPipeline、ChannelHandler、ChannelHandlerContext。
ChannelPipeline
负责组装各种 ChannelHandler,实际数据的编解码以及加工处理操作都是由 ChannelHandler 完成的。
ChannelPipeline 可以理解为ChannelHandler 的实例列表:
- 内部通过双向链表将不同的 ChannelHandler 链接在一起。
当 I/O 读写事件触发时,ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。
ChannelPipeline 是线程安全的,因为每一个新的 Channel 都会对应绑定一个新的 ChannelPipeline。
一个 ChannelPipeline 关联一个 EventLoop,一个 EventLoop 仅会绑定一个线程。
ChannelHandler & ChannelHandlerContext
- ChannelHandlerContext 用于保存 ChannelHandler 上下文。
组件关系
服务端启动初始化时有 Boss EventLoopGroup 和 Worker EventLoopGroup 两个组件,其中 Boss 负责监听网络连接事件。
- 当有新的网络连接事件到达时,则将 Channel 注册到 Worker EventLoopGroup。
Worker EventLoopGroup 会被分配一个 EventLoop 负责处理该 Channel 的读写事件。
- 每个 EventLoop 都是单线程的,通过 Selector 进行事件循环。
当客户端发起 I/O 读写事件时,服务端 EventLoop 会进行数据的读取
- 然后通过 Pipeline 触发各种监听器进行数据的加工处理。
客户端数据会被传递到 ChannelPipeline 的第一个 ChannelInboundHandler 中
- 数据处理完成后,将加工完成的数据传递给下一个 ChannelInboundHandler。
当数据写回客户端时,会将处理结果在 ChannelPipeline 的 ChannelOutboundHandler 中传播,最后到达客户端。
异步模型
Netty 中的 I/O 操作 是 异步 的,包括 Bind、Write、Connect 等操作会简单的返回一个
ChannelFuture
。调用者并不能立刻获得结果,而是通过
Future-Listener
机制
- 用户可以方便的 主动获取 或者通过 通知机制 获得 IO 操作结果。
Netty 的异步模型是建立在 future 和 callback 的之上的。
- callback 就是回调。
Future的核心思想是:
假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。
那么可以在调用 fun 的时候,立马返回一个 Future
- 后续可以通过Future去监控方法 fun 的处理过程(即
Future-Listener
机制)
常见操作
通过 isDone 方法来判断当前操作是否完成(注意不是判断完成成功)
通过 isSuccess 方法来判断已完成的当前操作是否成功
通过 getCause 方法来获取已完成的当前操作失败的原因
通过 isCancelled 方法来判断已完成的当前操作是否被取消
通过 addListener 方法来注册监听器
- 当操作已完成(isDone 方法返回完成),将会通知指定的监听器
- 如果 Future 对象已完成,则通知指定的监听器。
Future Listener 机制
当 Future 对象刚刚创建时,处于非完成状态
- 调用者可以通过返回的
ChannelFuture
来获取操作执行的状态,注册监听函数来执行完成后的操作
。
案例说明
绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑
// 3.绑定端口并且同步,生成一个 ChannelFuture 对象
// 这里服务器就已经启动服务器了
ChannelFuture cf = bootstrap.bind(6668).sync();
// 给 cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住,直到操作完成
异步处理的好处是不会造成线程阻塞
- 线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。
任务队列
在 事件循环(
NioEventLoop
) 的过程中,会在 pipline 中调用 Handler 来处理业务假如在某一个 Handler有一个长时间的操作,这必会造成 pipiline 的阻塞
- 可以将这个耗时的处理提交到
TaskQueue
进行异步的执行。
使用场景
用户程序自定义的普通任务
用户自定义定时任务
非当前
Reactor
线程调用Channel
的各种方法。
例如在 推送系统 的业务线程里面,根据 用户的标识,找到对应的 Channel 引用
然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。
最终的 Write 会提交到任务队列中后被 异步消费。
代码演示
在 NettyServerHandler 的
channelRead()
方法中模拟一下耗时的业务:此时客户端必然是等待 5秒 后才能收到消息:
- 耗时业务执行完毕…;然后再收到消息:hello,客户端…。
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 比如这里我们有一个非常耗时的业务,我们想让它异步的执行
// 只需要将它提交到该 channel 对应的 NioEventLoop 的 TaskQueue 中
Thread.sleep(5 * 1000);// 模拟耗时业务
// 执行完业务向客户端返回消息
ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务执行完毕...",CharsetUtil.UTF_8));
System.out.println("go on...");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// writeAndFlush 是 write + flush
// 将数据写入到缓存,并刷新
// 一般我们会对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端...",CharsetUtil.UTF_8));
}
}
解决方案一:用户程序自定义的普通任务
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 比如这里我们有一个非常耗时的业务,我们想让它异步的执行
// 只需要将它提交到该 channel 对应的 NioEventLoop 的 TaskQueue 中
// 解决方案1:用户程序自定义的普通程序
ctx.channel().eventLoop().execute(new Runnable() {
public void run() {
try {
Thread.sleep(5 * 1000);// 模拟耗时业务
// 执行完业务向客户端返回消息
ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务执行完毕...",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("go on...");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// writeAndFlush 是 write + flush
// 将数据写入到缓存,并刷新
// 一般我们会对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端...",CharsetUtil.UTF_8));
}
解决方案二:用户自定义定时任务
- 这种解决方案任务是提交到
scheduleTaskQueue
中。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 比如这里我们有一个非常耗时的业务,我们想让它异步的执行
// 只需要将它提交到该 channel 对应的 NioEventLoop 的 TaskQueue 中
// 解决方案2:用户自定义定时任务,该任务提交到 scheduleTaskQueue 中
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);// 模拟耗时业务
// 执行完业务向客户端返回消息
ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务执行完毕...",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},5, TimeUnit.SECONDS);
System.out.println("go on...");
}