Netty

月伴飞鱼 2024-09-05 21:20:17
中间件
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!

官网:https://netty.io/

Netty 是一个基于 JAVA NIO 类库的异步通信框架

用于创建异步非阻塞、基于事件驱动、高性能、高可靠性和高可定制性的网络客户端和服务器端

Netty主要针对在 TCP协议下,面向Clients端的高并发应用

  • 或者Peer-to-Peer场景下的 大量数据持续传输 的应用

Netty本质是一个 NIO框架,适用于 服务器通讯 相关的多种应用场景。

在这里插入图片描述
原生NIO存在的问题

NIO 的类库和 API 繁杂,使用麻烦。

要熟悉 Java 多线程编程

  • 因为 NIO 编程涉及到 Reactor 模式
    • 必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。

开发工作量和难度都非常大:

  • 例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。

JDK NIO 的 Bug:

  • 例如臭名昭著的 Epoll Bug:它会导致 Selector 空轮询,最终导致 CPU 100%。
    • 直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。

应用场景

互联网行业:

  • 在 分布式系统中,各个节点之间需要 远程服务调用,高性能的 RPC 框架必不可少
  • Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用

典型的应用有:

  • 阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信
  • Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信

游戏行业

  • Netty 作为高性能的基础通信组件,提供了 TCP/UDPHTTP 协议栈

    • 方便定制和开发私有协议栈,账号登录服务器
  • 地图服务器之间可以方便的通过 Netty 进行高性能的通信

Netty案例

搭建 Server 端服务器:

  • 创建 ServerBootstrap 对象。
  • ServerBootstrap配置eventLoopGroup
    • BossGroup : 处理连接请求,用于创建 Channel 转发给 WorkerGroup。
    • workerGroup : 处理已建立的连接。
  • 配置服务器处理的 Channel 连接的类型 ,包括 NIO 类型 , EPOLL 类型等。
  • 配置整个处理流程的 ChannelHandler,用于特定的处理。
  • Server 端 bind() 一个端口 , 同时阻塞等待绑定完成。
// 第一步 :准备后续处理业务的 Handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 客户端连接会触发
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("S2 : 服务端建立连接,触发 Active .....");
    }

    /**
     * 客户端发消息会触发
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器收到消息: " + msg.toString());
        // 接收到消息后,返回一条消息给客户端
        ctx.write(msg + "World!");
        ctx.flush();
    }

    /**
     * 发生异常触发
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


// 第二步 :创建 Initializer 用于初始化 Channel
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //添加编解码
        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        // 此处将 Handler 绑定到 ChannelPipeline 中
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }
}

// 第三步 : 准备好 Netty 客户端
public class NettyServer extends Thread {

    public void run() {

        // S1 : 准备 Boss 管理线程组 和 Worker 工作线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup(200);

        // S2 : 绑定 Group , 绑定渠道 , 绑定 Handler
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer());

        try {
            InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8090);
            
            // S3 : 绑定对应端口和地址,用于后续阻塞监听
            ChannelFuture future = bootstrap.bind(socketAddress).sync();
            System.out.println("S1 : 服务端构建完成 .....");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //关闭主线程组
            bossGroup.shutdownGracefully();
            //关闭工作线程组
            workGroup.shutdownGracefully();
        }
    }
}

搭建客户端:

  • 不同于 Server ,Client 端只需要一个 EventLoopGroup
  • 创建 Bootstrap ,绑定 Group / channel / handler
  • 通过 writeAndFlush 发送消息。
// 第一步 :准备客户端的 Handler 接收消息
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("S2 : 客户端建立连接,触发 Active .....");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("客户端收到消息: " + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


// 第二步 :创建 Initializer 用于初始化 Channel
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast("decoder", new StringDecoder());
        socketChannel.pipeline().addLast("encoder", new StringEncoder());
         // 此处将 Handler 绑定到 ChannelPipeline 中
        socketChannel.pipeline().addLast(new NettyClientHandler());
    }
}


// 第三步 : 准备启动类
public class NettyClient extends Thread {

    public void run() {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new NettyClientInitializer());
        try {

            ChannelFuture future = bootstrap.connect("127.0.0.1", 8090).sync();
            System.out.println("S2 : 客户端构建完成 .....");

            for (int i = 0; i < 10; i++) {
                String message = "第" + i + "条消息 , " + "Hello ";
                future.channel().writeAndFlush(message);
                Thread.sleep(1000); // 暂停一秒钟
            }

            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

发起请求:

  • 通过2个线程模拟消息的发送和接收。
public class DemoService {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start :开始整个 Netty 搭建流程");

        // S1 : 分别开2个线程模拟 Server 和 Client 的开启
        NettyServer server = new NettyServer();
        server.start();

        NettyClient client = new NettyClient();
        client.start();
    }

}

Reactor

核心组成

Reactor:

  • Reactor 在一个单独的线程中运行,负责监听和分发事件
    • 分发给适当的处理程序来对 IO 事件做出反应。

Handlers:

  • 处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。
    • Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。

模式分类

根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现:

  • 单 Reactor 单线程
  • 单 Reactor 多线程
  • 主从 Reactor 多线程

单 Reactor 单线程

Select 实现应用程序通过一个阻塞对象监听多路连接请求

Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发

如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求

  • 然后创建一个Handler 对象处理连接完成后的后续业务处理

如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应

Handler 会完成 Read ==> 业务处理 ==> Send 的完整业务流程

在这里插入图片描述

服务器端用 一个线程 通过 多路复用 搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了

  • 但是如果客户端连接数量较多,将无法支撑,前面的NIO案例就属于这种模型。

优点:

模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成

缺点:

性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。

  • Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈

可靠性问题,线程意外终止,或者进入死循环

  • 会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障

使用场景:

  • 客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况。

单 Reactor 多线程

Reactor 对象通过 select 监控客户端请求事件, 收到事件后,通过dispatch进行分发

如果建立连接请求, 则右 Acceptor 通过 accept 处理连接请求

  • 然后创建一个Handler对象处理完成连接后的各种事件

如果不是连接请求,则由reactor分发调用连接对应的 handler 来处理

handler 只负责响应事件,不做具体的业务处理,通过read 读取数据后

  • 会分发给后面的worker线程池的某个线程处理业务

worker 线程池 会分配独立线程完成真正的业务,并将结果返回给handler

handler收到响应后,通过send 将结果返回给 client

在这里插入图片描述

优点:

  • 可以充分的利用多核cpu 的处理能力。

缺点:

  • 多线程数据共享和访问比较复杂,reactor 处理所有的事件的监听和响应
  • 在单线程运行, 在高并发场景容易出现性能瓶颈。

主从 Reactor 多线程

针对单 Reactor 多线程模型中,Reactor 在单线程中运行

  • 高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行。

Reactor主线程 MainReactor 对象通过 select 监听连接事件

  • 收到事件后,通过 Acceptor 处理连接事件

当 Acceptor 处理连接事件后,MainReactor 将连接分配给SubReactor

SubReactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理

当有新事件发生时,subreactor 就会调用对应的 handler 处理

handler 通过 read 读取数据,分发给后面的worker 线程处理

worker 线程池分配独立的 worker 线程进行业务处理,并返回结果

在这里插入图片描述

优点

  • 父线程与子线程的数据交互简单职责明确

    • 父线程只需要接收新连接,子线程完成后续的业务处理。
  • 父线程与子线程的数据交互简单

    • Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。

缺点:

  • 编程复杂度较高

结合实例:

这种模型在许多项目中广泛使用:

  • 包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持

线程模型

Netty 主要基于 主从 Reactors 多线程模型 做了一定的 改进:

  • 其中主从 Reactor 多线程模型有多个 Reactor。

Netty 抽象出两组线程池:BossGroup 和 WorkerGroup

  • BossGroup 专门负责接收客户端的连接

  • WorkerGroup 专门负责网络的读写

BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup

  • NioEventLoopGroup 相当于一个 事件循环组
    • 这个组中 含有多个事件循环 ,每一个事件循环是 NioEventLoop

NioEventLoop 表示一个不断循环的执行处理任务的线程

  • 每个NioEventLoop 都有一个selector,用于监听绑定在其上的socket的网络通讯。

NioEventLoopGroup 可以有多个线程,即可以含有多个NioEventLoop

每个Boss Group 中的 NioEventLoop 循环执行的步骤:

  • 轮询accept 事件

  • 处理accept 事件,与client建立连接,生成 NioScocketChannel

    • 并将其注册 Worker Group 上的某个 NIOEventLoop 上的 selector
  • 处理任务队列的任务,即 runAllTasks

每个 Worker Group 中的 NIOEventLoop 循环执行的步骤:

  • 轮询 read/write 事件

  • 处理 I/O 事件,即 read/write 事件,在对应的 NioScocketChannel 上处理

  • 处理任务队列的任务,即 runAllTasks

每个Worker NIOEventLoop 处理业务时,会使用 pipeline(管道)。

  • pipline中包含了 channel,即通过pipline可以获取到对应的 channel
    • 并且pipline维护了很多的 handler(处理器)来对我们的数据进行一系列的处理。

handler(处理器) 有Netty内置的,我们也可以自己定义。

在这里插入图片描述

编解码

半包和粘包

半包问题:

指一个完整的应用层消息被分成多个 TCP 数据包发送,接收端在一次读取操作中只接收到消息的一部分

  • 例如,发送端发送了一条 100 字节的消息,但由于网络原因,这条消息被拆分成了两个 TCP 数据包

    • 一个 60 字节,另一个 40 字节
  • 接收端可能在第一次读取时只接收到前 60 字节的数据,剩下的 40 字节需要在后续的读取操作中才能接收到

粘包问题:

粘包问题是指多个应用层消息在传输过程中被粘在一起,接收端在一次读取操作中接收到大于 1个 消息的情况

  • 例如,发送端发送了两条消息,每条 50 字节,但接收端在一次读取操作中收到了 80 字节的数据,超过了 1条 消息的内容

解码器

固定长度解码器:FixedLengthFrameDecoder:

直接通过构造函数设置固定长度的大小 frameLength

  • 无论接收方一次获取多大的数据,都会严格按照 frameLength 进行解码

  • 如果累积读取到长度大小为 frameLength 的消息,那么解码器认为已经获取到了一个完整的消息

如果消息长度小于 frameLength:

  • FixedLengthFrameDecoder 解码器会一直等后续数据包的到达,直至获得完整的消息
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= frameLength) {
            ByteBuf buf = in.readBytes(frameLength);
            out.add(buf);
        }
    }
}

特殊分隔符解码器:DelimiterBasedFrameDecoder

  • 基于换行符解码器和自定义分隔符解码器(比如 特殊字符)来划分消息边界,从而解决半包和粘包问题
    • 使用者可以根据自己的需求灵活确定分隔符

长度域解码器:LengthFieldBasedFrameDecoder

  • 基于长度字段的解码器是指在消息头部添加长度字段,指示消息的总长度

基本架构

逻辑架构

Netty的逻辑处理架构共分为网络通信层、事件调度层、服务编排层,每一层各司其职。

image-20231030162514048

网络通信层:

网络通信层的职责是执行网络 I/O 的操作。

  • 它支持多种网络协议和 I/O 模型的连接操作。

当网络数据读取到内核缓冲区后,会触发各种网络事件,这些网络事件会分发给事件调度层进行处理。

网络通信层的包含BootStrap、ServerBootStrap、Channel三个组件。

BootStrap & ServerBootStrap

  • Bootstrap:
    • 主要负责整个 Netty 程序的启动、初始化、服务器连接等过程。

Netty中的引导器共分为两种类型:

  • 用于客户端引导的 Bootstrap。
  • 用于服务端引导的 ServerBootStrap,它们都继承自抽象类 AbstractBootstrap。

事件调度层:

通过 Reactor 线程模型对各类事件进行聚合处理

通过 Selector 主循环线程集成多种事件( I/O 事件、信号事件、定时事件等)

  • 实际的业务处理逻辑是交由服务编排层中相关的 Handler 完成。

事件调度层的核心组件包括 EventLoopGroup、EventLoop

  • EventLoopGroup & EventLoop

EventLoopGroup 本质是一个线程池,主要负责接收 I/O 请求,并分配线程执行处理请求。

EventLoopGroup、EventLoop、Channel 的关系:

  • 一个 EventLoopGroup 往往包含一个或者多个 EventLoop。
    • EventLoop 用于处理 Channel 生命周期内的所有 I/O 事件
      • 如 accept、connect、read、write 等 I/O 事件。
  • EventLoop 同一时间会与一个线程绑定,每个 EventLoop 负责处理多个 Channel。
    • 每新建一个 Channel,EventLoopGroup 会选择一个 EventLoop 与其绑定。
    • 该 Channel 在生命周期内都可以对 EventLoop 进行多次绑定和解绑。

EventLoopGroup 的实现类是 NioEventLoopGroup。

可以把 NioEventLoopGroup 理解为一个线程池

  • 每个线程负责处理多个 Channel,而同一个 Channel 只会对应一个线程。

Netty 通过创建不同的 EventLoopGroup 参数配置,可以支持 Reactor 的三种线程模型:

  • 单线程模型
    • EventLoopGroup 只包含一个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup。
  • 多线程模型
    • EventLoopGroup 包含多个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup。
  • 主从多线程模型
    • EventLoopGroup 包含多个 EventLoop,Boss 是主 Reactor,Worker 是从 Reactor
    • 它们分别使用不同的 EventLoopGroup
      • 主 Reactor 负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor。

服务编排层:

服务编排层的职责是负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。

服务编排层的核心组件包括 ChannelPipelineChannelHandler、ChannelHandlerContext

ChannelPipeline

  • 负责组装各种 ChannelHandler,实际数据的编解码以及加工处理操作都是由 ChannelHandler 完成的。

  • ChannelPipeline 可以理解为ChannelHandler 的实例列表:

    • 内部通过双向链表将不同的 ChannelHandler 链接在一起。
  • 当 I/O 读写事件触发时,ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。

ChannelPipeline 是线程安全的,因为每一个新的 Channel 都会对应绑定一个新的 ChannelPipeline。

一个 ChannelPipeline 关联一个 EventLoop,一个 EventLoop 仅会绑定一个线程。

ChannelHandler & ChannelHandlerContext

  • ChannelHandlerContext 用于保存 ChannelHandler 上下文。

组件关系

服务端启动初始化时有 Boss EventLoopGroup 和 Worker EventLoopGroup 两个组件,其中 Boss 负责监听网络连接事件。

  • 当有新的网络连接事件到达时,则将 Channel 注册到 Worker EventLoopGroup。

Worker EventLoopGroup 会被分配一个 EventLoop 负责处理该 Channel 的读写事件。

  • 每个 EventLoop 都是单线程的,通过 Selector 进行事件循环。

当客户端发起 I/O 读写事件时,服务端 EventLoop 会进行数据的读取

  • 然后通过 Pipeline 触发各种监听器进行数据的加工处理。

客户端数据会被传递到 ChannelPipeline 的第一个 ChannelInboundHandler 中

  • 数据处理完成后,将加工完成的数据传递给下一个 ChannelInboundHandler。

当数据写回客户端时,会将处理结果在 ChannelPipeline 的 ChannelOutboundHandler 中传播,最后到达客户端。

异步模型

Netty 中的 I/O 操作 是 异步 的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture

调用者并不能立刻获得结果,而是通过 Future-Listener 机制

  • 用户可以方便的 主动获取 或者通过 通知机制 获得 IO 操作结果。

Netty 的异步模型是建立在 future 和 callback 的之上的。

  • callback 就是回调。

Future的核心思想是:

假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。

那么可以在调用 fun 的时候,立马返回一个 Future

  • 后续可以通过Future去监控方法 fun 的处理过程(即 Future-Listener 机制)

常见操作

通过 isDone 方法来判断当前操作是否完成(注意不是判断完成成功)

通过 isSuccess 方法来判断已完成的当前操作是否成功

通过 getCause 方法来获取已完成的当前操作失败的原因

通过 isCancelled 方法来判断已完成的当前操作是否被取消

通过 addListener 方法来注册监听器

  • 当操作已完成(isDone 方法返回完成),将会通知指定的监听器
  • 如果 Future 对象已完成,则通知指定的监听器。

Future Listener 机制

当 Future 对象刚刚创建时,处于非完成状态

  • 调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作

案例说明

绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑

// 3.绑定端口并且同步,生成一个 ChannelFuture 对象
// 这里服务器就已经启动服务器了
ChannelFuture cf = bootstrap.bind(6668).sync();
// 给 cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (cf.isSuccess()) {
            System.out.println("监听端口 6668 成功");
        } else {
            System.out.println("监听端口 6668 失败");
        }
    }
});

相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住,直到操作完成

异步处理的好处是不会造成线程阻塞

  • 线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。

任务队列

在 事件循环(NioEventLoop) 的过程中,会在 pipline 中调用 Handler 来处理业务

假如在某一个 Handler有一个长时间的操作,这必会造成 pipiline 的阻塞

  • 可以将这个耗时的处理提交到 TaskQueue 进行异步的执行。

使用场景

用户程序自定义的普通任务

用户自定义定时任务

非当前 Reactor 线程调用 Channel 的各种方法。

  • 例如在 推送系统 的业务线程里面,根据 用户的标识,找到对应的 Channel 引用

  • 然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。

  • 最终的 Write 会提交到任务队列中后被 异步消费。

代码演示

在 NettyServerHandler 的 channelRead() 方法中模拟一下耗时的业务:

此时客户端必然是等待 5秒 后才能收到消息:

  • 耗时业务执行完毕…;然后再收到消息:hello,客户端…。
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	    // 比如这里我们有一个非常耗时的业务,我们想让它异步的执行
	    // 只需要将它提交到该 channel 对应的 NioEventLoop 的 TaskQueue 中
	    Thread.sleep(5 * 1000);// 模拟耗时业务
	    // 执行完业务向客户端返回消息
	    ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务执行完毕...",CharsetUtil.UTF_8));
	    System.out.println("go on...");
	}
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
	    // writeAndFlush 是 write + flush
	    // 将数据写入到缓存,并刷新
	    // 一般我们会对这个发送的数据进行编码
	    ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端...",CharsetUtil.UTF_8));
	}
}

解决方案一:用户程序自定义的普通任务

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 比如这里我们有一个非常耗时的业务,我们想让它异步的执行
    // 只需要将它提交到该 channel 对应的 NioEventLoop 的 TaskQueue 中

    // 解决方案1:用户程序自定义的普通程序
    ctx.channel().eventLoop().execute(new Runnable() {
        public void run() {
            try {
                Thread.sleep(5 * 1000);// 模拟耗时业务
                // 执行完业务向客户端返回消息
                ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务执行完毕...",CharsetUtil.UTF_8));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    System.out.println("go on...");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   // writeAndFlush 是 write + flush
   // 将数据写入到缓存,并刷新
   // 一般我们会对这个发送的数据进行编码
   ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端...",CharsetUtil.UTF_8));
}

解决方案二:用户自定义定时任务

  • 这种解决方案任务是提交到 scheduleTaskQueue 中。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 比如这里我们有一个非常耗时的业务,我们想让它异步的执行
    // 只需要将它提交到该 channel 对应的 NioEventLoop 的 TaskQueue 中

    // 解决方案2:用户自定义定时任务,该任务提交到 scheduleTaskQueue 中
    ctx.channel().eventLoop().schedule(new Runnable() {
       @Override
       public void run() {
           try {
               Thread.sleep(5 * 1000);// 模拟耗时业务
               // 执行完业务向客户端返回消息
               ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务执行完毕...",CharsetUtil.UTF_8));
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    },5, TimeUnit.SECONDS);
    System.out.println("go on...");
}
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!