- Netty是什么?
- Netty核心组件
- 组件协作流程
- 实战代码Demo
Netty是什么?
(抄的)Netty是一个高性能、异步的网络应用程序框架,可以轻松地开发基于TCP、UDP和HTTP等协议的网络应用程序。它是基于Java NIO技术实现的,具有较高的性能和可扩展性。Netty不仅可以用于开发网络客户端和服务器端,还可以用于开发其他类型的网络应用程序,如网络代理、网关和中间件等。
Netty的主要作用是为开发人员提供一个高效、可靠和可扩展的网络通信框架,从而降低网络应用程序的开发难度和维护成本。它提供了一系列的编解码器、处理器和协议支持,使得开发人员可以更加专注于业务逻辑的实现,而不必关心底层网络通信细节。
Netty核心组件
Channel(通道)
- 作用:抽象了网络连接(如 Socket),提供统一的 API 操作(读、写、绑定、连接等),支持多种传输类型(NIO、Epoll、OIO 等)。
- 关键实现:
NioSocketChannel(TCP 客户端)、NioServerSocketChannel(TCP 服务端)。
EventLoop & EventLoopGroup(事件循环和线程组)
- EventLoop:
- 每个
EventLoop绑定一个线程,负责处理多个Channel的 I/O 事件和异步任务。 - 遵循 单线程模型,确保线程安全。
- 每个
- EventLoopGroup:
- 管理一组
EventLoop,通常分为bossGroup(接收连接)和workerGroup(处理 I/O)。 - 示例:
NioEventLoopGroup。
- 管理一组
ChannelHandler & ChannelPipeline(处理器和处理链)
- ChannelHandler:
- 处理入站/出站事件和数据,用户自定义逻辑的核心扩展点。
- 分类:
ChannelInboundHandler(处理连接建立、数据读取等入站事件)。ChannelOutboundHandler(处理连接关闭、数据写入等出站操作)。ChannelDuplexHandler(继承以上两个类,出、入站都会处理)。ChannelInitializer(继承ChannelInboundHandler,在连接建立时初始化)。
- ChannelPipeline:
- 由多个
ChannelHandler组成的责任链,数据按顺序流经各个处理器。 - 支持动态增删处理器。
- 由多个
ByteBuf(数据容器)
- 作用:替代 Java NIO 的
ByteBuffer,提供更高效灵活的数据存储(如池化内存、零拷贝优化)。 - 特性:支持读写索引分离、引用计数、复合缓冲区等。
Bootstrap & ServerBootstrap(启动引导类)
- Bootstrap:
- 客户端启动类,配置线程组、Channel 类型、处理器等。
- ServerBootstrap:
- 服务端启动类,额外绑定端口并管理
bossGroup和workerGroup。
- 服务端启动类,额外绑定端口并管理
ChannelFuture(异步结果)
- 作用:表示异步 I/O 操作的结果(如连接、写入),通过添加监听器 (
ChannelFutureListener) 实现回调逻辑。 - 示例:
channel.writeAndFlush(data).addListener(...)。
编解码器(Codec)
- 作用:将原始字节流与业务对象相互转换(如 HTTP 协议解析、Protobuf 序列化)。
- 常见实现:
MessageToByteEncoder(编码器)、ByteToMessageDecoder(解码器)。- 内置编解码器:
StringEncoder/Decoder、ObjectEncoder/Decoder等。
ChannelHandlerContext(处理器上下文)
- 作用:关联
ChannelHandler与ChannelPipeline,提供方法触发事件传播(如ctx.write())或获取 Channel 信息。
组件协作流程
- 通过
Bootstrap、ServerBootstrap配置并启动服务端、客户端。 ChannelInitializer添加连接建立时初始化内容。- 通过添加
ChannelInboundHandler、ChannelOutboundHandler配置数据出、入站处理。 - 操作
ByteBuf数据类或ChannelFuture异步回调控制业务逻辑。
实战代码Demo
Spring boot 添加Netty包
<!-- 通过注解,添加语法糖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- json解析 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.54</version>
</dependency>
<!-- NIO框架,主要用于IM实时通讯 -->
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.118.Final</version>
</dependency>
<!-- 如果使用 webflux 默认集成Netty,就不需要上面的 netty-all 包 -->
<!-- 使用 webflux 响应式流(Reactive Streams)默认通过 Netty 启动排除 Tomcat -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
自定义配置项,方便bootstrap.yml配置
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyProperties {
/**
* boss线程数量
*/
private Integer bossThread = 1;
/**
* worker线程数量
*/
private Integer workerThread = 1;
/**
* 连接超时时间
*/
private Integer timeout = 30000;
/**
* 服务器主端口
*/
private Integer port = 9000;
/**
* 服务器地址 默认为本地
*/
// private String host = "127.0.0.1";
}
编写消息类,用于传输数据
目前暂定只有默认String数据类,ChatMessagsIM聊天数据类(可以自行添加model进行编写序列化代码)
import com.alibaba.fastjson2.JSONObject;
import com.jagger.model.ChatMessage;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
public class Message {
// 消息类型,占用 2 字节
private short type;
// 消息长度,占用 4 字节
private int length;
// 消息内容
private byte[] body;
/***
* {@link Object} 转 {@link Message} 类型
* @return {@link Message}
*/
public static Message objectToMessage(Object body) {
Message message = new Message();
message.setType(Message.MessageType.getType(body).getValue());
if (message.getType() == Message.MessageType.STRING.getValue()) {
message.setBody(((String) body).getBytes());
} else {
String bodyJsonStr = JSONObject.toJSONString(body);
message.setBody(bodyJsonStr.getBytes());
message.setLength(message.getBody().length);
}
return message;
}
/***
* {@link Message} 转 {@link Object} 类型
* @return {@link Object}
*/
public static Object messageToObject(Message message) {
String bodyStr = new String(message.getBody());
if (message.getType() == Message.MessageType.STRING.getValue()) {
return bodyStr;
} else if (message.getType() == MessageType.CHAT_MESSAGE.getValue()) {
return JSONObject.parseObject(bodyStr, ChatMessage.class);
} else {
throw new RuntimeException("Message 转 Object 错误,未定义对象类型。");
}
}
/***
* {@link Message} 转 {@link Object} 类型
* @return {@link Object}
*/
public Object toObject() {
return messageToObject(this);
}
/***
* 消息类型
*/
@Getter
public enum MessageType {
STRING((short) 0),
CHAT_MESSAGE((short) 1); // 聊天消息
private final short value;
private MessageType(short value) {
this.value = value;
}
public static MessageType getType(Object o) {
if (o instanceof String) {
return STRING;
} else if (o instanceof ChatMessage) {
return CHAT_MESSAGE;
} else {
throw new RuntimeException("不存在消息类型: " + o);
}
}
}
}
定义数据出、入站处理类
- 入站类数据处理,继承实现
ChannelInboundHandlerAdapterMessageDecodeHandlerMessage数据解码,继承实现ByteToMessageDecoderBusinessHandler业务数据处理
- 出站类数据处理,继承实现
ChannelOutboundHandlerAdapterObjectToMessageHandler对象封装,Object转MessageMessageEncoderHandler数据编码PushHandler数据推送前最后操作
- 出、入站数据处理统一处理,继承实现
ChannelDuplexHandlerExceptionHandler异常数据处理
MessageDecodeHandlerMessage数据解码
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/***
* 消息解码器
*/
@Slf4j
public class MessageDecodeHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
log.debug("开始解码");
Message message = new Message();
message.setType(byteBuf.readShort());
//如果byteBuf剩下的长度还有大于4个字节,说明body不为空
if (byteBuf.readableBytes() > 4) {
message.setLength(byteBuf.readInt());
byte[] contents = new byte[message.getLength()];
byteBuf.readBytes(contents, 0, message.getLength());
message.setBody(contents);
list.add(message);
log.debug("成功解码,接收消息类型: {}", message.getType());
} else {
log.warn("解码失败: 消息内容为空");
}
}
}
BusinessHandler业务数据处理
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/***
* 业务处理
*/
@Slf4j
public class BusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("开始业务处理");
log.debug("业务数据: {}", ((Message) msg).toObject());
}
}
ObjectToMessageHandler对象封装
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ObjectToMessageHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("分析数据类型");
super.write(ctx, Message.objectToMessage(msg), promise);
}
}
MessageEncoderHandlerMessage数据编码
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
/***
* 消息编码器
*/
@Slf4j
public class MessageEncoderHandler extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
log.debug("开始编码");
byteBuf.writeShort(message.getType());
if (message.getBody() != null) {
byteBuf.writeInt(message.getBody().length);
byteBuf.writeBytes(message.getBody());
}
log.debug("成功编码");
}
}
PushHandler数据推送前最后操作
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import lombok.extern.slf4j.Slf4j;
/***
* 推送消息时处理
*/
@Slf4j
public class PushHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("分析数据类型");
super.write(ctx, msg, promise);
}
}
ExceptionHandler异常数据处理
/***
* ChannelDuplexHandler 它同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口
* ChannelInboundHandler 所有异常会再最后统一处理
* ChannelOutboundHandler 所有异常需要通过 addListener 监听事件才能处理,需要放在第一位先添加监听
* ExceptionHandler 作为全局异常处理,放在最后正好 in 最后处理,out 最先处理
*
*/
@Slf4j
public class ExceptionHandler extends ChannelDuplexHandler {
/***
* 接收数据异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("接收消息处理异常: ", cause);
}
/***
* 出站数据异常处理
* @param ctx
* @param msg
* @param promise
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.write(msg, promise.addListener(future -> {
if (!future.isSuccess()) {
// 处理异常
log.error("推送消息异常", future.cause());
}
}));
}
}
编写服务端类NettyServer,服务端连接初始化类NettyServerInit
初始化添加Handler实现要注意顺序,ChannelInboundHandler先添加先调用,ChannelOutboundHandler后添加先调用。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class NettyServer {
private final NettyProperties nettyProperties;
private EventLoopGroup bossGroup; // 负责处理连接请求的线程组(NIO 事件循环组)
private EventLoopGroup workerGroup; // 负责处理 I/O 事件的线程组(NIO 事件循环组)
private ChannelFuture channelFuture; // 代表异步操作的结果
public void start() throws InterruptedException {
log.info("NettyServer 启动");
bossGroup = new NioEventLoopGroup(nettyProperties.getBossThread()); // 创建 boss 线程组,用于处理连接请求
workerGroup = new NioEventLoopGroup(nettyProperties.getWorkerThread()); // 创建 worker 线程组,用于处理 I/O 事件
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
// 设置线程组
.group(bossGroup, workerGroup)
// 设置为NIO模式
.channel(NioServerSocketChannel.class)
// 设置TCP sync队列大小, 防止洪泛攻击
.childOption(ChannelOption.SO_BACKLOG, 1024)
// 设置初始化类
.childHandler(new NettyServerInit());
channelFuture = serverBootstrap.bind(nettyProperties.getPort()).addListener(future -> {
if (future.isSuccess()) {
log.info("Netty 服务启动,端口: {}", nettyProperties.getPort());
} else {
log.error("启动失败,请检查端口是否被占用: {}", nettyProperties.getPort(), future.cause());
}
}).sync();
}
/***
* 停止服务
*/
public void stop() {
channelFuture.channel().close();
log.info("Netty 服务端停止");
}
/**
* 通过 Channel 发送数据
*
* @param body 文本数据
*/
public void send(Object body) {
log.debug("服务端发送数据");
channelFuture.channel().writeAndFlush(body);
}
/***
* 初始化类
*/
private static class NettyServerInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
log.info("新 Netty 连接: {}", socketChannel.remoteAddress());
// ChannelInboundHandlerAdapter 输入数据Handler实现调用顺序,先 addLast() 先处理
socketChannel.pipeline()
.addLast(new MessageDecodeHandler()) // 数据解码
.addLast(new BusinessHandler()) // 业务处理
;
// ChannelOutboundHandlerAdapter 输出数据Handler实现调用顺序, 后 addLast() 先处理
socketChannel.pipeline()
.addLast(new PushHandler()) // 数据推送前操作
.addLast(new MessageEncoderHandler()) // 数据编码
.addLast(new ObjectToMessageHandler()) // 数据编码
;
// ChannelDuplexHandler 它同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口
// ChannelInboundHandler 所有异常会再最后统一处理
// ChannelOutboundHandler 所有异常需要通过 addListener 监听事件才能处理,需要放在第一位先添加监听
// ExceptionHandler 作为全局异常处理,放在最后正好 in 最后处理,out 最先处理
socketChannel.pipeline().addLast(new ExceptionHandler()); // 消息接收异常
}
}
}
编写客户端类NettyClient,服务端连接初始化类NettyClientInit
注意点同服务端。
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class NettyClient {
private final NettyProperties nettyProperties;
private ChannelFuture channelFuture;
Bootstrap bootstrap;
public void start() throws InterruptedException {
log.info("NettyClient 启动");
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(nettyProperties.getWorkerThread());
bootstrap = new Bootstrap();
bootstrap.group(eventExecutors) // 指定线程组
.option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioSocketChannel.class) // 指定通道
.handler(new NettyClientInit()); // 指定处理器
channelFuture = bootstrap.connect("127.0.0.1", nettyProperties.getPort()).addListener(future -> {
log.info("客户端启动成功,并监听端口:{} ", nettyProperties.getPort());
});
}
/**
* 客户端通过 Channel 对象向服务器端发送数据
*
* @param body 文本数据
*/
public void send(Object body) {
log.debug("客户端发送数据");
channelFuture.channel().writeAndFlush(body);
}
/***
* 停止服务
*/
public void stop() {
channelFuture.channel().close();
log.info("Netty 客户端服务停止");
}
/***
* 客户端初始化
*/
public static class NettyClientInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
log.debug("初始化 Netty 客户端");
socketChannel.pipeline()
.addLast(new MessageDecodeHandler()) // 数据解码
.addLast(new ClientHandler()) // 客户端handler
;
socketChannel.pipeline()
.addLast(new PushHandler()) // 数据推送前操作
.addLast(new MessageEncoderHandler()) // 数据编码
.addLast(new ObjectToMessageHandler()) // 对象转Message
;
socketChannel.pipeline().addLast(new ExceptionHandler()); // 消息接收异常
}
}
}
编写ImRunner实现ApplicationRunner
在Spring boot启动时统一启动服务端、客户端,这里客户端、服务端都统一放到一个项目里好方便测试
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class ImRunner implements ApplicationRunner {
private final NettyServer nettyServer;
private final NettyClient nettyClient;
@Override
public void run(ApplicationArguments args) throws Exception {
log.debug("启动IM服务");
nettyServer.start();
log.debug("启动 netty 客户端");
nettyClient.start();
}
@PreDestroy
public void destroy() {
log.debug("退出IM服务");
nettyServer.stop();
nettyClient.stop();
}
}
添加Controller测试
这里通过调用NettyClient模拟客户端发送数据,测试数据ChatMessage为自定义model
@Slf4j
@RestController
@RequiredArgsConstructor
public class ChatController implements ChatApi {
private final NettyClient nettyClient;
@Override
public List<ChatMessage> getMessages(int chatRoomId, int serialNumber, int messagesNumber) {
nettyClient.send("测试数据");
nettyClient.send(new ChatMessage());
return null;
}
}
可以看到后台日志正常输出。
