老蒋的知识库

  • 首页
  • 文章归档
  • 关于页面

  • 搜索

Netty自定义通信协议,实现IM(即时通讯)

发表于 2025-02-20 | 分类于 Java | 0 | 阅读次数 26
  1. Netty是什么?
  2. Netty核心组件
  3. 组件协作流程
  4. 实战代码Demo

Netty是什么?

(抄的)Netty是一个高性能、异步的网络应用程序框架,可以轻松地开发基于TCP、UDP和HTTP等协议的网络应用程序。它是基于Java NIO技术实现的,具有较高的性能和可扩展性。Netty不仅可以用于开发网络客户端和服务器端,还可以用于开发其他类型的网络应用程序,如网络代理、网关和中间件等。
Netty的主要作用是为开发人员提供一个高效、可靠和可扩展的网络通信框架,从而降低网络应用程序的开发难度和维护成本。它提供了一系列的编解码器、处理器和协议支持,使得开发人员可以更加专注于业务逻辑的实现,而不必关心底层网络通信细节。

Netty核心组件

Channel(通道)

  • 作用:抽象了网络连接(如 Socket),提供统一的 API 操作(读、写、绑定、连接等),支持多种传输类型(NIO、Epoll、OIO 等)。
  • 关键实现:NioSocketChannel(TCP 客户端)、NioServerSocketChannel(TCP 服务端)。

EventLoop & EventLoopGroup(事件循环和线程组)

  • EventLoop:
    • 每个 EventLoop 绑定一个线程,负责处理多个 Channel 的 I/O 事件和异步任务。
    • 遵循 单线程模型,确保线程安全。
  • EventLoopGroup:
    • 管理一组 EventLoop,通常分为 bossGroup(接收连接)和 workerGroup(处理 I/O)。
    • 示例:NioEventLoopGroup。

ChannelHandler & ChannelPipeline(处理器和处理链)

  • ChannelHandler:
    • 处理入站/出站事件和数据,用户自定义逻辑的核心扩展点。
    • 分类:
      • ChannelInboundHandler(处理连接建立、数据读取等入站事件)。
      • ChannelOutboundHandler(处理连接关闭、数据写入等出站操作)。
      • ChannelDuplexHandler(继承以上两个类,出、入站都会处理)。
      • ChannelInitializer (继承ChannelInboundHandler,在连接建立时初始化)。
  • ChannelPipeline:
    • 由多个 ChannelHandler 组成的责任链,数据按顺序流经各个处理器。
    • 支持动态增删处理器。

ByteBuf(数据容器)

  • 作用:替代 Java NIO 的 ByteBuffer,提供更高效灵活的数据存储(如池化内存、零拷贝优化)。
  • 特性:支持读写索引分离、引用计数、复合缓冲区等。

Bootstrap & ServerBootstrap(启动引导类)

  • Bootstrap:
    • 客户端启动类,配置线程组、Channel 类型、处理器等。
  • ServerBootstrap:
    • 服务端启动类,额外绑定端口并管理 bossGroup 和 workerGroup。

ChannelFuture(异步结果)

  • 作用:表示异步 I/O 操作的结果(如连接、写入),通过添加监听器 (ChannelFutureListener) 实现回调逻辑。
  • 示例:channel.writeAndFlush(data).addListener(...)。

编解码器(Codec)

  • 作用:将原始字节流与业务对象相互转换(如 HTTP 协议解析、Protobuf 序列化)。
  • 常见实现:
    • MessageToByteEncoder(编码器)、ByteToMessageDecoder(解码器)。
    • 内置编解码器:StringEncoder/Decoder、ObjectEncoder/Decoder 等。

ChannelHandlerContext(处理器上下文)

  • 作用:关联 ChannelHandler 与 ChannelPipeline,提供方法触发事件传播(如 ctx.write())或获取 Channel 信息。

组件协作流程

  1. 通过 Bootstrap、ServerBootstrap 配置并启动服务端、客户端。
  2. ChannelInitializer 添加连接建立时初始化内容。
  3. 通过添加ChannelInboundHandler、ChannelOutboundHandler 配置数据出、入站处理。
  4. 操作 ByteBuf 数据类或 ChannelFuture 异步回调控制业务逻辑。

实战代码Demo

Spring boot 添加Netty包


        <!-- 通过注解,添加语法糖	-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- json解析	-->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.54</version>
        </dependency>

            <!-- NIO框架,主要用于IM实时通讯  -->
            <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.118.Final</version>
            </dependency>
            
        <!-- 如果使用 webflux 默认集成Netty,就不需要上面的 netty-all 包 -->
        <!-- 使用 webflux 响应式流(Reactive Streams)默认通过 Netty 启动排除 Tomcat -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

自定义配置项,方便bootstrap.yml配置


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyProperties {

    /**
     * boss线程数量
     */
    private Integer bossThread = 1;

    /**
     * worker线程数量
     */
    private Integer workerThread = 1;

    /**
     * 连接超时时间
     */
    private Integer timeout = 30000;

    /**
     * 服务器主端口
     */
    private Integer port = 9000;


    /**
     * 服务器地址 默认为本地
     */
//    private String host = "127.0.0.1";
}

编写消息类,用于传输数据

目前暂定只有默认String数据类,ChatMessagsIM聊天数据类(可以自行添加model进行编写序列化代码)


import com.alibaba.fastjson2.JSONObject;
import com.jagger.model.ChatMessage;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Data
public class Message {
    // 消息类型,占用 2 字节
    private short type;

    // 消息长度,占用 4 字节
    private int length;

    // 消息内容
    private byte[] body;

    /***
     * {@link Object} 转 {@link Message} 类型
     * @return {@link Message}
     */
    public static Message objectToMessage(Object body) {
        Message message = new Message();
        message.setType(Message.MessageType.getType(body).getValue());

        if (message.getType() == Message.MessageType.STRING.getValue()) {
            message.setBody(((String) body).getBytes());
        } else {
            String bodyJsonStr = JSONObject.toJSONString(body);
            message.setBody(bodyJsonStr.getBytes());
            message.setLength(message.getBody().length);
        }
        return message;
    }

    /***
     * {@link Message} 转 {@link Object} 类型
     * @return {@link Object}
     */
    public static Object messageToObject(Message message) {
        String bodyStr = new String(message.getBody());
        if (message.getType() == Message.MessageType.STRING.getValue()) {
            return bodyStr;
        } else if (message.getType() == MessageType.CHAT_MESSAGE.getValue()) {
            return JSONObject.parseObject(bodyStr, ChatMessage.class);
        } else {
            throw new RuntimeException("Message 转 Object 错误,未定义对象类型。");
        }
    }

    /***
     * {@link Message} 转 {@link Object} 类型
     * @return {@link Object}
     */
    public Object toObject() {
        return messageToObject(this);
    }

    /***
     * 消息类型
     */
    @Getter
    public enum MessageType {
        STRING((short) 0),
        CHAT_MESSAGE((short) 1); // 聊天消息

        private final short value;

        private MessageType(short value) {
            this.value = value;
        }

        public static MessageType getType(Object o) {
            if (o instanceof String) {
                return STRING;
            } else if (o instanceof ChatMessage) {
                return CHAT_MESSAGE;
            } else {
                throw new RuntimeException("不存在消息类型: " + o);
            }
        }
    }
}

定义数据出、入站处理类

  • 入站类数据处理,继承实现ChannelInboundHandlerAdapter
    • MessageDecodeHandlerMessage数据解码,继承实现ByteToMessageDecoder
    • BusinessHandler业务数据处理
  • 出站类数据处理,继承实现ChannelOutboundHandlerAdapter
    • ObjectToMessageHandler对象封装,Object转Message
    • MessageEncoderHandler数据编码
    • PushHandler数据推送前最后操作
  • 出、入站数据处理统一处理,继承实现ChannelDuplexHandler
    • ExceptionHandler异常数据处理
MessageDecodeHandlerMessage数据解码

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;

import java.util.List;


/***
 * 消息解码器
 */
@Slf4j
public class MessageDecodeHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        log.debug("开始解码");
        Message message = new Message();
        message.setType(byteBuf.readShort());
        //如果byteBuf剩下的长度还有大于4个字节,说明body不为空
        if (byteBuf.readableBytes() > 4) {
            message.setLength(byteBuf.readInt());
            byte[] contents = new byte[message.getLength()];
            byteBuf.readBytes(contents, 0, message.getLength());
            message.setBody(contents);
            list.add(message);
            log.debug("成功解码,接收消息类型: {}", message.getType());
        } else {
            log.warn("解码失败: 消息内容为空");
        }
    }
}

BusinessHandler业务数据处理


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/***
 * 业务处理
 */
@Slf4j
public class BusinessHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("开始业务处理");
        log.debug("业务数据: {}", ((Message) msg).toObject());
    }
}

ObjectToMessageHandler对象封装


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ObjectToMessageHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("分析数据类型");
        super.write(ctx, Message.objectToMessage(msg), promise);
    }
}

MessageEncoderHandlerMessage数据编码


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;

/***
 * 消息编码器
 */
@Slf4j
public class MessageEncoderHandler extends MessageToByteEncoder<Message> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
        log.debug("开始编码");
        byteBuf.writeShort(message.getType());
        if (message.getBody() != null) {
            byteBuf.writeInt(message.getBody().length);
            byteBuf.writeBytes(message.getBody());
        }
        log.debug("成功编码");
    }
}

PushHandler数据推送前最后操作

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import lombok.extern.slf4j.Slf4j;

/***
 * 推送消息时处理
 */
@Slf4j
public class PushHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("分析数据类型");
        super.write(ctx, msg, promise);
    }
}

ExceptionHandler异常数据处理

/***
 * ChannelDuplexHandler 它同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口
 * ChannelInboundHandler 所有异常会再最后统一处理
 * ChannelOutboundHandler 所有异常需要通过 addListener 监听事件才能处理,需要放在第一位先添加监听
 * ExceptionHandler 作为全局异常处理,放在最后正好 in 最后处理,out 最先处理
 *
 */
@Slf4j
public class ExceptionHandler extends ChannelDuplexHandler {
    /***
     * 接收数据异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("接收消息处理异常: ", cause);
    }

    /***
     * 出站数据异常处理
     * @param ctx
     * @param msg
     * @param promise
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ctx.write(msg, promise.addListener(future -> {
            if (!future.isSuccess()) {
                // 处理异常
                log.error("推送消息异常", future.cause());
            }
        }));
    }
}

编写服务端类NettyServer,服务端连接初始化类NettyServerInit

初始化添加Handler实现要注意顺序,ChannelInboundHandler先添加先调用,ChannelOutboundHandler后添加先调用。


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class NettyServer {

    private final NettyProperties nettyProperties;

    private EventLoopGroup bossGroup; // 负责处理连接请求的线程组(NIO 事件循环组)
    private EventLoopGroup workerGroup; // 负责处理 I/O 事件的线程组(NIO 事件循环组)
    private ChannelFuture channelFuture; // 代表异步操作的结果


    public void start() throws InterruptedException {
        log.info("NettyServer 启动");
        bossGroup = new NioEventLoopGroup(nettyProperties.getBossThread()); // 创建 boss 线程组,用于处理连接请求
        workerGroup = new NioEventLoopGroup(nettyProperties.getWorkerThread()); // 创建 worker 线程组,用于处理 I/O 事件
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                // 设置线程组
                .group(bossGroup, workerGroup)
                // 设置为NIO模式
                .channel(NioServerSocketChannel.class)
                // 设置TCP sync队列大小, 防止洪泛攻击
                .childOption(ChannelOption.SO_BACKLOG, 1024)
                // 设置初始化类
                .childHandler(new NettyServerInit());
        channelFuture = serverBootstrap.bind(nettyProperties.getPort()).addListener(future -> {
            if (future.isSuccess()) {
                log.info("Netty 服务启动,端口: {}", nettyProperties.getPort());
            } else {
                log.error("启动失败,请检查端口是否被占用: {}", nettyProperties.getPort(), future.cause());
            }
        }).sync();
    }


    /***
     * 停止服务
     */
    public void stop() {
        channelFuture.channel().close();
        log.info("Netty 服务端停止");
    }

    /**
     * 通过 Channel 发送数据
     *
     * @param body 文本数据
     */
    public void send(Object body) {
        log.debug("服务端发送数据");
        channelFuture.channel().writeAndFlush(body);
    }

    /***
     * 初始化类
     */
    private static class NettyServerInit extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            log.info("新 Netty 连接: {}", socketChannel.remoteAddress());

            // ChannelInboundHandlerAdapter 输入数据Handler实现调用顺序,先 addLast() 先处理
            socketChannel.pipeline()
                    .addLast(new MessageDecodeHandler())  // 数据解码
                    .addLast(new BusinessHandler()) // 业务处理
            ;

            // ChannelOutboundHandlerAdapter 输出数据Handler实现调用顺序, 后 addLast() 先处理
            socketChannel.pipeline()
                    .addLast(new PushHandler())  // 数据推送前操作
                    .addLast(new MessageEncoderHandler())  // 数据编码
                    .addLast(new ObjectToMessageHandler())  // 数据编码
            ;

            // ChannelDuplexHandler 它同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口
            // ChannelInboundHandler 所有异常会再最后统一处理
            // ChannelOutboundHandler 所有异常需要通过 addListener 监听事件才能处理,需要放在第一位先添加监听
            // ExceptionHandler 作为全局异常处理,放在最后正好 in 最后处理,out 最先处理
            socketChannel.pipeline().addLast(new ExceptionHandler()); // 消息接收异常
        }
    }
}

编写客户端类NettyClient,服务端连接初始化类NettyClientInit

注意点同服务端。


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class NettyClient {

    private final NettyProperties nettyProperties;

    private ChannelFuture channelFuture;

    Bootstrap bootstrap;

    public void start() throws InterruptedException {
        log.info("NettyClient 启动");
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup(nettyProperties.getWorkerThread());
        bootstrap = new Bootstrap();
        bootstrap.group(eventExecutors)   // 指定线程组
                .option(ChannelOption.SO_KEEPALIVE, true)
                .channel(NioSocketChannel.class) // 指定通道
                .handler(new NettyClientInit()); // 指定处理器
        channelFuture = bootstrap.connect("127.0.0.1", nettyProperties.getPort()).addListener(future -> {
            log.info("客户端启动成功,并监听端口:{} ", nettyProperties.getPort());
        });

    }


    /**
     * 客户端通过 Channel 对象向服务器端发送数据
     *
     * @param body 文本数据
     */
    public void send(Object body) {
        log.debug("客户端发送数据");
        channelFuture.channel().writeAndFlush(body);
    }

    /***
     * 停止服务
     */
    public void stop() {
        channelFuture.channel().close();
        log.info("Netty 客户端服务停止");
    }


    /***
     * 客户端初始化
     */
    public static class NettyClientInit extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            log.debug("初始化 Netty 客户端");
            socketChannel.pipeline()
                    .addLast(new MessageDecodeHandler())  // 数据解码
                    .addLast(new ClientHandler()) // 客户端handler
            ;

            socketChannel.pipeline()
                    .addLast(new PushHandler())  // 数据推送前操作
                    .addLast(new MessageEncoderHandler())  // 数据编码
                    .addLast(new ObjectToMessageHandler())  // 对象转Message
            ;

            socketChannel.pipeline().addLast(new ExceptionHandler()); // 消息接收异常
        }
    }

}

编写ImRunner实现ApplicationRunner

在Spring boot启动时统一启动服务端、客户端,这里客户端、服务端都统一放到一个项目里好方便测试


import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class ImRunner implements ApplicationRunner {
    private final NettyServer nettyServer;

    private final NettyClient nettyClient;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.debug("启动IM服务");
        nettyServer.start();

        log.debug("启动 netty 客户端");
        nettyClient.start();
    }


    @PreDestroy
    public void destroy() {
        log.debug("退出IM服务");
        nettyServer.stop();
        nettyClient.stop();
    }

}
添加Controller测试

这里通过调用NettyClient模拟客户端发送数据,测试数据ChatMessage为自定义model


@Slf4j
@RestController
@RequiredArgsConstructor
public class ChatController implements ChatApi {

    private final NettyClient nettyClient;
    
    @Override
    public List<ChatMessage> getMessages(int chatRoomId, int serialNumber, int messagesNumber) {
        nettyClient.send("测试数据");
        nettyClient.send(new ChatMessage());
        return null;
    }
}

可以看到后台日志正常输出。
image

  • 本文作者: jagger
  • 本文链接: /archives/netty-zi-ding-yi-tong-xin-xie-yi--shi-xian-im-ji-shi-tong-xun-
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
Spring boot、Swagger 3 统一接口响应,处理异常统一返回。
Swagger Ui 404 问题,更新配置后不显示Controller对应API问题。
jagger

jagger

66 日志
31 分类
0 标签
Creative Commons
0%
© 2026 jagger
由 Halo 强力驱动