写在前面
作为一个写了多年 CRUD 的后端架构师,我曾经以为 Spring MVC + Tomcat 就是 Web 开发的全部。直到有一天,领导说:"我们要做一个 IM 系统,支持百万长连接。"
我信心满满地说:"没问题,Tomcat 搞起!"
然后...连接数到 1 万的时候,服务器就开始冒烟了。
那是我第一次认真审视 Netty。从此,我的技术世界观被重塑了。
Netty 不是让你写得更少,而是让你理解得更多。 当你真正理解了 Netty 的设计哲学,你会发现它不仅仅是一个网络框架,更是一本分布式系统设计的教科书。
这篇文章,我会用最接地气的方式,带你走进 Netty 的核心世界。
一、Netty 是什么?为什么要用它?
1.1 官方定义(装逼用)
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
翻译成人话:Netty 是一个基于 NIO 的异步事件驱动网络框架,用来快速开发高性能的网络服务器和客户端。
1.2 为什么不直接用 Java NIO?
你可能会问:Java 不是有 NIO 吗?为什么还要 Netty?
让我用一个故事告诉你:
用原生 NIO 写网络程序,就像用汇编写业务代码。 理论上可行,实际上会疯。
原生 NIO 的痛点:
// 这只是冰山一角...
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(8080));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
// 处理连接...一堆代码
} else if (key.isReadable()) {
// 处理读取...又是一堆代码
// 还要处理半包、粘包...
// 还要处理断线重连...
// 还要处理异常...
// 我已经开始头疼了
}
}
}原生 NIO 的问题:
| 问题 | 描述 | 后果 |
|---|---|---|
| API 复杂 | Selector、Channel、Buffer 概念绑定 | 学习曲线陡峭 |
| 空轮询 Bug | Linux epoll 的著名 Bug | CPU 100% |
| 半包/粘包 | 需要自己处理拆包组包 | 容易出 Bug |
| 异常处理 | 各种边界情况 | 代码量爆炸 |
| 线程模型 | 需要自己设计 | 容易写出性能差的代码 |
而 Netty:
// 同样的功能,Netty 版本
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
.addLast(new StringDecoder())
.addLast(new MyBusinessHandler());
}
});
bootstrap.bind(8080).sync();清爽多了吧?半包粘包?一行代码搞定。线程模型?框架帮你设计好了。
1.3 谁在用 Netty?
基本上,你能想到的 Java 高性能中间件,底层都是 Netty:
| 项目 | 用途 | 备注 |
|---|---|---|
| Dubbo | RPC 框架 | 阿里出品 |
| gRPC-Java | RPC 框架 | Google 出品 |
| RocketMQ | 消息队列 | 阿里出品 |
| Elasticsearch | 搜索引擎 | 传输层用 Netty |
| Spark | 大数据计算 | Shuffle 用 Netty |
| Flink | 流计算 | 网络层用 Netty |
| Cassandra | 分布式数据库 | 客户端协议 |
| Zuul 2 | API 网关 | Netflix 出品 |
结论:不学 Netty,你连中间件源码都看不懂。
二、核心概念:先把零件认全
学 Netty 最重要的是先把核心组件搞清楚,它们就像乐高积木,理解了每块积木的用途,才能拼出想要的东西。
2.1 整体架构
2.2 Channel:网络连接的抽象
Channel 是 Netty 对网络连接的抽象,代表一个到实体(如硬件设备、文件、网络套接字)的开放连接。
人话:Channel 就是一根网线,数据从这里进,也从这里出。
// Channel 的核心方法
Channel channel = ...;
// 获取远端地址
SocketAddress remoteAddress = channel.remoteAddress();
// 写数据(异步)
ChannelFuture future = channel.writeAndFlush("Hello");
// 关闭连接
channel.close();
// 判断是否活跃
boolean active = channel.isActive();
// 获取 Pipeline
ChannelPipeline pipeline = channel.pipeline();常用 Channel 实现:
| 类型 | 类名 | 用途 |
|---|---|---|
| NIO Server | NioServerSocketChannel | 服务端监听 |
| NIO Client | NioSocketChannel | 客户端连接 |
| Epoll Server | EpollServerSocketChannel | Linux 高性能服务端 |
| Epoll Client | EpollSocketChannel | Linux 高性能客户端 |
| Local | LocalChannel | 进程内通信 |
| Embedded | EmbeddedChannel | 单元测试 |
2.3 EventLoop:事件循环引擎
EventLoop 是 Netty 的核心引擎,负责处理 Channel 上的所有 I/O 事件。
人话:EventLoop 就是一个死循环的线程,不停地检查"有没有新数据?有没有新连接?"
EventLoopGroup:一组 EventLoop 的集合。
// Boss Group:负责接收新连接,通常 1 个线程就够
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// Worker Group:负责处理 I/O 读写,通常是 CPU 核心数 * 2
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 或者使用 Linux 原生 Epoll(性能更好)
EventLoopGroup epollGroup = new EpollEventLoopGroup();EventLoop 与 Channel 的关系:
关键原则:
- 一个 Channel 在其生命周期内只绑定一个 EventLoop
- 一个 EventLoop 可以处理多个 Channel
- 所有 I/O 操作都在 EventLoop 线程中执行(线程安全)
2.4 ChannelHandler:业务逻辑处理器
ChannelHandler 是你写业务逻辑的地方,处理入站和出站事件。
人话:Handler 就是流水线上的工人,每个工人负责一道工序。
两种 Handler 类型:
// 入站处理器:处理读取的数据
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("连接建立:" + ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("收到数据:" + msg);
// 传递给下一个 Handler
ctx.fireChannelRead(msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("连接断开:" + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
// 出站处理器:处理写出的数据
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println("写出数据:" + msg);
// 传递给下一个 Handler
ctx.write(msg, promise);
}
}2.5 ChannelPipeline:处理器链
ChannelPipeline 是 ChannelHandler 的容器,按顺序组织多个 Handler。
人话:Pipeline 就是流水线,Handler 是流水线上的工位。
// 构建 Pipeline
ChannelPipeline pipeline = channel.pipeline();
// 添加 Handler
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("handler", new MyBusinessHandler());
// 动态添加/删除
pipeline.addFirst("logger", new LoggingHandler());
pipeline.remove("logger");
pipeline.replace("handler", "newHandler", new NewBusinessHandler());事件传播方向:
| 事件类型 | 传播方向 | Handler 类型 | 常见事件 |
|---|---|---|---|
| 入站事件 | Head → Tail | ChannelInboundHandler | channelRead, channelActive |
| 出站事件 | Tail → Head | ChannelOutboundHandler | write, flush, close |
2.6 ByteBuf:高性能缓冲区
ByteBuf 是 Netty 的数据容器,比 Java NIO 的 ByteBuffer 好用 100 倍。
人话:ByteBuf 就是一个字节数组的包装,但是加了很多便利功能。
为什么不用 ByteBuffer?
| 特性 | ByteBuffer | ByteBuf |
|---|---|---|
| 读写切换 | 需要 flip() | 自动分离读写指针 |
| 扩容 | 不支持 | 自动扩容 |
| 池化 | 不支持 | 支持池化复用 |
| 组合 | 不支持 | CompositeByteBuf |
| 引用计数 | 不支持 | 支持,避免内存泄漏 |
// 创建 ByteBuf
ByteBuf buf = Unpooled.buffer(256); // 堆内存
ByteBuf buf = Unpooled.directBuffer(256); // 直接内存
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(256); // 池化
// 写入数据
buf.writeInt(100);
buf.writeLong(System.currentTimeMillis());
buf.writeBytes("Hello".getBytes());
// 读取数据
int value = buf.readInt();
long timestamp = buf.readLong();
byte[] bytes = new byte[5];
buf.readBytes(bytes);
// 读写指针
int readerIndex = buf.readerIndex(); // 读指针位置
int writerIndex = buf.writerIndex(); // 写指针位置
int readableBytes = buf.readableBytes(); // 可读字节数
// 标记和重置
buf.markReaderIndex();
buf.resetReaderIndex();
// 引用计数(重要!)
buf.retain(); // 增加引用
buf.release(); // 减少引用,为 0 时释放内存ByteBuf 内存结构:
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity三、线程模型:Reactor 模式
Netty 的高性能秘密武器之一就是它的线程模型。理解了这个,你就理解了 Netty 的灵魂。
3.1 传统 BIO 模型
3.2 单 Reactor 单线程
// 单 Reactor 单线程配置
EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group) // 同一个 group 处理连接和 I/O
.channel(NioServerSocketChannel.class)
.childHandler(new MyHandler());3.3 单 Reactor 多线程
3.4 主从 Reactor 多线程(Netty 默认)
这是 Netty 的默认模型,也是最推荐的模型:
// 主从 Reactor 配置(推荐)
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 接收连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理 I/O
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 分开配置
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new MyHandler());
}
});为什么这个模型最优?
- Boss 专注接收:一个线程专门处理 Accept,不会被 I/O 阻塞
- Worker 并行处理:多个线程并行处理 I/O,充分利用多核
- 线程绑定:一个 Channel 绑定一个 EventLoop,无锁化设计
四、编解码器:半包粘包终结者
4.1 什么是半包粘包?
TCP 是流式协议,不保证消息边界。这导致两个问题:
发送端发送:[Hello][World][!]
接收端可能收到:[Hel][loWorld!] ← 这就是半包和粘包4.2 解决方案
Netty 提供了多种开箱即用的解码器:
方案一:固定长度
// 每条消息固定 100 字节
pipeline.addLast(new FixedLengthFrameDecoder(100));方案二:分隔符
// 以 \r\n 作为分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(
1024, // 最大长度
Delimiters.lineDelimiter() // 分隔符
));
// 简化版:按行分割
pipeline.addLast(new LineBasedFrameDecoder(1024));方案三:长度字段(最常用)
// 消息格式:[4字节长度][消息内容]
pipeline.addLast(new LengthFieldBasedFrameDecoder(
65535, // maxFrameLength: 最大帧长度
0, // lengthFieldOffset: 长度字段偏移量
4, // lengthFieldLength: 长度字段本身的长度
0, // lengthAdjustment: 长度调整值
4 // initialBytesToStrip: 跳过的字节数(去掉长度字段)
));
// 编码端
pipeline.addLast(new LengthFieldPrepender(4)); // 自动添加 4 字节长度头LengthFieldBasedFrameDecoder 参数详解:
消息格式示例:
+--------+----------------+
| Length | Content |
| 4 bytes| N bytes |
+--------+----------------+
lengthFieldOffset = 0 // 长度字段从第 0 字节开始
lengthFieldLength = 4 // 长度字段占 4 字节
lengthAdjustment = 0 // 长度值就是内容长度,无需调整
initialBytesToStrip = 4 // 解码后去掉长度字段,只保留内容4.3 自定义编解码器
// 自定义消息对象
@Data
public class MyMessage {
private int type;
private String content;
}
// 编码器:对象 → 字节
public class MyMessageEncoder extends MessageToByteEncoder<MyMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) {
byte[] contentBytes = msg.getContent().getBytes(StandardCharsets.UTF_8);
// 写入消息类型
out.writeInt(msg.getType());
// 写入内容长度
out.writeInt(contentBytes.length);
// 写入内容
out.writeBytes(contentBytes);
}
}
// 解码器:字节 → 对象
public class MyMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 检查可读字节数
if (in.readableBytes() < 8) { // type(4) + length(4)
return; // 数据不够,等待更多数据
}
in.markReaderIndex();
int type = in.readInt();
int length = in.readInt();
if (in.readableBytes() < length) {
in.resetReaderIndex(); // 数据不够,重置读指针
return;
}
byte[] contentBytes = new byte[length];
in.readBytes(contentBytes);
MyMessage message = new MyMessage();
message.setType(type);
message.setContent(new String(contentBytes, StandardCharsets.UTF_8));
out.add(message);
}
}4.4 常用编解码器
| 编解码器 | 用途 |
|---|---|
| StringEncoder/Decoder | 字符串编解码 |
| ObjectEncoder/Decoder | Java 对象序列化(不推荐) |
| ProtobufEncoder/Decoder | Protobuf 编解码 |
| JsonObjectDecoder | JSON 编解码 |
| HttpRequestEncoder/Decoder | HTTP 协议 |
| WebSocketFrameEncoder/Decoder | WebSocket 协议 |
五、实战代码:从零构建 RPC 框架
5.1 项目结构
netty-rpc/
├── rpc-common/ # 公共模块
│ ├── RpcRequest.java
│ ├── RpcResponse.java
│ └── RpcCodec.java
├── rpc-server/ # 服务端
│ ├── RpcServer.java
│ └── RpcServerHandler.java
└── rpc-client/ # 客户端
├── RpcClient.java
└── RpcClientHandler.java5.2 公共模块
// RpcRequest.java - RPC 请求对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest implements Serializable {
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
}
// RpcResponse.java - RPC 响应对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse implements Serializable {
private String requestId;
private Object result;
private Throwable error;
public boolean hasError() {
return error != null;
}
}// RpcEncoder.java - 通用编码器
public class RpcEncoder extends MessageToByteEncoder<Object> {
private final Class<?> genericClass;
private final ObjectMapper objectMapper = new ObjectMapper();
public RpcEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
byte[] data = objectMapper.writeValueAsBytes(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
// RpcDecoder.java - 通用解码器
public class RpcDecoder extends ByteToMessageDecoder {
private final Class<?> genericClass;
private final ObjectMapper objectMapper = new ObjectMapper();
public RpcDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = objectMapper.readValue(data, genericClass);
out.add(obj);
}
}5.3 服务端实现
// RpcServer.java
@Slf4j
public class RpcServer {
private final int port;
private final Map<String, Object> serviceRegistry = new ConcurrentHashMap<>();
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public RpcServer(int port) {
this.port = port;
}
/**
* 注册服务
*/
public void registerService(String serviceName, Object serviceImpl) {
serviceRegistry.put(serviceName, serviceImpl);
log.info("注册服务: {} -> {}", serviceName, serviceImpl.getClass().getName());
}
/**
* 启动服务器
*/
public void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 心跳检测:60秒没有读事件就触发
pipeline.addLast(new IdleStateHandler(60, 0, 0));
// 编解码
pipeline.addLast(new RpcDecoder(RpcRequest.class));
pipeline.addLast(new RpcEncoder(RpcResponse.class));
// 业务处理
pipeline.addLast(new RpcServerHandler(serviceRegistry));
}
});
ChannelFuture future = bootstrap.bind(port).sync();
log.info("RPC Server 启动成功,监听端口: {}", port);
future.channel().closeFuture().sync();
} finally {
shutdown();
}
}
public void shutdown() {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}
// RpcServerHandler.java
@Slf4j
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private final Map<String, Object> serviceRegistry;
public RpcServerHandler(Map<String, Object> serviceRegistry) {
this.serviceRegistry = serviceRegistry;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) {
log.info("收到 RPC 请求: {}.{}", request.getClassName(), request.getMethodName());
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try {
Object result = handleRequest(request);
response.setResult(result);
} catch (Exception e) {
log.error("处理 RPC 请求失败", e);
response.setError(e);
}
ctx.writeAndFlush(response);
}
private Object handleRequest(RpcRequest request) throws Exception {
// 获取服务实例
Object service = serviceRegistry.get(request.getClassName());
if (service == null) {
throw new RuntimeException("服务未找到: " + request.getClassName());
}
// 反射调用方法
Class<?> serviceClass = service.getClass();
Method method = serviceClass.getMethod(
request.getMethodName(),
request.getParameterTypes()
);
return method.invoke(service, request.getParameters());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
log.warn("连接空闲超时,关闭连接: {}", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("处理异常", cause);
ctx.close();
}
}5.4 客户端实现
// RpcClient.java
@Slf4j
public class RpcClient {
private final String host;
private final int port;
private EventLoopGroup group;
private Channel channel;
private final Map<String, CompletableFuture<RpcResponse>> pendingRequests =
new ConcurrentHashMap<>();
public RpcClient(String host, int port) {
this.host = host;
this.port = port;
}
public void connect() throws InterruptedException {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 心跳:30秒发送一次心跳
pipeline.addLast(new IdleStateHandler(0, 30, 0));
// 编解码
pipeline.addLast(new RpcEncoder(RpcRequest.class));
pipeline.addLast(new RpcDecoder(RpcResponse.class));
// 业务处理
pipeline.addLast(new RpcClientHandler(pendingRequests));
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
this.channel = future.channel();
log.info("连接 RPC Server 成功: {}:{}", host, port);
}
/**
* 同步调用
*/
public Object invoke(String className, String methodName,
Class<?>[] parameterTypes, Object[] parameters)
throws Exception {
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(className);
request.setMethodName(methodName);
request.setParameterTypes(parameterTypes);
request.setParameters(parameters);
// 创建 Future 等待响应
CompletableFuture<RpcResponse> future = new CompletableFuture<>();
pendingRequests.put(request.getRequestId(), future);
// 发送请求
channel.writeAndFlush(request);
// 等待响应(超时 10 秒)
RpcResponse response = future.get(10, TimeUnit.SECONDS);
if (response.hasError()) {
throw new RuntimeException(response.getError());
}
return response.getResult();
}
/**
* 创建代理对象
*/
@SuppressWarnings("unchecked")
public <T> T createProxy(Class<T> interfaceClass) {
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class[]{interfaceClass},
(proxy, method, args) -> invoke(
interfaceClass.getName(),
method.getName(),
method.getParameterTypes(),
args
)
);
}
public void close() {
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
}
// RpcClientHandler.java
@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
private final Map<String, CompletableFuture<RpcResponse>> pendingRequests;
public RpcClientHandler(Map<String, CompletableFuture<RpcResponse>> pendingRequests) {
this.pendingRequests = pendingRequests;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) {
String requestId = response.getRequestId();
CompletableFuture<RpcResponse> future = pendingRequests.remove(requestId);
if (future != null) {
future.complete(response);
} else {
log.warn("收到未知响应: {}", requestId);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
// 发送心跳
RpcRequest heartbeat = new RpcRequest();
heartbeat.setRequestId("HEARTBEAT");
ctx.writeAndFlush(heartbeat);
log.debug("发送心跳");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("客户端异常", cause);
ctx.close();
}
}5.5 使用示例
// 定义服务接口
public interface HelloService {
String sayHello(String name);
int add(int a, int b);
}
// 服务实现
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String name) {
return "Hello, " + name + "!";
}
@Override
public int add(int a, int b) {
return a + b;
}
}
// 启动服务端
public class ServerMain {
public static void main(String[] args) throws Exception {
RpcServer server = new RpcServer(8080);
server.registerService(HelloService.class.getName(), new HelloServiceImpl());
server.start();
}
}
// 启动客户端
public class ClientMain {
public static void main(String[] args) throws Exception {
RpcClient client = new RpcClient("localhost", 8080);
client.connect();
// 创建代理
HelloService helloService = client.createProxy(HelloService.class);
// 调用远程方法(就像调用本地方法一样)
String result = helloService.sayHello("Netty");
System.out.println(result); // Hello, Netty!
int sum = helloService.add(10, 20);
System.out.println(sum); // 30
client.close();
}
}六、性能优化:榨干每一滴性能
6.1 内存优化
使用池化 ByteBuf
// 开启池化(Netty 4.1+ 默认开启)
System.setProperty("io.netty.allocator.type", "pooled");
// 或在代码中使用
ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf buf = allocator.buffer(1024);使用直接内存
// 直接内存减少一次数据拷贝
ByteBuf directBuf = Unpooled.directBuffer(1024);
// 配置 Channel 默认使用直接内存
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);及时释放 ByteBuf
// 手动释放
ByteBuf buf = ...;
try {
// 使用 buf
} finally {
buf.release(); // 必须释放!
}
// 使用 ReferenceCountUtil
ReferenceCountUtil.release(msg);
// 使用 SimpleChannelInboundHandler(自动释放)
public class MyHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// msg 会被自动释放
}
}6.2 线程优化
合理配置线程数
// Boss 线程:通常 1 个就够
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// Worker 线程:CPU 核心数 * 2
int workerThreads = Runtime.getRuntime().availableProcessors() * 2;
EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads);
// 业务线程池:处理耗时操作
ExecutorService businessExecutor = new ThreadPoolExecutor(
16, 32, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);避免阻塞 EventLoop
// 错误示范:在 EventLoop 中执行耗时操作
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这会阻塞 EventLoop,影响其他 Channel!
Thread.sleep(1000);
database.query(...); // 同步数据库操作
}
// 正确做法:提交到业务线程池
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
businessExecutor.submit(() -> {
Object result = database.query(...);
ctx.writeAndFlush(result); // 写回 EventLoop 线程
});
}
// 或者使用 Netty 的 EventExecutorGroup
EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(16);
pipeline.addLast(businessGroup, "businessHandler", new BusinessHandler());6.3 网络参数优化
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 服务端 Socket 参数
.option(ChannelOption.SO_BACKLOG, 1024) // 连接队列大小
.option(ChannelOption.SO_REUSEADDR, true) // 允许端口重用
// 客户端 Socket 参数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 开启 TCP 保活
.childOption(ChannelOption.TCP_NODELAY, true) // 关闭 Nagle 算法
.childOption(ChannelOption.SO_SNDBUF, 65535) // 发送缓冲区
.childOption(ChannelOption.SO_RCVBUF, 65535) // 接收缓冲区
// Netty 参数
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(32 * 1024, 64 * 1024)) // 写缓冲区水位
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);6.4 Linux 平台优化
// 使用 Epoll(仅限 Linux)
if (Epoll.isAvailable()) {
bossGroup = new EpollEventLoopGroup(1);
workerGroup = new EpollEventLoopGroup();
bootstrap.channel(EpollServerSocketChannel.class);
// Epoll 特有参数
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true); // 端口复用
} else {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
bootstrap.channel(NioServerSocketChannel.class);
}Linux 系统参数调优:
# /etc/sysctl.conf
# 增加文件描述符限制
fs.file-max = 1000000
# TCP 连接队列
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
# TCP 缓冲区
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
# TIME_WAIT 优化
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 30
# 应用配置
sysctl -p七、常见问题与最佳实践
7.1 内存泄漏排查
开启泄漏检测:
// 启动参数
-Dio.netty.leakDetection.level=PARANOID
// 或代码中设置
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);常见泄漏场景:
// 场景1:忘记释放 ByteBuf
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
// 处理数据...
// 忘记 buf.release()!
}
// 场景2:异常分支未释放
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
try {
if (someCondition) {
return; // 提前返回,未释放!
}
// 正常处理
} finally {
buf.release(); // 应该在 finally 中释放
}
}
// 场景3:fire 后又 release
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg); // 传递给下游
((ByteBuf) msg).release(); // 重复释放!下游会出问题
}7.2 连接管理
// 连接池管理
public class ChannelPool {
private final Queue<Channel> pool = new ConcurrentLinkedQueue<>();
private final Bootstrap bootstrap;
private final int maxSize;
public Channel acquire() throws InterruptedException {
Channel channel = pool.poll();
if (channel != null && channel.isActive()) {
return channel;
}
return bootstrap.connect().sync().channel();
}
public void release(Channel channel) {
if (channel.isActive() && pool.size() < maxSize) {
pool.offer(channel);
} else {
channel.close();
}
}
}
// 或使用 Netty 内置的连接池
ChannelPool pool = new FixedChannelPool(
bootstrap,
new AbstractChannelPoolHandler() {
@Override
public void channelCreated(Channel ch) {
ch.pipeline().addLast(new MyHandler());
}
},
10 // 最大连接数
);7.3 优雅关闭
public class GracefulShutdown {
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final Channel serverChannel;
public void shutdown() {
// 1. 停止接收新连接
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
// 2. 等待现有请求处理完成
try {
Thread.sleep(5000); // 等待 5 秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 3. 关闭线程池
Future<?> bossShutdown = bossGroup.shutdownGracefully(1, 10, TimeUnit.SECONDS);
Future<?> workerShutdown = workerGroup.shutdownGracefully(1, 10, TimeUnit.SECONDS);
try {
bossShutdown.sync();
workerShutdown.sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 注册 JVM 关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
server.shutdown();
}));7.4 最佳实践清单
✅ DO:
- 使用池化的 ByteBufAllocator
- 及时释放 ByteBuf(或使用 SimpleChannelInboundHandler)
- 耗时操作提交到业务线程池
- 使用 IdleStateHandler 做心跳检测
- Linux 环境使用 Epoll
- 实现优雅关闭
❌ DON'T:
- 在 EventLoop 线程中执行阻塞操作
- 忘记处理异常(exceptionCaught)
- 在 Handler 中存储状态(除非标注 @Sharable)
- 创建大量短连接
- 忽略内存泄漏检测日志八、Netty 在主流框架中的应用
8.1 Dubbo 中的 Netty
// Dubbo 默认使用 Netty 4
<dubbo:protocol name="dubbo" port="20880" server="netty4" />
// 源码中的 NettyServer
public class NettyServer extends AbstractServer {
private ServerBootstrap bootstrap;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@Override
protected void doOpen() {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup(
getUrl().getPositiveParameter("iothreads", Constants.DEFAULT_IO_THREADS));
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// Dubbo 协议编解码
ch.pipeline()
.addLast("decoder", new DubboCodec())
.addLast("handler", new NettyServerHandler());
}
});
}
}8.2 Spring WebFlux 中的 Netty
// Spring WebFlux 默认使用 Netty
@SpringBootApplication
public class ReactiveApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveApplication.class, args);
}
}
// 自定义 Netty 配置
@Bean
public NettyReactiveWebServerFactory nettyFactory() {
NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
factory.addServerCustomizers(server ->
server.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.TCP_NODELAY, true)
);
return factory;
}九、总结:Netty 知识图谱
结语
写到这里,我想说的是:Netty 确实有一定的学习曲线,但它的设计哲学值得每个后端工程师学习。
它教会我们:
- 如何设计高性能的线程模型
- 如何优雅地处理异步编程
- 如何用组合模式构建灵活的处理链
- 如何在性能和易用性之间取得平衡
当你真正理解了 Netty,再去看 Dubbo、gRPC、RocketMQ 的源码,你会发现:原来它们都是 Netty 的变体。
最后送你一句话:学习 Netty,不是为了用它写业务代码,而是为了理解高性能网络编程的本质。
Happy Coding!