搜 索

Netty核心编程

  • 184阅读
  • 2022年07月17日
  • 0评论
首页 / 编程 / 正文

写在前面

作为一个写了多年 CRUD 的后端架构师,我曾经以为 Spring MVC + Tomcat 就是 Web 开发的全部。直到有一天,领导说:"我们要做一个 IM 系统,支持百万长连接。"

我信心满满地说:"没问题,Tomcat 搞起!"

然后...连接数到 1 万的时候,服务器就开始冒烟了。

那是我第一次认真审视 Netty。从此,我的技术世界观被重塑了。

Netty 不是让你写得更少,而是让你理解得更多。 当你真正理解了 Netty 的设计哲学,你会发现它不仅仅是一个网络框架,更是一本分布式系统设计的教科书。

这篇文章,我会用最接地气的方式,带你走进 Netty 的核心世界。


一、Netty 是什么?为什么要用它?

1.1 官方定义(装逼用)

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

翻译成人话:Netty 是一个基于 NIO 的异步事件驱动网络框架,用来快速开发高性能的网络服务器和客户端。

1.2 为什么不直接用 Java NIO?

你可能会问:Java 不是有 NIO 吗?为什么还要 Netty?

让我用一个故事告诉你:

用原生 NIO 写网络程序,就像用汇编写业务代码。 理论上可行,实际上会疯。

原生 NIO 的痛点:

// 这只是冰山一角...
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(8080));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
    selector.select();
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iter = keys.iterator();
    while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        
        if (key.isAcceptable()) {
            // 处理连接...一堆代码
        } else if (key.isReadable()) {
            // 处理读取...又是一堆代码
            // 还要处理半包、粘包...
            // 还要处理断线重连...
            // 还要处理异常...
            // 我已经开始头疼了
        }
    }
}

原生 NIO 的问题

问题描述后果
API 复杂Selector、Channel、Buffer 概念绑定学习曲线陡峭
空轮询 BugLinux epoll 的著名 BugCPU 100%
半包/粘包需要自己处理拆包组包容易出 Bug
异常处理各种边界情况代码量爆炸
线程模型需要自己设计容易写出性能差的代码

而 Netty

// 同样的功能,Netty 版本
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline()
                .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
                .addLast(new StringDecoder())
                .addLast(new MyBusinessHandler());
        }
    });

bootstrap.bind(8080).sync();

清爽多了吧?半包粘包?一行代码搞定。线程模型?框架帮你设计好了。

1.3 谁在用 Netty?

基本上,你能想到的 Java 高性能中间件,底层都是 Netty:

项目用途备注
DubboRPC 框架阿里出品
gRPC-JavaRPC 框架Google 出品
RocketMQ消息队列阿里出品
Elasticsearch搜索引擎传输层用 Netty
Spark大数据计算Shuffle 用 Netty
Flink流计算网络层用 Netty
Cassandra分布式数据库客户端协议
Zuul 2API 网关Netflix 出品

结论:不学 Netty,你连中间件源码都看不懂。


二、核心概念:先把零件认全

学 Netty 最重要的是先把核心组件搞清楚,它们就像乐高积木,理解了每块积木的用途,才能拼出想要的东西。

2.1 整体架构

graph TB subgraph Application[应用层] Handler1[ChannelHandler 1] Handler2[ChannelHandler 2] Handler3[ChannelHandler N] end subgraph Pipeline[ChannelPipeline] Head[HeadContext] Tail[TailContext] Head --> Handler1 Handler1 --> Handler2 Handler2 --> Handler3 Handler3 --> Tail end subgraph Transport[传输层] Channel[Channel] EventLoop[EventLoop] ByteBuf[ByteBuf] end subgraph Network[网络层] NIO[Java NIO] Epoll[Native Epoll] KQueue[Native KQueue] end Pipeline --> Channel Channel --> EventLoop EventLoop --> NIO EventLoop --> Epoll EventLoop --> KQueue

2.2 Channel:网络连接的抽象

Channel 是 Netty 对网络连接的抽象,代表一个到实体(如硬件设备、文件、网络套接字)的开放连接。

人话:Channel 就是一根网线,数据从这里进,也从这里出。

// Channel 的核心方法
Channel channel = ...;

// 获取远端地址
SocketAddress remoteAddress = channel.remoteAddress();

// 写数据(异步)
ChannelFuture future = channel.writeAndFlush("Hello");

// 关闭连接
channel.close();

// 判断是否活跃
boolean active = channel.isActive();

// 获取 Pipeline
ChannelPipeline pipeline = channel.pipeline();

常用 Channel 实现

类型类名用途
NIO ServerNioServerSocketChannel服务端监听
NIO ClientNioSocketChannel客户端连接
Epoll ServerEpollServerSocketChannelLinux 高性能服务端
Epoll ClientEpollSocketChannelLinux 高性能客户端
LocalLocalChannel进程内通信
EmbeddedEmbeddedChannel单元测试

2.3 EventLoop:事件循环引擎

EventLoop 是 Netty 的核心引擎,负责处理 Channel 上的所有 I/O 事件。

人话:EventLoop 就是一个死循环的线程,不停地检查"有没有新数据?有没有新连接?"

graph LR subgraph EventLoop[EventLoop 工作循环] A[检查 I/O 事件] --> B[处理 I/O 事件] B --> C[执行任务队列] C --> A end subgraph Events[事件类型] E1[Accept 新连接] E2[Read 数据到达] E3[Write 可写] E4[Connect 连接完成] end subgraph Tasks[任务类型] T1[普通任务] T2[定时任务] T3[用户提交的任务] end Events --> A Tasks --> C

EventLoopGroup:一组 EventLoop 的集合。

// Boss Group:负责接收新连接,通常 1 个线程就够
EventLoopGroup bossGroup = new NioEventLoopGroup(1);

// Worker Group:负责处理 I/O 读写,通常是 CPU 核心数 * 2
EventLoopGroup workerGroup = new NioEventLoopGroup();

// 或者使用 Linux 原生 Epoll(性能更好)
EventLoopGroup epollGroup = new EpollEventLoopGroup();

EventLoop 与 Channel 的关系

graph TB subgraph EventLoopGroup[EventLoopGroup] EL1[EventLoop 1] EL2[EventLoop 2] EL3[EventLoop 3] end subgraph Channels[Channels] C1[Channel A] C2[Channel B] C3[Channel C] C4[Channel D] C5[Channel E] C6[Channel F] end EL1 --> C1 EL1 --> C2 EL2 --> C3 EL2 --> C4 EL3 --> C5 EL3 --> C6

关键原则

  • 一个 Channel 在其生命周期内只绑定一个 EventLoop
  • 一个 EventLoop 可以处理多个 Channel
  • 所有 I/O 操作都在 EventLoop 线程中执行(线程安全)

2.4 ChannelHandler:业务逻辑处理器

ChannelHandler 是你写业务逻辑的地方,处理入站和出站事件。

人话:Handler 就是流水线上的工人,每个工人负责一道工序。

graph LR subgraph Inbound[入站方向 - 数据进来] I1[ByteToMessageDecoder] --> I2[MessageToMessageDecoder] I2 --> I3[SimpleChannelInboundHandler] end subgraph Outbound[出站方向 - 数据出去] O1[业务 Handler] --> O2[MessageToByteEncoder] O2 --> O3[写入 Socket] end

两种 Handler 类型

// 入站处理器:处理读取的数据
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("连接建立:" + ctx.channel().remoteAddress());
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("收到数据:" + msg);
        // 传递给下一个 Handler
        ctx.fireChannelRead(msg);
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("连接断开:" + ctx.channel().remoteAddress());
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

// 出站处理器:处理写出的数据
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("写出数据:" + msg);
        // 传递给下一个 Handler
        ctx.write(msg, promise);
    }
}

2.5 ChannelPipeline:处理器链

ChannelPipeline 是 ChannelHandler 的容器,按顺序组织多个 Handler。

人话:Pipeline 就是流水线,Handler 是流水线上的工位。

graph LR subgraph Pipeline[ChannelPipeline] Head[Head] --> Dec1[LengthDecoder] Dec1 --> Dec2[ProtobufDecoder] Dec2 --> Biz[BusinessHandler] Biz --> Enc1[ProtobufEncoder] Enc1 --> Enc2[LengthEncoder] Enc2 --> Tail[Tail] end Socket[Socket] --> Head Tail --> Socket style Head fill:#f9f,stroke:#333 style Tail fill:#f9f,stroke:#333
// 构建 Pipeline
ChannelPipeline pipeline = channel.pipeline();

// 添加 Handler
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("handler", new MyBusinessHandler());

// 动态添加/删除
pipeline.addFirst("logger", new LoggingHandler());
pipeline.remove("logger");
pipeline.replace("handler", "newHandler", new NewBusinessHandler());

事件传播方向

事件类型传播方向Handler 类型常见事件
入站事件Head → TailChannelInboundHandlerchannelRead, channelActive
出站事件Tail → HeadChannelOutboundHandlerwrite, flush, close

2.6 ByteBuf:高性能缓冲区

ByteBuf 是 Netty 的数据容器,比 Java NIO 的 ByteBuffer 好用 100 倍。

人话:ByteBuf 就是一个字节数组的包装,但是加了很多便利功能。

为什么不用 ByteBuffer?

特性ByteBufferByteBuf
读写切换需要 flip()自动分离读写指针
扩容不支持自动扩容
池化不支持支持池化复用
组合不支持CompositeByteBuf
引用计数不支持支持,避免内存泄漏
// 创建 ByteBuf
ByteBuf buf = Unpooled.buffer(256);          // 堆内存
ByteBuf buf = Unpooled.directBuffer(256);    // 直接内存
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(256);  // 池化

// 写入数据
buf.writeInt(100);
buf.writeLong(System.currentTimeMillis());
buf.writeBytes("Hello".getBytes());

// 读取数据
int value = buf.readInt();
long timestamp = buf.readLong();
byte[] bytes = new byte[5];
buf.readBytes(bytes);

// 读写指针
int readerIndex = buf.readerIndex();  // 读指针位置
int writerIndex = buf.writerIndex();  // 写指针位置
int readableBytes = buf.readableBytes();  // 可读字节数

// 标记和重置
buf.markReaderIndex();
buf.resetReaderIndex();

// 引用计数(重要!)
buf.retain();   // 增加引用
buf.release();  // 减少引用,为 0 时释放内存

ByteBuf 内存结构

+-------------------+------------------+------------------+
| discardable bytes |  readable bytes  |  writable bytes  |
|                   |     (CONTENT)    |                  |
+-------------------+------------------+------------------+
|                   |                  |                  |
0      <=      readerIndex   <=   writerIndex    <=    capacity

三、线程模型:Reactor 模式

Netty 的高性能秘密武器之一就是它的线程模型。理解了这个,你就理解了 Netty 的灵魂。

3.1 传统 BIO 模型

graph TB subgraph BIOModel[BIO 模型 - 一连接一线程] Client1[Client 1] --> Thread1[Thread 1] Client2[Client 2] --> Thread2[Thread 2] Client3[Client 3] --> Thread3[Thread 3] ClientN[Client N] --> ThreadN[Thread N] end Problem[问题: 线程数 = 连接数, 万级连接就炸了]

3.2 单 Reactor 单线程

graph TB subgraph SingleReactor[单 Reactor 单线程] Clients[所有 Clients] --> Reactor[Reactor] Reactor --> Accept[处理 Accept] Reactor --> Read[处理 Read] Reactor --> Write[处理 Write] Reactor --> Business[业务处理] end Problem[问题: 业务处理阻塞会影响 I/O]
// 单 Reactor 单线程配置
EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)  // 同一个 group 处理连接和 I/O
    .channel(NioServerSocketChannel.class)
    .childHandler(new MyHandler());

3.3 单 Reactor 多线程

graph TB subgraph SingleReactorMT[单 Reactor 多线程] Clients[所有 Clients] --> Reactor[Reactor] Reactor --> Accept[处理 Accept] Reactor --> Read[处理 Read] Reactor --> Write[处理 Write] subgraph ThreadPool[业务线程池] T1[Worker 1] T2[Worker 2] T3[Worker N] end Read --> ThreadPool end Problem[问题: 单 Reactor 成为瓶颈]

3.4 主从 Reactor 多线程(Netty 默认)

这是 Netty 的默认模型,也是最推荐的模型:

graph TB subgraph MainSubReactor[主从 Reactor 多线程] Clients[Clients] --> MainReactor[Main Reactor - Boss] MainReactor --> SubReactor1[Sub Reactor 1] MainReactor --> SubReactor2[Sub Reactor 2] MainReactor --> SubReactor3[Sub Reactor N] SubReactor1 --> Handler1[Handler] SubReactor2 --> Handler2[Handler] SubReactor3 --> Handler3[Handler] subgraph BossGroup[Boss Group - 1个线程] MainReactor end subgraph WorkerGroup[Worker Group - N个线程] SubReactor1 SubReactor2 SubReactor3 end end
// 主从 Reactor 配置(推荐)
EventLoopGroup bossGroup = new NioEventLoopGroup(1);      // 接收连接
EventLoopGroup workerGroup = new NioEventLoopGroup();     // 处理 I/O

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)  // 分开配置
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new MyHandler());
        }
    });

为什么这个模型最优?

  1. Boss 专注接收:一个线程专门处理 Accept,不会被 I/O 阻塞
  2. Worker 并行处理:多个线程并行处理 I/O,充分利用多核
  3. 线程绑定:一个 Channel 绑定一个 EventLoop,无锁化设计

四、编解码器:半包粘包终结者

4.1 什么是半包粘包?

TCP 是流式协议,不保证消息边界。这导致两个问题:

发送端发送:[Hello][World][!]
接收端可能收到:[Hel][loWorld!]  ← 这就是半包和粘包
graph LR subgraph Send[发送端] S1[Packet 1] S2[Packet 2] S3[Packet 3] end subgraph TCP[TCP 传输] direction LR T1[可能合并] T2[可能拆分] end subgraph Recv[接收端] R1[Packet 1 + 半个 2] R2[半个 2 + Packet 3] end Send --> TCP --> Recv

4.2 解决方案

Netty 提供了多种开箱即用的解码器:

方案一:固定长度

// 每条消息固定 100 字节
pipeline.addLast(new FixedLengthFrameDecoder(100));

方案二:分隔符

// 以 \r\n 作为分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(
    1024,  // 最大长度
    Delimiters.lineDelimiter()  // 分隔符
));

// 简化版:按行分割
pipeline.addLast(new LineBasedFrameDecoder(1024));

方案三:长度字段(最常用)

// 消息格式:[4字节长度][消息内容]
pipeline.addLast(new LengthFieldBasedFrameDecoder(
    65535,  // maxFrameLength: 最大帧长度
    0,      // lengthFieldOffset: 长度字段偏移量
    4,      // lengthFieldLength: 长度字段本身的长度
    0,      // lengthAdjustment: 长度调整值
    4       // initialBytesToStrip: 跳过的字节数(去掉长度字段)
));

// 编码端
pipeline.addLast(new LengthFieldPrepender(4));  // 自动添加 4 字节长度头

LengthFieldBasedFrameDecoder 参数详解

消息格式示例:
+--------+----------------+
| Length |    Content     |
| 4 bytes|   N bytes      |
+--------+----------------+

lengthFieldOffset = 0     // 长度字段从第 0 字节开始
lengthFieldLength = 4     // 长度字段占 4 字节
lengthAdjustment = 0      // 长度值就是内容长度,无需调整
initialBytesToStrip = 4   // 解码后去掉长度字段,只保留内容

4.3 自定义编解码器

// 自定义消息对象
@Data
public class MyMessage {
    private int type;
    private String content;
}

// 编码器:对象 → 字节
public class MyMessageEncoder extends MessageToByteEncoder<MyMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) {
        byte[] contentBytes = msg.getContent().getBytes(StandardCharsets.UTF_8);
        
        // 写入消息类型
        out.writeInt(msg.getType());
        // 写入内容长度
        out.writeInt(contentBytes.length);
        // 写入内容
        out.writeBytes(contentBytes);
    }
}

// 解码器:字节 → 对象
public class MyMessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 检查可读字节数
        if (in.readableBytes() < 8) {  // type(4) + length(4)
            return;  // 数据不够,等待更多数据
        }
        
        in.markReaderIndex();
        
        int type = in.readInt();
        int length = in.readInt();
        
        if (in.readableBytes() < length) {
            in.resetReaderIndex();  // 数据不够,重置读指针
            return;
        }
        
        byte[] contentBytes = new byte[length];
        in.readBytes(contentBytes);
        
        MyMessage message = new MyMessage();
        message.setType(type);
        message.setContent(new String(contentBytes, StandardCharsets.UTF_8));
        
        out.add(message);
    }
}

4.4 常用编解码器

编解码器用途
StringEncoder/Decoder字符串编解码
ObjectEncoder/DecoderJava 对象序列化(不推荐)
ProtobufEncoder/DecoderProtobuf 编解码
JsonObjectDecoderJSON 编解码
HttpRequestEncoder/DecoderHTTP 协议
WebSocketFrameEncoder/DecoderWebSocket 协议

五、实战代码:从零构建 RPC 框架

5.1 项目结构

netty-rpc/
├── rpc-common/           # 公共模块
│   ├── RpcRequest.java
│   ├── RpcResponse.java
│   └── RpcCodec.java
├── rpc-server/           # 服务端
│   ├── RpcServer.java
│   └── RpcServerHandler.java
└── rpc-client/           # 客户端
    ├── RpcClient.java
    └── RpcClientHandler.java

5.2 公共模块

// RpcRequest.java - RPC 请求对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest implements Serializable {
    private String requestId;
    private String className;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;
}

// RpcResponse.java - RPC 响应对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse implements Serializable {
    private String requestId;
    private Object result;
    private Throwable error;
    
    public boolean hasError() {
        return error != null;
    }
}
// RpcEncoder.java - 通用编码器
public class RpcEncoder extends MessageToByteEncoder<Object> {
    
    private final Class<?> genericClass;
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }
    
    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = objectMapper.writeValueAsBytes(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

// RpcDecoder.java - 通用解码器
public class RpcDecoder extends ByteToMessageDecoder {
    
    private final Class<?> genericClass;
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        
        in.markReaderIndex();
        int dataLength = in.readInt();
        
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        
        Object obj = objectMapper.readValue(data, genericClass);
        out.add(obj);
    }
}

5.3 服务端实现

// RpcServer.java
@Slf4j
public class RpcServer {
    
    private final int port;
    private final Map<String, Object> serviceRegistry = new ConcurrentHashMap<>();
    
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    
    public RpcServer(int port) {
        this.port = port;
    }
    
    /**
     * 注册服务
     */
    public void registerService(String serviceName, Object serviceImpl) {
        serviceRegistry.put(serviceName, serviceImpl);
        log.info("注册服务: {} -> {}", serviceName, serviceImpl.getClass().getName());
    }
    
    /**
     * 启动服务器
     */
    public void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        
                        // 心跳检测:60秒没有读事件就触发
                        pipeline.addLast(new IdleStateHandler(60, 0, 0));
                        
                        // 编解码
                        pipeline.addLast(new RpcDecoder(RpcRequest.class));
                        pipeline.addLast(new RpcEncoder(RpcResponse.class));
                        
                        // 业务处理
                        pipeline.addLast(new RpcServerHandler(serviceRegistry));
                    }
                });
            
            ChannelFuture future = bootstrap.bind(port).sync();
            log.info("RPC Server 启动成功,监听端口: {}", port);
            
            future.channel().closeFuture().sync();
        } finally {
            shutdown();
        }
    }
    
    public void shutdown() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}

// RpcServerHandler.java
@Slf4j
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
    
    private final Map<String, Object> serviceRegistry;
    
    public RpcServerHandler(Map<String, Object> serviceRegistry) {
        this.serviceRegistry = serviceRegistry;
    }
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) {
        log.info("收到 RPC 请求: {}.{}", request.getClassName(), request.getMethodName());
        
        RpcResponse response = new RpcResponse();
        response.setRequestId(request.getRequestId());
        
        try {
            Object result = handleRequest(request);
            response.setResult(result);
        } catch (Exception e) {
            log.error("处理 RPC 请求失败", e);
            response.setError(e);
        }
        
        ctx.writeAndFlush(response);
    }
    
    private Object handleRequest(RpcRequest request) throws Exception {
        // 获取服务实例
        Object service = serviceRegistry.get(request.getClassName());
        if (service == null) {
            throw new RuntimeException("服务未找到: " + request.getClassName());
        }
        
        // 反射调用方法
        Class<?> serviceClass = service.getClass();
        Method method = serviceClass.getMethod(
            request.getMethodName(), 
            request.getParameterTypes()
        );
        
        return method.invoke(service, request.getParameters());
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            log.warn("连接空闲超时,关闭连接: {}", ctx.channel().remoteAddress());
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("处理异常", cause);
        ctx.close();
    }
}

5.4 客户端实现

// RpcClient.java
@Slf4j
public class RpcClient {
    
    private final String host;
    private final int port;
    
    private EventLoopGroup group;
    private Channel channel;
    
    private final Map<String, CompletableFuture<RpcResponse>> pendingRequests = 
        new ConcurrentHashMap<>();
    
    public RpcClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    
    public void connect() throws InterruptedException {
        group = new NioEventLoopGroup();
        
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    
                    // 心跳:30秒发送一次心跳
                    pipeline.addLast(new IdleStateHandler(0, 30, 0));
                    
                    // 编解码
                    pipeline.addLast(new RpcEncoder(RpcRequest.class));
                    pipeline.addLast(new RpcDecoder(RpcResponse.class));
                    
                    // 业务处理
                    pipeline.addLast(new RpcClientHandler(pendingRequests));
                }
            });
        
        ChannelFuture future = bootstrap.connect(host, port).sync();
        this.channel = future.channel();
        log.info("连接 RPC Server 成功: {}:{}", host, port);
    }
    
    /**
     * 同步调用
     */
    public Object invoke(String className, String methodName, 
                         Class<?>[] parameterTypes, Object[] parameters) 
            throws Exception {
        
        RpcRequest request = new RpcRequest();
        request.setRequestId(UUID.randomUUID().toString());
        request.setClassName(className);
        request.setMethodName(methodName);
        request.setParameterTypes(parameterTypes);
        request.setParameters(parameters);
        
        // 创建 Future 等待响应
        CompletableFuture<RpcResponse> future = new CompletableFuture<>();
        pendingRequests.put(request.getRequestId(), future);
        
        // 发送请求
        channel.writeAndFlush(request);
        
        // 等待响应(超时 10 秒)
        RpcResponse response = future.get(10, TimeUnit.SECONDS);
        
        if (response.hasError()) {
            throw new RuntimeException(response.getError());
        }
        
        return response.getResult();
    }
    
    /**
     * 创建代理对象
     */
    @SuppressWarnings("unchecked")
    public <T> T createProxy(Class<T> interfaceClass) {
        return (T) Proxy.newProxyInstance(
            interfaceClass.getClassLoader(),
            new Class[]{interfaceClass},
            (proxy, method, args) -> invoke(
                interfaceClass.getName(),
                method.getName(),
                method.getParameterTypes(),
                args
            )
        );
    }
    
    public void close() {
        if (channel != null) {
            channel.close();
        }
        if (group != null) {
            group.shutdownGracefully();
        }
    }
}

// RpcClientHandler.java
@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
    
    private final Map<String, CompletableFuture<RpcResponse>> pendingRequests;
    
    public RpcClientHandler(Map<String, CompletableFuture<RpcResponse>> pendingRequests) {
        this.pendingRequests = pendingRequests;
    }
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) {
        String requestId = response.getRequestId();
        CompletableFuture<RpcResponse> future = pendingRequests.remove(requestId);
        
        if (future != null) {
            future.complete(response);
        } else {
            log.warn("收到未知响应: {}", requestId);
        }
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            // 发送心跳
            RpcRequest heartbeat = new RpcRequest();
            heartbeat.setRequestId("HEARTBEAT");
            ctx.writeAndFlush(heartbeat);
            log.debug("发送心跳");
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("客户端异常", cause);
        ctx.close();
    }
}

5.5 使用示例

// 定义服务接口
public interface HelloService {
    String sayHello(String name);
    int add(int a, int b);
}

// 服务实现
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String name) {
        return "Hello, " + name + "!";
    }
    
    @Override
    public int add(int a, int b) {
        return a + b;
    }
}

// 启动服务端
public class ServerMain {
    public static void main(String[] args) throws Exception {
        RpcServer server = new RpcServer(8080);
        server.registerService(HelloService.class.getName(), new HelloServiceImpl());
        server.start();
    }
}

// 启动客户端
public class ClientMain {
    public static void main(String[] args) throws Exception {
        RpcClient client = new RpcClient("localhost", 8080);
        client.connect();
        
        // 创建代理
        HelloService helloService = client.createProxy(HelloService.class);
        
        // 调用远程方法(就像调用本地方法一样)
        String result = helloService.sayHello("Netty");
        System.out.println(result);  // Hello, Netty!
        
        int sum = helloService.add(10, 20);
        System.out.println(sum);  // 30
        
        client.close();
    }
}

六、性能优化:榨干每一滴性能

6.1 内存优化

使用池化 ByteBuf

// 开启池化(Netty 4.1+ 默认开启)
System.setProperty("io.netty.allocator.type", "pooled");

// 或在代码中使用
ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf buf = allocator.buffer(1024);

使用直接内存

// 直接内存减少一次数据拷贝
ByteBuf directBuf = Unpooled.directBuffer(1024);

// 配置 Channel 默认使用直接内存
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

及时释放 ByteBuf

// 手动释放
ByteBuf buf = ...;
try {
    // 使用 buf
} finally {
    buf.release();  // 必须释放!
}

// 使用 ReferenceCountUtil
ReferenceCountUtil.release(msg);

// 使用 SimpleChannelInboundHandler(自动释放)
public class MyHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        // msg 会被自动释放
    }
}

6.2 线程优化

合理配置线程数

// Boss 线程:通常 1 个就够
EventLoopGroup bossGroup = new NioEventLoopGroup(1);

// Worker 线程:CPU 核心数 * 2
int workerThreads = Runtime.getRuntime().availableProcessors() * 2;
EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads);

// 业务线程池:处理耗时操作
ExecutorService businessExecutor = new ThreadPoolExecutor(
    16, 32, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

避免阻塞 EventLoop

// 错误示范:在 EventLoop 中执行耗时操作
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 这会阻塞 EventLoop,影响其他 Channel!
    Thread.sleep(1000);
    database.query(...);  // 同步数据库操作
}

// 正确做法:提交到业务线程池
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    businessExecutor.submit(() -> {
        Object result = database.query(...);
        ctx.writeAndFlush(result);  // 写回 EventLoop 线程
    });
}

// 或者使用 Netty 的 EventExecutorGroup
EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(16);
pipeline.addLast(businessGroup, "businessHandler", new BusinessHandler());

6.3 网络参数优化

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    
    // 服务端 Socket 参数
    .option(ChannelOption.SO_BACKLOG, 1024)          // 连接队列大小
    .option(ChannelOption.SO_REUSEADDR, true)        // 允许端口重用
    
    // 客户端 Socket 参数
    .childOption(ChannelOption.SO_KEEPALIVE, true)   // 开启 TCP 保活
    .childOption(ChannelOption.TCP_NODELAY, true)    // 关闭 Nagle 算法
    .childOption(ChannelOption.SO_SNDBUF, 65535)     // 发送缓冲区
    .childOption(ChannelOption.SO_RCVBUF, 65535)     // 接收缓冲区
    
    // Netty 参数
    .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
        new WriteBufferWaterMark(32 * 1024, 64 * 1024))  // 写缓冲区水位
    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

6.4 Linux 平台优化

// 使用 Epoll(仅限 Linux)
if (Epoll.isAvailable()) {
    bossGroup = new EpollEventLoopGroup(1);
    workerGroup = new EpollEventLoopGroup();
    bootstrap.channel(EpollServerSocketChannel.class);
    
    // Epoll 特有参数
    bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);  // 端口复用
} else {
    bossGroup = new NioEventLoopGroup(1);
    workerGroup = new NioEventLoopGroup();
    bootstrap.channel(NioServerSocketChannel.class);
}

Linux 系统参数调优

# /etc/sysctl.conf

# 增加文件描述符限制
fs.file-max = 1000000

# TCP 连接队列
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535

# TCP 缓冲区
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216

# TIME_WAIT 优化
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 30

# 应用配置
sysctl -p

七、常见问题与最佳实践

7.1 内存泄漏排查

开启泄漏检测

// 启动参数
-Dio.netty.leakDetection.level=PARANOID

// 或代码中设置
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

常见泄漏场景

// 场景1:忘记释放 ByteBuf
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf buf = (ByteBuf) msg;
    // 处理数据...
    // 忘记 buf.release()!
}

// 场景2:异常分支未释放
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf buf = (ByteBuf) msg;
    try {
        if (someCondition) {
            return;  // 提前返回,未释放!
        }
        // 正常处理
    } finally {
        buf.release();  // 应该在 finally 中释放
    }
}

// 场景3:fire 后又 release
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);  // 传递给下游
    ((ByteBuf) msg).release();  // 重复释放!下游会出问题
}

7.2 连接管理

// 连接池管理
public class ChannelPool {
    private final Queue<Channel> pool = new ConcurrentLinkedQueue<>();
    private final Bootstrap bootstrap;
    private final int maxSize;
    
    public Channel acquire() throws InterruptedException {
        Channel channel = pool.poll();
        if (channel != null && channel.isActive()) {
            return channel;
        }
        return bootstrap.connect().sync().channel();
    }
    
    public void release(Channel channel) {
        if (channel.isActive() && pool.size() < maxSize) {
            pool.offer(channel);
        } else {
            channel.close();
        }
    }
}

// 或使用 Netty 内置的连接池
ChannelPool pool = new FixedChannelPool(
    bootstrap,
    new AbstractChannelPoolHandler() {
        @Override
        public void channelCreated(Channel ch) {
            ch.pipeline().addLast(new MyHandler());
        }
    },
    10  // 最大连接数
);

7.3 优雅关闭

public class GracefulShutdown {
    
    private final EventLoopGroup bossGroup;
    private final EventLoopGroup workerGroup;
    private final Channel serverChannel;
    
    public void shutdown() {
        // 1. 停止接收新连接
        if (serverChannel != null) {
            serverChannel.close().syncUninterruptibly();
        }
        
        // 2. 等待现有请求处理完成
        try {
            Thread.sleep(5000);  // 等待 5 秒
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 3. 关闭线程池
        Future<?> bossShutdown = bossGroup.shutdownGracefully(1, 10, TimeUnit.SECONDS);
        Future<?> workerShutdown = workerGroup.shutdownGracefully(1, 10, TimeUnit.SECONDS);
        
        try {
            bossShutdown.sync();
            workerShutdown.sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 注册 JVM 关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    server.shutdown();
}));

7.4 最佳实践清单

✅ DO:
- 使用池化的 ByteBufAllocator
- 及时释放 ByteBuf(或使用 SimpleChannelInboundHandler)
- 耗时操作提交到业务线程池
- 使用 IdleStateHandler 做心跳检测
- Linux 环境使用 Epoll
- 实现优雅关闭

❌ DON'T:
- 在 EventLoop 线程中执行阻塞操作
- 忘记处理异常(exceptionCaught)
- 在 Handler 中存储状态(除非标注 @Sharable)
- 创建大量短连接
- 忽略内存泄漏检测日志

八、Netty 在主流框架中的应用

graph TB subgraph RPC[RPC 框架] Dubbo[Dubbo] gRPC[gRPC-Java] Motan[Motan] end subgraph MQ[消息队列] RocketMQ[RocketMQ] Pulsar[Pulsar] end subgraph DB[数据库驱动] Cassandra[Cassandra] Redis[Lettuce] MySQL[R2DBC] end subgraph Gateway[网关] Zuul2[Zuul 2] SCG[Spring Cloud Gateway] end subgraph BigData[大数据] Spark[Spark] Flink[Flink] ES[Elasticsearch] end Netty[Netty] --> RPC Netty --> MQ Netty --> DB Netty --> Gateway Netty --> BigData

8.1 Dubbo 中的 Netty

// Dubbo 默认使用 Netty 4
<dubbo:protocol name="dubbo" port="20880" server="netty4" />

// 源码中的 NettyServer
public class NettyServer extends AbstractServer {
    private ServerBootstrap bootstrap;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    
    @Override
    protected void doOpen() {
        bootstrap = new ServerBootstrap();
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup(
            getUrl().getPositiveParameter("iothreads", Constants.DEFAULT_IO_THREADS));
        
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    // Dubbo 协议编解码
                    ch.pipeline()
                        .addLast("decoder", new DubboCodec())
                        .addLast("handler", new NettyServerHandler());
                }
            });
    }
}

8.2 Spring WebFlux 中的 Netty

// Spring WebFlux 默认使用 Netty
@SpringBootApplication
public class ReactiveApplication {
    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
    }
}

// 自定义 Netty 配置
@Bean
public NettyReactiveWebServerFactory nettyFactory() {
    NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
    factory.addServerCustomizers(server -> 
        server.option(ChannelOption.SO_BACKLOG, 1024)
              .childOption(ChannelOption.TCP_NODELAY, true)
    );
    return factory;
}

九、总结:Netty 知识图谱

graph TB subgraph Core[核心组件] Channel[Channel] EventLoop[EventLoop] Pipeline[ChannelPipeline] Handler[ChannelHandler] ByteBuf[ByteBuf] end subgraph Threading[线程模型] Reactor[Reactor 模式] Boss[Boss Group] Worker[Worker Group] end subgraph Codec[编解码] Decoder[解码器] Encoder[编码器] LengthField[长度字段拆包] end subgraph Protocol[协议支持] HTTP[HTTP/HTTP2] WebSocket[WebSocket] Custom[自定义协议] end subgraph Optimize[性能优化] Pool[内存池化] Direct[直接内存] Epoll[Native Epoll] end Core --> Threading Core --> Codec Codec --> Protocol Threading --> Optimize

结语

写到这里,我想说的是:Netty 确实有一定的学习曲线,但它的设计哲学值得每个后端工程师学习。

它教会我们:

  • 如何设计高性能的线程模型
  • 如何优雅地处理异步编程
  • 如何用组合模式构建灵活的处理链
  • 如何在性能和易用性之间取得平衡

当你真正理解了 Netty,再去看 Dubbo、gRPC、RocketMQ 的源码,你会发现:原来它们都是 Netty 的变体

最后送你一句话:学习 Netty,不是为了用它写业务代码,而是为了理解高性能网络编程的本质。

Happy Coding!


参考资源

评论区
暂无评论
avatar