自定义Protobuf编解码器

Netty内置了一组Protobuf编解码器——ProtobufDecoder解码器和ProtobufEncoder编码器,它们负责Protobuf生成的POJO实例和二进制字节之间的编码和解码。

Netty还自带了一组配套的半包处理器:ProtobufVarint32FrameDecoder、ProtobufVarint32LengthFieldPrepender拆包解码和编码器,它们为二进制ByteBuf加上varint32格式的可变长度,解决了Protobuf传输过程中的粘包/半包问题。

使用Netty内置的Protobuf系列编解码器,虽然可以解决简单的Protobuf协议的传输问题,但是对复杂Head-Content协议(例如数据包头部存在魔数、版本号字段,具体如图8-1所示)的解析,内置Protobuf系列编解码器就显得无能为力了,这种情况下需要自定义Protobuf编码器和解码器。

自定义Protobuf编码器

继承Netty中基础的MessageToByteEncoder编码器类,实现其抽象的编码方法encode(),在该方法中把以下内容写入目标ByteBuf:

1、写入待发送的Protobuf POJO实例的二进制字节长度。

2、写入其他的字段,如魔数、版本号。

3、写入Protobuf POJO实例的二进制字节码内容。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class SimpleProtobufEncoder extends MessageToByteEncoder<ProtoMsg.Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx,
                          ProtoMsg.Message msg, ByteBuf out)
            throws Exception {
        encode0(msg, out);
    }

    public static void encode0(
            ProtoMsg.Message msg, ByteBuf out) {
        out.writeShort(ProtoInstant.MAGIC_CODE);
        out.writeShort(ProtoInstant.VERSION_CODE);

        byte[] bytes = msg.toByteArray();// 将 ProtoMsg.Message 对象转换为byte
        int length = bytes.length;// 读取消息的长度

        // 先将消息长度写入,也就是消息头
        out.writeInt(length);
        // 消息体中包含我们要发送的数据
        out.writeBytes(bytes);
    }

}

自定义Protobuf解码器

继承Netty中基础的ByteToMessageDecoder解码器类实现,在其继承的decode()方法中,将ByteBuf字节码解码成Protobuf的POJO实例,大致过程如下:

1、读取长度,如果长度位数不够,就终止读取。

2、读取魔数、版本号等其他字段。

3、按照净长度读取内容。如果内容的字节数不够,则恢复到之前的起始位置(也就是长度的位置),然后终止读取。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Slf4j
public class SimpleProtobufDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx,
                          ByteBuf in,
                          List<Object> out) throws Exception {

        Object outmsg = decode0(ctx, in);
        if (outmsg != null) {
            // 获取业务消息
            out.add(outmsg);
        }

    }

    public static Object decode0(ChannelHandlerContext ctx,
                                 ByteBuf in) throws InvalidFrameException, InvalidProtocolBufferException {
        // 标记一下当前的readIndex的位置
        in.markReaderIndex();
        // 判断包头长度
        if (in.readableBytes() < 8) {// 不够包头
            return null;
        }
      
        //读取魔数
        short magic = in.readShort();
        if (magic != ProtoInstant.MAGIC_CODE) {
            String error = "客户端口令不对:" + ctx.channel().remoteAddress();
            //异常连接,直接报错,关闭连接
            throw new InvalidFrameException(error);
        }
      
        //读取版本
        short version = in.readShort();
        if (version != ProtoInstant.VERSION_CODE) {
            String error = "协议的版本不对:" + ctx.channel().remoteAddress();
            //异常连接,直接报错,关闭连接
            throw new InvalidFrameException(error);
        }
      
        // 读取传送过来的消息的长度。
        int length = in.readInt();
        // 长度如果小于0
        if (length < 0) {
            // 非法数据,关闭连接
            ctx.close();
        }
        if (length > in.readableBytes()) {// 读到的消息体长度如果小于传送过来的消息长度
            // 重置读取位置
            in.resetReaderIndex();
            return null;
        }
        Logger.cfo("decoder length=" + in.readableBytes());

        byte[] array;
        if (in.hasArray()) {
            //堆缓冲
            array = new byte[length];
            in.readBytes(array, 0, length);
        } else {
            //直接缓冲
            array = new byte[length];
            in.readBytes(array, 0, length);
        }

        // 字节转成对象
        ProtoMsg.Message outmsg =
                ProtoMsg.Message.parseFrom(array);

        return outmsg;
    }
}

Protobuf消息格式的设计

消息类型使用enum定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
enum HeadType
{
  LOGIN_REQUEST = 1;//登陆请求
  LOGIN_RESPONSE = 2;//登录响应
  LOGOUT_REQUEST = 3;//退出请求
  LOGOUT_RESPONSE = 4;//退出响应
  KEEPALIVE_REQUEST = 5;//心跳请求PING;
  KEEPALIVE_RESPONSE = 6;
  MESSAGE_REQUEST = 7;//消息请求;
  MESSAGE_RESPONSE = 8;//消息回执;
  MESSAGE_NOTIFICATION = 9;//通知消息
}

使用一个Protobuf消息结构定义一类消息

1
2
3
4
5
6
7
message LoginRequest{
	required string uid = 1;        // 用户唯一id
	required string deviceId = 2;     // 设备ID
	required string token = 3;       // 用户token
	optional uint32 platform = 4;      //客户端平台Windows、Mac、Android、IOS、Web
	optional string app_version = 5;    // APP版本号
}

给应答消息加上成功标记和应答序号

应答序号的作用:如果一个请求有多个响应,则发送端可以设计为每个响应消息可以包含一个应答的序号,最后一个响应消息包含一个结束标记。接收端在处理时,根据应答序号和结束标记可以合并所有的响应消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/*聊天响应*/
message MessageResponse
{
    required bool result = 1; //true表示发送成功,false表示发送失败
    required uint32 code = 2;	//错误码
    required string info = 3;	//错误描述
    required uint32 expose = 4; //错误描述是否提示给用户:1 提示;0 不提示
    required bool last_block = 5;//是否为最后的应答
    required fixed32 block_index = 6;//应答的序号
}

编解码从顶级消息开始

定义一个外层的消息,把所有的消息类型全部封装在一起。在通信时可以从外层消息开始编码或者解码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
/*顶层消息*/
//顶层消息是一种嵌套消息,嵌套了各种类型消息
//内部的消息类型,全部使用optional字段
//根据消息类型type的值,最多只有一个有效
message Message
{
 required HeadType type = 1; //消息类型
 required uint64   sequence = 2;//消息系列号
 required string  session_id = 3;
 optional LoginRequest loginRequest = 4;
 optional LoginResponse loginResponse = 5;
 optional MessageRequest messageRequest = 6;
 optional MessageResponse messageResponse = 7;
 optional MessageNotification notification = 8;
}

登录

从端到端(End to End)的角度来说,登录的流程包括以下环节:

1、客户端发送登录数据包。

2、服务端进行用户信息验证。

3、服务端创建会话。

4、服务端返回登录结果的信息给客户端,包括成功标志、Session ID等。

整个端到端(End to End)的登录流程涉及4次编码/解码:

1、客户端编码:客户端对登录请求的Protobuf数据包进行编码。

2、服务端解码:服务端对登录请求的Protobuf数据包进行解码。

3、服务端编码:服务端对编码登录响应的Protobuf数据包进行编码。

4、客户端解码:客户端对登录响应的Protobuf数据包进行解码。

从客户端到服务端再到客户端,9个环节的相关介绍如下:

1、客户端收集用户ID和密码,需要使用LoginConsoleCommand控制台命令类。

2、客户端发送Protobuf数据包到客户端通道,需要通过LoginSender发送器组装Protobuf数据包。

3、客户端通道将Protobuf数据包发送到对端,需要通过Netty底层来完成。

4、服务器子通道收到Protobuf数据包,需要通过Netty底层来完成。

5、服务端UserLoginHandler入站处理器收到登录消息,交给业务处理器LoginMsgProcesser处理异步的业务逻辑。

6、服务端LoginMsgProcesser处理完异步的业务逻辑,将处理结果写入用户绑定的子通道。

7、服务器子通道将登录响应Protobuf数据帧发送到客户端,需要通过Netty底层来完成。

8、客户端通道收到Protobuf登录响应数据包,需要通过Netty底层来完成。

9、客户端LoginResponseHandler业务处理器处理登录响应,例如设置登录的状态、保存会话的Session ID等。

客户端

在IM登录的整体执行流程中,客户端所涉及的主要模块大致如下:

(1)ClientCommand模块:控制台命令收集器。

(2)ProtobufBuilder模块:Protobuf数据包构造者。

(3)Sender模块:数据包发送器。

(4)Handler模块:服务器响应处理器。

上面的这些模块都有一个或者多个专门的POJO Java类来完成对应的工作:

(1)LoginConsoleCommand类:属于ClientCommand模块,负责收集用户在控制台输入的用户ID和密码。

(2)CommandController类:属于ClientCommand模块,负责收集用户在控制台输入的命令类型,根据相应的类型调用相应的命令处理器,然后收集相应的信息。

例如,如果用户输入的命令类型为登录,则调用LoginConsoleCommand命令处理器将收集到的用户ID和密码封装成User类,然后启动登录处理。

(3)LoginMsgBuilder类:属于ProtobufBuilder模块,负责将User类组装成Protobuf登录请求数据包。

(4)LoginSender类:属于Sender模块,负责将组装好的Protobuf登录数据包发送到服务端。

(5)LoginResponseHandler类:属于Handler模块,负责处理服务端的登录响应。

LoginSender消息发送器

LoginSender消息发送器的sendLoginMsg()方法主要有两步:

  • 生成Protobuf登录数据包
  • 调用BaseSender基类的sendMsg()方法来发送数据包
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Slf4j
@Service("loginSender")
public class LoginSender extends BaseSender {
    public void sendLoginMsg() {
        if (!isConnected()) { log.info("还没有建立连接!");return; }

        log.info("构造登录消息");
        ProtoMsg.Message message =
                LoginMsgConverter.build(getUser(), getSession());
        
        log.info("发送登录消息");
        super.sendMsg(message);
    }
}

调用BaseSender的sendMsg()方法来发送登录消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Data
@Slf4j
public abstract class BaseSender {
    private User user;
    private ClientSession session;

    public boolean isConnected() {
        if (null == session) {
            log.info("session is null");
            return false;
        }
        return session.isConnected();
    }

    public boolean isLogin() {
        if (null == session) {
            log.info("session is null");
            return false;
        }
        return session.isLogin();
    }

    public void sendMsg(ProtoMsg.Message message) {
        if (null == getSession() || !isConnected()) {
            log.info("连接还没成功");
            return;
        }

        Channel channel = getSession().getChannel();
        ChannelFuture f = channel.writeAndFlush(message);
        f.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future)
                    throws Exception {
                // 回调
                if (future.isSuccess()) {
                    sendSucced(message);
                } else {
                    sendfailed(message);
                }
            }

        });
        
    }

    protected void sendSucced(ProtoMsg.Message message) {
        log.info("发送成功");

    }

    protected void sendfailed(ProtoMsg.Message message) {
        log.info("发送失败");
    }

}

在Netty中会调用write(pkg)或者writeAndFlush(pkg)方法来发送数据包,发送方法调用后会立即返回,返回的类型是一个ChannelFuture异步任务实例。

发送方法返回时,数据包并未发送到对端?比如在write(pkg)方法返回时,真正的TCP写入的操作其实还没有执行。这和Netty中在同一个通道上的同一个处理器的出入站操作的串行执行特点有关。

在Netty中,无论是入站操作还是出站操作,都有两大特点:

(1)同一条通道的同一个Handler处理器的所有出/入站处理都是串行的,而不是并行的。

Netty是如何保障这一点的呢?在某个出/入站开启时,Netty会对当前的执行线程进行判断:如果当前线程不是Handler的执行线程,则处理暂时不执行,Netty会为当前处理建立一个新的异步可执行任务,加入Handler的执行线程任务队列中。

(2)Netty的出/入站操作不是单个Handler业务处理器操作,而是流水线上一系列的出/入站处理流程。只有整个流程都处理完,出/入站操作才真正处理完成。

ClientSession

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
 * 实现客户端 Session会话
 */
@Slf4j
@Data
public class ClientSession {
    public static final AttributeKey<ClientSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");

    /**
     * 用户实现客户端会话管理的核心
     */
    private Channel channel;
    private User user;

    /**
     * 保存登录后的服务端sessionid
     */
    private String sessionId;

    private boolean isConnected = false;
    private boolean isLogin = false;


    //绑定通道
    //连接成功之后
    public ClientSession(Channel channel) {
        //正向的绑定
        this.channel = channel;
        this.sessionId = UUID.randomUUID().toString();
        //反向的绑定
        channel.attr(ClientSession.SESSION_KEY).set(this);
    }

    //登录成功之后,设置sessionId
    public static void loginSuccess(ChannelHandlerContext ctx, ProtoMsg.Message pkg) {
        Channel channel = ctx.channel();
        ClientSession session = channel.attr(ClientSession.SESSION_KEY).get();
        session.setSessionId(pkg.getSessionId());
        session.setLogin(true);
        log.info("登录成功");
    }

    //获取channel
    public static ClientSession getSession(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        ClientSession session = channel.attr(ClientSession.SESSION_KEY).get();
        return session;
    }

    public String getRemoteAddress() {
        return channel.remoteAddress().toString();
    }

    //写protobuf 数据帧
    public ChannelFuture witeAndFlush(Object pkg) {
        ChannelFuture f = channel.writeAndFlush(pkg);
        return f;
    }

    public void writeAndClose(Object pkg) {
        ChannelFuture future = channel.writeAndFlush(pkg);
        future.addListener(ChannelFutureListener.CLOSE);
    }

    //关闭通道
    public void close() {
        isConnected = false;
        ChannelFuture future = channel.close();
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    log.error("连接顺利断开");
                }
            }
        });
    }
    
}

什么时候创建客户端会话呢?在Netty客户端发起连接请求之后,增加一个连接建立完成的异步回调任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
@Data
@Service("CommandController")
public class CommandController {
    //...
    GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) ->
    {
        final EventLoop eventLoop
                = f.channel().eventLoop();
        if (!f.isSuccess()) {
            log.info("连接失败!在10s之后准备尝试重连!");
            eventLoop.schedule(() -> chatNettyClient.doConnect(), 10,
                    TimeUnit.SECONDS);
            connectFlag = false;
        } else {
            connectFlag = true;
            log.info("IM 服务器 连接成功!");
            channel = f.channel();
            // 创建会话
            session = new ClientSession(channel);
            session.setConnected(true);
            channel.closeFuture().addListener(closeListener);
            //唤醒用户线程
            notifyCommandThread();
        }

    };
    //...
}

LoginResponseHandler

LoginResponseHandler登录响应处理器对消息类型进行判断:

(1)如果消息类型不是请求响应消息,则调用父类默认的super.channelRead()入站处理方法,将数据包交给流水线的下一站Handler业务处理器去处理。

(2)如果消息类型是请求响应消息并且登录成功,则取出绑定的会话(Session),再设置登录成功的状态。完成登录成功处理之后,进行其他的客户端业务处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Slf4j
@ChannelHandler.Sharable
@Service("LoginResponseHandler")
public class LoginResponseHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private ChatMsgHandler chatMsgHandler;

    @Autowired
    private HeartBeatClientHandler heartBeatClientHandler;

    /**
     * 业务逻辑处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        //判断消息实例
        if (null == msg || !(msg instanceof ProtoMsg.Message)) {
            super.channelRead(ctx, msg);
            return;
        }
        //判断类型
        ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
        ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();
        if (!headType.equals(ProtoMsg.HeadType.LOGIN_RESPONSE)) {
            super.channelRead(ctx, msg);
            return;
        }
        //判断返回是否成功
        ProtoMsg.LoginResponse info = pkg.getLoginResponse();

        ProtoInstant.ResultCodeEnum result =
                ProtoInstant.ResultCodeEnum.values()[info.getCode()];

        if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {
            //登录失败
            log.info(result.getDesc());
        } else {
            //登录成功
            ClientSession.loginSuccess(ctx, pkg);
            ChannelPipeline p = ctx.pipeline();
            //移除登录响应处理器
            p.remove(this);
            //在编码器后面,动态插入心跳处理器
            p.addAfter("encoder", "heartbeat", heartBeatClientHandler);
            p.addAfter("encoder", "chat", chatMsgHandler);

            heartBeatClientHandler.channelActive(ctx);
        }

    }

}

在登录成功之后,需要将LoginResponseHandler登录响应处理器实例从流水线上移除,因为不需要再处理登录响应了。同时,需要在客户端和服务端(即服务器器)之间开启定时的心跳处理。

ChatNettyClient

在客户端的业务处理器流水线(Pipeline)上,装配顺序:

  • ProtobufDecoder解码器
  • ProtobufEncoder编码器
  • LoginResponseHandler登录响应处理器:业务处理器。
  • ExceptionHandler异常处理器:是一个入站处理器,用来实现Netty异常的处理以及在连接异常中断后进行重连。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Slf4j
@Data
@Service("chatNettyClient")
public class ChatNettyClient {
    // 服务器ip地址
    @Value("${chat.server.ip}")
    private String host;
    // 服务器端口
    @Value("${chat.server.port}")
    private int port;
    @Autowired
    private SystemConfig systemConfig;
    @Autowired
    private LoginResponseHandler loginResponseHandler;
    @Autowired
    private ExceptionHandler exceptionHandler;
    private Channel channel;
    private ChatSender sender;
    private LoginSender l;

    /**
     * 唯一标记
     */
    private boolean initFalg = true;
    private User user;
    private GenericFutureListener<ChannelFuture> connectedListener;
    private Bootstrap bootstrap;
    private EventLoopGroup g;

    public ChatNettyClient() {

        /**
         * 客户端的是Bootstrap,服务端的则是 ServerBootstrap。
         * 都是AbstractBootstrap的子类。
         **/

        /**
         * 通过nio方式来接收连接和处理连接
         */

        g = new NioEventLoopGroup(1);
    }

    /**
     * 重连
     */
    public void doConnect() {
        try {
            bootstrap = new Bootstrap();

            bootstrap.group(g);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            bootstrap.remoteAddress(host, port);

            // 设置通道初始化
            bootstrap.handler(
                    new ChannelInitializer<SocketChannel>() {
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast("decoder", new SimpleProtobufDecoder());
                            ch.pipeline().addLast("encoder", new SimpleProtobufEncoder());
                            ch.pipeline().addLast(loginResponseHandler);
                            //ch.pipeline().addLast(chatMsgHandler);
                            ch.pipeline().addLast(exceptionHandler);
                        }
                    }
            );
            log.info("客户端开始连接 [疯狂创客圈IM]");

            ChannelFuture f = bootstrap.connect();//异步发起连接
            f.addListener(connectedListener);
        } catch (Exception e) {
            log.info("客户端连接失败!" + e.getMessage());
        }
    }

    public void close() {
        g.shutdownGracefully();
    }

}

处理器装配次序说明:loginResponseHandler登录响应处理器必须装配在ProtobufDecoder解码器之后。

原因:Netty客户端读到二进制ByteBuf数据包之后,需要通过ProtobufDecoder完成解码操作。解码后组装好Protobuf消息POJO,再进入LoginResponseHandler。

服务端

在IM登录的整体执行流程中,服务端涉及的主要模块如下:

(1)Handler模块:客户端请求的处理。

(2)Processer模块:以异步方式完成请求的业务逻辑处理。

(3)Session模块:管理用户与通道的绑定关系。

在具体的服务器登录流程中,上面的这些模块都有一个或者多个专门的Java类来完成对应的工作,大致的类为:

(1)UserLoginRequestHandler类:属于Handler模块,负责处理收到的Protobuf登录请求包,然后使用LoginProcesser类以异步方式进行用户校验。

(2)LoginProcesser类:属于Processer模块,完成服务端的用户校验,再将校验的结果组装成一个登录响应Protobuf数据包写回到客户端。

(3)ServerSession类:属于Session模块,如果校验成功,设置相应的会话状态;然后,将会话加入服务端的SessionMap映射中,这样该用户就可以接受其他用户发送的聊天消息了。

服务端的登录处理流程是:

(1)ProtobufDecoder解码器把请求ByteBuf数据包解码成Protobuf数据包。

(2)UserLoginRequestHandler登录处理器负责处理Protobuf数据包,进行一些必要的判断和预处理后,启动LoginProcesser登录业务处理器,以异步方式进行登录验证处理。

(3)LoginProcesser通过数据库或者远程接口完成用户验证,根据验证处理的结果生成登录成功/失败的登录响应报文,并发送给客户端

ChatServer

与客户端类似,服务端流水线需要装配:

  • ProtobufDecoder解码器
  • ProtobufEncoder编码器
  • loginRequestHandler登录业务处理器实例
  • serverExceptionHandler异常处理器实例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@Data
@Slf4j
@Service("ChatServer")
public class ChatServer {
    // 服务器端口
    @Value("${server.port}")
    private int port;
    // 通过nio方式来接收连接和处理连接
    private EventLoopGroup bg;
    private EventLoopGroup wg;

    // 启动引导器
    private ServerBootstrap b = new ServerBootstrap();
    @Autowired
    private LoginRequestHandler loginRequestHandler;

    @Autowired
    private ServerExceptionHandler serverExceptionHandler;

    @Autowired
    private ChatRedirectHandler chatRedirectHandler;

    public void run() {
        //连接监听线程组
        bg = new NioEventLoopGroup(1);
        //传输处理线程组
        wg = new NioEventLoopGroup();

        try {
            //1 设置reactor 线程
            b.group(bg, wg);
            //2 设置nio类型的channel
            b.channel(NioServerSocketChannel.class);
            //3 设置监听端口
            b.localAddress(new InetSocketAddress(port));
            //4 设置通道选项
            //b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配流水线
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 管理pipeline中的Handler
                    ch.pipeline().addLast(new SimpleProtobufDecoder());
                    ch.pipeline().addLast(new SimpleProtobufEncoder());
                    // ch.pipeline().addLast(new HeartBeatServerHandler());
                    // 在流水线中添加handler来处理登录,登录后删除
                    ch.pipeline().addLast("login", loginRequestHandler);
                    //ch.pipeline().addLast(chatRedirectHandler);
                    ch.pipeline().addLast(serverExceptionHandler);
                }
            });
            // 6 开始绑定server
            // 通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = b.bind().sync();
            log.info(
                    "CrazyIM 服务启动, 端口 " +
                            channelFuture.channel().localAddress());
            // 7 监听通道关闭事件
            // 应用程序会一直等待,直到channel关闭
            ChannelFuture closeFuture =
                    channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            wg.shutdownGracefully();
            bg.shutdownGracefully();
        }

    }

}

LoginRequestHandler

这是一个入站处理器,继承自ChannelInboundHandlerAdapter入站适配器,重写了适配器的channelRead()方法,主要工作如下:

(1)对消息进行必要的判断:判断是否为登录请求Protobuf数据包。如果不是,通过super.channelRead(ctx, msg)将消息交给流水线的下一个入站处理器。

(2)如果是登录请求Protobuf数据包,准备进行登录处理,提前为客户建立一个服务端的会话ServerSession。

(3)使用自定义的CallbackTaskScheduler异步任务调度器提交一个异步任务,启动LoginProcesser执行登录用户验证逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Slf4j
@Service("LoginRequestHandler")
@ChannelHandler.Sharable
public class LoginRequestHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    LoginProcesser loginProcesser;
    @Autowired
    private ChatRedirectHandler chatRedirectHandler;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("收到一个新的连接,但是没有登录 {}", ctx.channel().id());
        super.channelActive(ctx);
    }

    /**
     * 收到消息
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        if (null == msg || !(msg instanceof ProtoMsg.Message)) {
            super.channelRead(ctx, msg);
            return;
        }

        ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
        //取得请求类型
        ProtoMsg.HeadType headType = pkg.getType();
        if (!headType.equals(loginProcesser.type())) {
            super.channelRead(ctx, msg);
            return;
        }
        //异步任务,处理登录的逻辑
        CallbackTaskScheduler.add(new CallbackTask<Boolean>() {
            @Override
            public Boolean execute() throws Exception {
                boolean r = loginProcesser.action(session, pkg);
                return r;
            }

            //异步任务返回
            @Override
            public void onBack(Boolean r) {
                if (r) {
                    ctx.pipeline().addAfter("login", "chat", chatRedirectHandler);
                    ctx.pipeline().addAfter("login", "heartBeat", new HeartBeatServerHandler());
                    ctx.pipeline().remove("login");
                    log.info("登录成功:" + session.getUser());
                } else {
                    ServerSession.closeSession(ctx);
                    log.info("登录失败:" + session.getUser());
                }

            }

            //异步任务异常
            @Override
            public void onException(Throwable t) {
                ServerSession.closeSession(ctx);
                log.info("登录失败:" + session.getUser());

            }
        });
        
    }
    
}

LoginProcessor

LoginProcesser用户验证逻辑主要包括:

  • 密码验证
  • 将验证的结果写入通道
  • 如果登录验证成功,实现通道与服务端会话的双向绑定,并且将服务端会话加入到在线用户列表中
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Slf4j
@Service("LoginProcesser")
public class LoginProcesser implements ServerProcesser {
    @Autowired
    LoginResponceConverter loginResponceConverter;

    @Override
    public ProtoMsg.HeadType type() {
        return ProtoMsg.HeadType.LOGIN_REQUEST;
    }

    @Override
    public boolean action(ServerSession session,
                          ProtoMsg.Message proto) {
        // 取出token验证
        ProtoMsg.LoginRequest info = proto.getLoginRequest();
        long seqNo = proto.getSequence();
        User user = User.fromMsg(info);
        //检查用户
        boolean isValidUser = checkUser(user);
        if (!isValidUser) {
            ProtoInstant.ResultCodeEnum resultcode =
                    ProtoInstant.ResultCodeEnum.NO_TOKEN;
            //构造登录失败的报文
            ProtoMsg.Message response =
                    loginResponceConverter.build(resultcode, seqNo, "-1");
            //发送登录失败的报文
            session.writeAndFlush(response);
            return false;
        }
        session.setUser(user);
        //服务端session和传输channel绑定的核心代码
        session.reverseBind();
        //登录成功
        ProtoInstant.ResultCodeEnum resultcode =
                ProtoInstant.ResultCodeEnum.SUCCESS;
        //构造登录成功的报文
        ProtoMsg.Message response = loginResponceConverter.build(resultcode, seqNo, session.getSessionId());
        //发送登录成功的报文
        session.writeAndFlush(response);
        return true;
    }

    private boolean checkUser(User user) {
        if (SessionMap.inst().hasLogin(user)) {
            return false;
        }
        //校验用户,比较耗时的操作,需要100 ms以上的时间
        //方法1:调用远程用户restfull 校验服务
        //方法2:调用数据库接口校验
        return true;
    }

}

用户密码验证的逻辑在checkUser()方法中完成。在实际的生产场景中,LoginProcesser进行用户登录验证的方式比较多:

  • 通过RESTful接口验证用户。
  • 通过数据库去验证用户。
  • 通过认证(Auth)服务器去验证用户。

总之,验证用户涉及RPC等耗时操作,为了尽量简化流程,示例程序代码省去了通过账号和密码验证的过程,checkUser()方法直接返回true,也就是默认所有的登录都是成功的。

在用户校验成功后,服务端就需要向客户端发送登录响应,具体的方法是:调用登录响应的Protobuf消息构造器loginResponseBuilder,构造一个登录响应POJO实例,设置好校验成功的标志位,调用会话(Session)的writeAndFlush()方法把数据写到客户端。

Reactor线程和业务线程相互隔离

为什么在服务端的登录处理需要分成两个模块,而不是像客户端一样在InboundHandler入站处理器中统一完成处理呢?答案是在服务端需要隔离EventLoop(Reactor)线程业务线程。基本方法是使用独立、异步的业务线程去执行用户验证的逻辑,而不是在EventLoop线程中去执行用户验证的逻辑。

在服务端Reactor线程和业务线程相互隔离非常重要。具体来说,就是专门开辟一个独立的线程池,负责一个独立的异步任务处理。对于耗时的业务操作封装成异步任务,并放入独立的线程池中去处理。这样的话服务端的性能会提升很多,避免了对IO操作的阻塞。

有两种办法使用独立的线程池:

  • 使用Netty的EventLoopGroup线程池
  • 使用自己创建的Java线程池