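Source code version: 2.5.0

Seata has three roles: TC, RM, and TM. The Seata Server acts as the TC and coordinates the commit and rollback of branch transactions, while the participating applications act as RM and TM. So how do these three communicate with each other? This article walks through the answer.

Overall class hierarchy

Let's start with the big picture and look at the class hierarchy of Seata's network communication layer.

The class hierarchy shows that Seata's network communication is built on Netty. AbstractNettyRemoting is the core abstract base class of the whole communication layer and encapsulates the common RPC capabilities, such as the synchronous call (sendSync) and the asynchronous call (sendAsync).

At its core, network communication comes down to two calling modes, synchronous and asynchronous; a request and a response differ mainly in the semantics of the message content.

Although AbstractNettyRemoting provides these basic methods, the roles (Server, TM, RM) differ in how they call them. When the Server sends a request, it must explicitly specify the target client. When a Client (TM/RM) sends a request, it usually does not need to name a particular server instance: it simply sends the request, and the load-balancing mechanism in the implementation class selects the server node.

To capture this difference, Seata defines two interfaces, RemotingServer and RemotingClient. Both rely on the low-level network communication provided by AbstractNettyRemoting: the concrete subclasses extend AbstractNettyRemoting and implement the corresponding interface.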
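To make the difference between the two roles concrete, here is a minimal sketch in plain Java. ServerSideSender, ClientSideSender, ToyServer and ToyClient are hypothetical names invented for this illustration, not Seata's real interfaces, and round-robin is only an assumed node-selection policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-ins for the two roles; not Seata's real interfaces.
interface ServerSideSender {
    // The server must name the client it wants to reach.
    String send(String clientId, String message);
}

interface ClientSideSender {
    // The client only hands over the message; node selection happens inside.
    String send(String message);
}

class ToyServer implements ServerSideSender {
    private final Map<String, String> connectedClients = new ConcurrentHashMap<>();

    void register(String clientId, String address) {
        connectedClients.put(clientId, address);
    }

    @Override
    public String send(String clientId, String message) {
        String address = connectedClients.get(clientId);
        if (address == null) {
            throw new IllegalArgumentException("unknown client: " + clientId);
        }
        return message + " -> " + address;
    }
}

class ToyClient implements ClientSideSender {
    private final List<String> serverAddresses;
    private final AtomicInteger counter = new AtomicInteger();

    ToyClient(List<String> serverAddresses) {
        this.serverAddresses = new ArrayList<>(serverAddresses);
    }

    @Override
    public String send(String message) {
        // Assumed policy for illustration: pick the next server in round-robin order.
        int index = Math.floorMod(counter.getAndIncrement(), serverAddresses.size());
        return message + " -> " + serverAddresses.get(index);
    }
}
```

The only point of the sketch is the shape of the two send methods: the server-side call carries a target, the client-side call does not.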

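How the Server and Client start

Next, we look at startup from the Server side and the Client side in turn, and at what each has to do during startup.

How the Server starts

The Seata Server is a standalone Spring Boot application. How can it run its own logic automatically when Spring Boot starts?

Seata does this by implementing the CommandLineRunner interface. The method to focus on is run: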

// org.apache.seata.server.ServerRunner#run
public void run(String... args) {
    try {
        long start = System.currentTimeMillis();
        seataServer.start(args);
        started = true;

        long cost = System.currentTimeMillis() - start;
        LOGGER.info("\r\n you can visit seata console UI on namingserver. \r\n log path: {}.", this.logPath);
        LOGGER.info("seata server started in {} millSeconds", cost);
    } catch (Throwable e) {
        started = Boolean.FALSE;
        LOGGER.error("seata server start error: {} ", e.getMessage(), e);
        System.exit(-1);
    }
}

The core logic lives in the seataServer.start() method:

// org.apache.seata.server.Server#start
public void start(String[] args) {
    // Parameter parser, used to parse the startup arguments
    ParameterParser parameterParser = new ParameterParser(args);
    
    // initialize the metrics
    MetricsManager.get().init();
    
    ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(
            NettyServerConfig.getMinServerPoolSize(),
            NettyServerConfig.getMaxServerPoolSize(),
            NettyServerConfig.getKeepAliveTime(),
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
            new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),
            new ThreadPoolExecutor.CallerRunsPolicy());
    
    // 127.0.0.1 and 0.0.0.0 are not valid here.
    if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
        XID.setIpAddress(parameterParser.getHost());
    } else {
        String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
        if (StringUtils.isNotBlank(preferredNetworks)) {
            XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
        } else {
            XID.setIpAddress(NetUtil.getLocalIp());
        }
    }
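    /**
     * This mainly does the following:
     * 1. Sets workingThreads as the messageExecutor of AbstractNettyRemoting
     * 2. Creates the ServerBootstrap, configures the Boss and Worker groups, passes in the Netty server configuration, and sets the port the Seata Server listens on
     * 3. Sets the inbound/outbound handler ServerHandler, which is a duplex handler (ChannelDuplexHandler)
     */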
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
    XID.setPort(nettyRemotingServer.getListenPort());
    UUIDGenerator.init(parameterParser.getServerNode());
    ConfigurableListableBeanFactory beanFactory = ((ConfigurableApplicationContext)
                    ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
            .getBeanFactory();
    DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
    if (coordinator instanceof ApplicationListener) {
        beanFactory.registerSingleton(NettyRemotingServer.class.getName(), nettyRemotingServer);
        beanFactory.registerSingleton(DefaultCoordinator.class.getName(), coordinator);
        ((ConfigurableApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
                .addApplicationListener((ApplicationListener<?>) coordinator);
    }
    // log store mode: file, db, redis
    SessionHolder.init();
    LockerManagerFactory.init();
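    // Starts a series of scheduled tasks based on ScheduledThreadPoolExecutor, used for retrying transaction commit/rollback, deleting undo logs, and so on
    coordinator.init();
    // Sets DefaultCoordinator as the transaction handler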
    nettyRemotingServer.setHandler(coordinator);
    Optional.ofNullable(seataInstanceStrategy).ifPresent(SeataInstanceStrategy::init);
    // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
    ServerRunner.addDisposable(coordinator);
    // Initialize the Server
    nettyRemotingServer.init();
}

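The final call, nettyRemotingServer.init(), is a key part of Seata Server startup. It mainly does the following:

  1. Registers a series of processors
  2. Starts a ScheduledExecutorService that runs every 3 seconds, mainly to clean up expired MessageFuture objects
  3. Starts the ServerBootstrap and registers the TC service with the registry, such as Nacos

Registering processors

Inside Seata, a Pair object associates a processor with a thread pool: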

public final class Pair<T1, T2> {
    private final T1 first;
    private final T2 second;

    public Pair(T1 first, T2 second) {
        this.first = first;
        this.second = second;
    }

    public T1 getFirst() {
        return first;
    }

    public T2 getSecond() {
        return second;
    }
}

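Registering a processor essentially means associating a message type, the processor that handles it, and the thread pool that executes it, and storing that association in a hash table.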

// AbstractNettyRemotingServer
protected final Map<Integer/*MessageType*/, 
    Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
// org.apache.seata.core.rpc.netty.NettyRemotingServer#registerProcessor
private void registerProcessor() {
    // 1. registry on request message processor
    // Processor that handles request messages from RM/TM clients
    ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());
    ShutdownHook.getInstance().addDisposable(onRequestProcessor);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2. registry on response message processor
    // Processor that handles response messages from RM/TM clients
    ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
    // 3. registry rm message processor
    // Processor that handles RM registration requests
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4. registry tm message processor
    // Processor that handles TM registration requests
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5. registry heartbeat message processor
    // Processor that handles heartbeat requests sent by clients
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}


// org.apache.seata.core.rpc.netty.AbstractNettyRemotingServer#registerProcessor
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
    Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
    this.processorTable.put(messageType, pair);
}

You may also notice that some processors are registered with a null thread pool. Which thread then handles those messages? We will come back to this later.
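The registration idea can be sketched with a few lines of plain Java. MiniDispatcher and its methods are hypothetical names for illustration only; the sketch uses java.util.concurrent.Executor rather than ExecutorService to stay small, and the fallback for a null executor (run the processor on the calling thread) is an assumption of this sketch, not a statement about Seata's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical miniature of a processor table; all names are illustrative.
class MiniDispatcher {

    // A processor paired with the (possibly null) executor that should run it.
    private static final class Entry {
        final Consumer<String> processor;
        final Executor executor;

        Entry(Consumer<String> processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new HashMap<>(32);

    void registerProcessor(int messageType, Consumer<String> processor, Executor executor) {
        processorTable.put(messageType, new Entry(processor, executor));
    }

    // Looks up the processor by message type; returns false if none is registered.
    boolean dispatch(int messageType, String body) {
        Entry entry = processorTable.get(messageType);
        if (entry == null) {
            return false;
        }
        if (entry.executor != null) {
            // Hand the work to the executor registered for this message type.
            entry.executor.execute(() -> entry.processor.accept(body));
        } else {
            // Assumption of this sketch: with no executor, run on the calling thread.
            entry.processor.accept(body);
        }
        return true;
    }
}
```

Keying the table by message type keeps dispatch to a single map lookup, and pairing each processor with its own executor lets cheap messages and expensive messages be scheduled differently.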

Initializing the scheduled thread pool

// org.apache.seata.core.rpc.netty.AbstractNettyRemoting#init
public void init() {
    timerExecutor.scheduleAtFixedRate(
            new Runnable() {
                @Override
                public void run() {
                    for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                        MessageFuture future = entry.getValue();
                        if (future.isTimeout()) {
                            futures.remove(entry.getKey());
                            RpcMessage rpcMessage = future.getRequestMessage();
                            future.setResultMessage(new TimeoutException(String.format(
                                    "msgId: %s ,msgType: %s ,msg: %s ,request timeout",
                                    rpcMessage.getId(),
                                    String.valueOf(rpcMessage.getMessageType()),
                                    rpcMessage.getBody().toString())));
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug(
                                        "timeout clear future: {}",
                                        entry.getValue().getRequestMessage().getBody());
                            }
                        }
                    }

                    nowMills = System.currentTimeMillis();
                }
            },
            TIMEOUT_CHECK_INTERVAL,
            TIMEOUT_CHECK_INTERVAL,
            TimeUnit.MILLISECONDS);
}

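There is not much to say here: it starts a scheduled task that periodically cleans up timed-out MessageFuture objects. MessageFuture is the key to how Seata turns an asynchronous call into a synchronous one, and it is covered in detail later.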
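Stripped of Seata specifics, one pass of such a cleanup can be sketched as follows. PendingRequests and Pending are hypothetical names; the clock value is passed in as a parameter so that a sweep can be exercised without waiting for real time to pass.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Hypothetical pending-request registry; a sketch of the sweep idea, not Seata's class.
class PendingRequests {

    static final class Pending {
        final long startMillis;
        final long timeoutMillis;
        final CompletableFuture<Object> result = new CompletableFuture<>();

        Pending(long startMillis, long timeoutMillis) {
            this.startMillis = startMillis;
            this.timeoutMillis = timeoutMillis;
        }

        boolean isTimeout(long nowMillis) {
            return nowMillis - startMillis > timeoutMillis;
        }
    }

    private final Map<Integer, Pending> futures = new ConcurrentHashMap<>();

    Pending register(int id, long startMillis, long timeoutMillis) {
        Pending pending = new Pending(startMillis, timeoutMillis);
        futures.put(id, pending);
        return pending;
    }

    // One sweep pass: drop expired entries and hand a TimeoutException to their waiters.
    int sweep(long nowMillis) {
        int removed = 0;
        for (Map.Entry<Integer, Pending> entry : futures.entrySet()) {
            Pending pending = entry.getValue();
            if (pending.isTimeout(nowMillis) && futures.remove(entry.getKey(), pending)) {
                // The exception is stored as the result value, so a blocked caller wakes up.
                pending.result.complete(new TimeoutException("msgId: " + entry.getKey() + ", request timeout"));
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return futures.size();
    }
}
```

In a running system, sweep(System.currentTimeMillis()) would be scheduled at a fixed rate on a ScheduledExecutorService. Iterating a ConcurrentHashMap while removing entries is safe because its iterators are weakly consistent.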

Starting the ServerBootstrap

Finally, the ServerBootstrap is started. This part is mostly plain Netty.

public void start() {
    int port = getListenPort();
    this.serverBootstrap
            .group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
            .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
            .childOption(
                    ChannelOption.WRITE_BUFFER_WATER_MARK,
                    new WriteBufferWaterMark(
                            nettyServerConfig.getWriteBufferLowWaterMark(),
                            nettyServerConfig.getWriteBufferHighWaterMark()))
            .localAddress(new InetSocketAddress(port))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline()
                            .addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                            .addLast(new ProtocolDetectHandler(new ProtocolDetector[] {
                                new Http2Detector(getChannelHandlers()),
                                new SeataDetector(getChannelHandlers()),
                                new HttpDetector()
                            }));
                }
            });

    try {
        this.serverBootstrap.bind(port).sync();
        LOGGER.info("Server started, service listen port: {}", getListenPort());
        Instance instance = Instance.getInstance();
        // Lines 177-180 are just for compatibility with test cases
        if (instance.getTransaction() == null) {
            Instance.getInstance().setTransaction(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "netty"));
        }
        for (RegistryService<?> registryService : MultiRegistryFactory.getInstances()) {
            registryService.register(Instance.getInstance());
        }
        initialized.set(true);
    } catch (SocketException se) {
        throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);
    } catch (Exception exx) {
        throw new RuntimeException("Server start failed", exx);
    }
}

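The childOption settings used when starting the ServerBootstrap are networking details and are not explained further here.

You may have a question at this point: the pipeline only has a ProtocolDetectHandler added to it, so where are the business handlers?

For now, just remember that the argument passed when creating Http2Detector and SeataDetector is the ServerHandler, which is set in the constructor of AbstractNettyRemotingServer when the NettyRemotingServer is created. The rest is left for later.

How the Client starts

On the Client side, since Seata is usually used inside Spring Boot, the place to look is the SeataAutoConfiguration class.

This class creates a GlobalTransactionScanner object. It implements InitializingBean, so we turn to its afterPropertiesSet method.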

public void afterPropertiesSet() {
    if (disableGlobalTransaction) { // false by default
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationFactory.getInstance()
                .addConfigListener(
                        ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (CachedConfigurationChangeListener) this);
        return;
    }
    if (initialized.compareAndSet(false, true)) {
        initClient();
    }

    this.findBusinessBeanNamesNeededEnhancement();
}

As expected, this method initializes the TM and RM (through initClient()).

TM initialization

For the TM, the initialization logic is as follows:

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
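    /**
     * This mainly does the following:
     * 1. Creates a thread pool as the messageExecutor of AbstractNettyRemotingClient
     * 2. Sets the transaction role transactionRole to TMROLE
     * 3. Creates the Bootstrap and sets the inbound/outbound handlers ClientHandler and ChannelEventHandler
     * 4. Creates the client channel manager NettyClientChannelManager
     */
    TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);

    /**
     * This mainly does the following:
     * 1. Registers a series of processors
     * 2. Starts a scheduled task that periodically connects to the Servers in the transaction group and tries to reconnect if a connection is lost
     * 3. If the client allows batched message sending, creates mergeSendExecutorService and submits the MergedSendRunnable task
     * 4. Starts a scheduled task that cleans up expired MessageFuture objects every 3 seconds
     * 5. Starts the client Bootstrap and installs the handlers on the pipeline
     * 6. Initializes connections (initConnection): fetches the Server addresses from the registry and connects to them
     */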
    tmNettyRemotingClient.init();
}

The processor registration code is as follows:

private void registerProcessor() {
    // 1.registry TC response processor
    // Processor for TC Server responses, that is, the responses TC returns after the TM sends it a request
    ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(
            mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
    // 2.registry heartbeat message processor
    // Processor for heartbeat messages, that is, the response TC returns after the TM sends it a heartbeat request
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

The logic for starting the client Bootstrap is as follows:

public void start() {
    this.bootstrap
            .group(eventLoopGroupWorker)
            .channel(nettyClientConfig.getClientChannelClazz())
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());

    if (nettyClientConfig.enableNative()) {
        if (PlatformDependent.isOsx()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("client run on macOS");
            }
        } else {
            bootstrap
                    .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                    .option(EpollChannelOption.TCP_QUICKACK, true);
        }
    }

    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            if (nettyClientConfig.getProtocol().equals(Protocol.GRPC.value)) {
                pipeline.addLast(Http2FrameCodecBuilder.forClient().build())
                        .addLast(new Http2MultiplexHandler(new ChannelDuplexHandler()));
            } else {
                pipeline.addLast(new IdleStateHandler(
                        nettyClientConfig.getChannelMaxReadIdleSeconds(),
                        nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                        nettyClientConfig.getChannelMaxAllIdleSeconds()));
                pipeline.addLast(new ProtocolDecoderV1()).addLast(new ProtocolEncoderV1());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        }
    });

    if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyClientBootstrap has started");
    }
}

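Because the client's protocol version is determined by the Seata version in use, the V1 codec is added directly here. The channelHandlers are ClientHandler and ChannelEventHandler, both of which are duplex handlers in Netty.

RM initialization

RM initialization is broadly similar to that of the TM, so here we only look briefly at the processor registration.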

private void registerProcessor() {
    // 1.registry rm client handle branch commit processor
    // Handles branch commit requests initiated by the TC Server
    RmBranchCommitProcessor rmBranchCommitProcessor =
            new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
    // 2.registry rm client handle branch rollback processor
    // Handles branch rollback requests initiated by the TC Server
    RmBranchRollbackProcessor rmBranchRollbackProcessor =
            new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
    // 3.registry rm handler undo log processor
    // Handles undo log deletion requests initiated by the TC Server
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
    // 4.registry TC response processor
    // Processor for TC Server responses, that is, the responses TC returns after the RM sends it a request
    ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(
            mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
    // 5.registry heartbeat message processor
    // Processor for heartbeat messages, that is, the response TC returns after the RM sends it a heartbeat request
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

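How messages are sent and processed

Now that the general startup flow of the Seata Server and Client is clear, we can take a closer look at how Seata sends and processes messages.

As mentioned earlier, the core logic for sending requests and processing messages lives in AbstractNettyRemoting, so let's look at that class next.

Synchronous and asynchronous

First, a brief word on what synchronous and asynchronous mean.

Synchronous and asynchronous describe two different ways a program behaves when it handles multiple events or tasks.

Synchronous means that one process must wait for another to finish before it can continue. In other words, in a synchronous operation the caller blocks after issuing a request and only continues with the following code once it receives a response or the call times out.

Asynchronous, by contrast, lets the caller continue without waiting for the response. When the request completes, the caller is notified of the response in some way, for example through a callback function or a Future. The asynchronous model can improve concurrency and efficiency.

From another angle, in a synchronous call the calling thread itself obtains the result. In an asynchronous call, an asynchronous thread either places the result somewhere (a Future) or runs a success/failure callback prepared in advance (a callback function).

Below is a simple example that shows three calling styles: synchronous, asynchronous with a Future, and asynchronous with a callback.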

package com.fhj;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Synchronous call interface
interface SyncAdder {
    int add(int a, int b) throws InterruptedException;
}

// Asynchronous Future call interface
interface AsyncFutureAdder {
    CompletableFuture<Integer> addAsync(int a, int b);
}

// Asynchronous callback call interface
interface AsyncCallbackAdder {
    void addAsync(int a, int b, Consumer<Integer> callback);
}

// Implementation
@Slf4j
class AdderService implements SyncAdder, AsyncFutureAdder, AsyncCallbackAdder {

    private final ExecutorService executor = Executors.newCachedThreadPool();

    // Simulates a time-consuming operation
    private void simulateDelay() throws InterruptedException {
        Thread.sleep(1000); // 1-second delay
    }

    // Synchronous implementation
    @Override
    public int add(int a, int b) throws InterruptedException {
        log.info("Computing");
        simulateDelay();
        return a + b;
    }

    // Asynchronous Future implementation
    @Override
    public CompletableFuture<Integer> addAsync(int a, int b) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                log.info("Computing");
                simulateDelay();
                // add() logs and sleeps again, so the log line appears twice
                // and the whole call takes about 2 seconds
                return add(a, b);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, executor);
    }

    // Asynchronous callback implementation
    @Override
    public void addAsync(int a, int b, Consumer<Integer> callback) {
        executor.execute(() -> {
            try {
                log.info("Computing");
                simulateDelay();
                // add() logs and sleeps again here as well
                int result = add(a, b);
                callback.accept(result);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public void shutdown() {
        executor.shutdown();
    }
}

@Slf4j
public class Example {
    public static void main(String[] args) throws Exception {
        AdderService adder = new AdderService();

        // 1. Synchronous call
        log.info("--- Synchronous call starts ---");
        int syncResult = adder.add(3, 5);
        log.info("Sync result: " + syncResult);
        log.info("Main thread continues...\n");

        // 2. Asynchronous Future call
        log.info("--- Async Future call starts ---");
        Future<Integer> future = adder.addAsync(10, 20);
        log.info("Main thread continues...");
        // Block until the result is available
        int futureResult = future.get();
        log.info("Future result: " + futureResult + "\n");

        // 3. Asynchronous callback call
        log.info("--- Async Callback call starts ---");
        adder.addAsync(100, 200, result -> {
            log.info("Callback result: " + result);
        });
        log.info("Main thread continues...");

        // Wait for the asynchronous operation to finish
        Thread.sleep(1500);
        adder.shutdown();
    }
}

Output:

15:46:43.842 [main] INFO com.fhj.Example - --- Synchronous call starts ---
15:46:43.844 [main] INFO com.fhj.AdderService - Computing
15:46:44.850 [main] INFO com.fhj.Example - Sync result: 8
15:46:44.850 [main] INFO com.fhj.Example - Main thread continues...

15:46:44.850 [main] INFO com.fhj.Example - --- Async Future call starts ---
15:46:44.857 [pool-1-thread-1] INFO com.fhj.AdderService - Computing
15:46:44.857 [main] INFO com.fhj.Example - Main thread continues...
15:46:45.863 [pool-1-thread-1] INFO com.fhj.AdderService - Computing
15:46:46.865 [main] INFO com.fhj.Example - Future result: 30

15:46:46.865 [main] INFO com.fhj.Example - --- Async Callback call starts ---
15:46:46.868 [main] INFO com.fhj.Example - Main thread continues...
15:46:46.868 [pool-1-thread-1] INFO com.fhj.AdderService - Computing
15:46:47.871 [pool-1-thread-1] INFO com.fhj.AdderService - Computing
15:46:48.876 [pool-1-thread-1] INFO com.fhj.Example - Callback result: 300

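At least three things can be seen from the output:

  • First, neither the asynchronous Future call nor the asynchronous callback call blocks the main thread from continuing.
  • Second, in the asynchronous calls the result is not computed by the main thread; with the asynchronous callback, even the handling of the result is not done by the main thread.
  • Finally, the difference between a Future and a callback is that with a Future the asynchronous thread only stores the result somewhere (CompletableFuture#result), and the main thread (or some other thread) still has to call get to retrieve it. With a callback, the handling of the result is decided in advance and the asynchronous thread simply carries it out.

Of course, a CompletableFuture can also be used with callbacks, for example through the whenComplete method.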
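As a small illustration of that last point, the sketch below registers a callback with whenComplete. WhenCompleteDemo and addWithCallback are hypothetical names used only for this example; the join() call exists solely so that the demo can return the callback's output deterministically.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteDemo {

    // Computes a + b asynchronously and lets a whenComplete callback handle the outcome.
    static String addWithCallback(int a, int b) {
        AtomicReference<String> sink = new AtomicReference<>();
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> a + b);

        // The callback receives either the result or the failure, never both.
        CompletableFuture<Integer> withCallback = future.whenComplete((result, error) -> {
            if (error != null) {
                sink.set("failed: " + error.getMessage());
            } else {
                sink.set("callback got " + result);
            }
        });

        // The stage returned by whenComplete finishes only after the callback has run.
        withCallback.join();
        return sink.get();
    }

    public static void main(String[] args) {
        System.out.println(addWithCallback(100, 200)); // prints "callback got 300"
    }
}
```

Unlike get(), the callback is invoked by whichever thread completes the future (or by the registering thread if the future is already complete), so the caller never has to block to handle the result.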

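Asynchronous calls

Netty is a high-performance asynchronous I/O framework designed around asynchrony, so making asynchronous calls on top of Netty is fairly simple.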

// org.apache.seata.core.rpc.netty.AbstractNettyRemoting#sendAsync
protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
    if (MsgVersionHelper.versionNotSupport(channel, rpcMessage)) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    "Message sending will be skipped as the client version does not support it,{}", rpcMessage);
        }
        return;
    }
    channelWritableCheck(channel, rpcMessage.getBody());
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
                + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
    }

    doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);

    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
            destroyChannel(future.channel());
        }
    });
}

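A simple call to the channel's writeAndFlush method is all it takes to make an asynchronous call.

Note in particular that when the calling thread is the EventLoop thread, writeAndFlush executes synchronously on that thread. The key lies in how writeAndFlush behaves: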

EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
    // Executes synchronously on the current thread
    if (flush) {
        next.invokeWriteAndFlush(m, promise);
    } else {
        next.invokeWrite(m, promise);
    }
} else {
    // Submits a write task
    final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
    if (!safeExecute(executor, task, promise, m, !flush)) {
        // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
        // and put it back in the Recycler for re-use later.
        //
        // See https://github.com/netty/netty/issues/8343.
        task.cancel();
    }
}
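The same branching can be mimicked with plain JDK classes. The following is a minimal sketch and not Netty code: MiniEventLoop, write and writeFromLoopThread are hypothetical names, and a single-threaded executor stands in for the EventLoop.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-threaded loop that mimics the inEventLoop() branch; not Netty code.
class MiniEventLoop {
    private volatile Thread loopThread;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "mini-loop");
        thread.setDaemon(true);
        loopThread = thread; // remember which thread is "the loop"
        return thread;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Returns "inline" if the task ran on the caller's thread, "queued" if it was handed to the loop.
    String write(Runnable task) {
        if (inEventLoop()) {
            task.run();         // already on the loop thread: run right away
            return "inline";
        }
        executor.execute(task); // other threads: enqueue and return immediately
        return "queued";
    }

    // Demo helper: performs write(task) from inside the loop thread and reports the branch taken.
    String writeFromLoopThread(Runnable task) {
        Callable<String> call = () -> write(task);
        try {
            return executor.submit(call).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Running the task inline when the caller is already on the loop thread avoids a pointless hop through the task queue, while queueing from other threads keeps all channel work confined to a single thread.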

Here is a piece of test code that demonstrates this behavior:

package com.fhj;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyWriteBehaviorDemo {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
                                    log.debug("Received server response: {}", msg);
                                }

                                @Override
                                public void channelActive(ChannelHandlerContext ctx) {
                                    log.debug("Channel is active");
                                }
                            });
                        }
                    });

            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();

            // Scenario 1: call from the EventLoop thread (runs synchronously)
            log.info("\n=== Calling from the EventLoop thread ===");
            channel.eventLoop().execute(() -> {
                log.info("Calling thread: {}", Thread.currentThread().getName());

                long start = System.nanoTime();
                ChannelFuture future = channel.writeAndFlush(
                        Unpooled.copiedBuffer("Written from the EventLoop thread", CharsetUtil.UTF_8));

                long duration = (System.nanoTime() - start) / 1000;
                log.info("Write took: {} microseconds", duration);
                log.info("Operation done: {}", future.isDone());

                // Add a listener to verify the completion state
                future.addListener(f ->
                        log.info("Listener fired on thread: {}", Thread.currentThread().getName())
                );
            });

            // Wait for the first scenario to finish
            Thread.sleep(500);

            // Scenario 2: call from a non-EventLoop thread (runs asynchronously)
            log.info("\n=== Calling from a non-EventLoop thread ===");
            new Thread(() -> {
                log.info("Calling thread: {}", Thread.currentThread().getName());

                long start = System.nanoTime();
                ChannelFuture future = channel.writeAndFlush(
                        Unpooled.copiedBuffer("Written from a non-EventLoop thread", CharsetUtil.UTF_8));

                long duration = (System.nanoTime() - start) / 1000;
                log.info("Write took: {} microseconds", duration);
                log.info("Operation done: {}", future.isDone());

                // Add a listener to confirm asynchronous completion
                future.addListener(f ->
                        log.info("Async completion confirmed on thread: {}", Thread.currentThread().getName())
                );
            }, "BusinessThread-1").start();

            Thread.sleep(1500); // wait for the operations to finish
        } finally {
            group.shutdownGracefully().sync();
            log.info("Netty client has shut down");
        }
    }
}

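Synchronous calls

Asynchronous calls are easy in Netty. Synchronous calls take a little more work, because the asynchronous call has to be converted into a synchronous one.

In essence, converting asynchronous to synchronous means that the calling thread blocks after it issues the call until the response arrives, and is then woken up to continue.

The core of Seata's approach is the MessageFuture class, shown below: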

package org.apache.seata.core.protocol;

import org.apache.seata.common.exception.ShouldNeverHappenException;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MessageFuture {
    private RpcMessage requestMessage;
    private long timeout;
    private long start = System.currentTimeMillis();
    private transient CompletableFuture<Object> origin = new CompletableFuture<>();

    public boolean isTimeout() {
        return System.currentTimeMillis() - start > timeout;
    }

    public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
        Object result = null;
        try {
            result = origin.get(timeout, unit);
            if (result instanceof TimeoutException) {
                throw (TimeoutException) result;
            }
        } catch (ExecutionException e) {
            throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
        } catch (TimeoutException e) {
            throw new TimeoutException(
                    String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));
        }
        if (result instanceof RuntimeException) {
            throw (RuntimeException) result;
        } else if (result instanceof Throwable) {
            throw new RuntimeException((Throwable) result);
        }
        return result;
    }
    
    public void setResultMessage(Object obj) {
        origin.complete(obj);
    }
}

With MessageFuture in place, let's look at the basic implementation of sendSync:

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    if (timeoutMillis <= 0) {
        throw new FrameworkException("timeout should more than 0ms");
    }
    if (channel == null) {
        LOGGER.warn("sendSync nothing, caused by null channel.");
        return null;
    }
    if (MsgVersionHelper.versionNotSupport(channel, rpcMessage)) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    "Message sending will be skipped as the client version does not support it,{}", rpcMessage);
        }
        return new VersionNotSupportMessage();
    }
    
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);
    
    // Check whether the Channel is writable (a Channel has a write buffer; once the buffer reaches its high-water mark, the Channel is no longer writable)
    channelWritableCheck(channel, rpcMessage.getBody());
    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
    doBeforeRpcHooks(remoteAddr, rpcMessage);
    
    // Send the message and register a callback; at this point the call is still non-blocking and asynchronous
    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
            MessageFuture mf = futures.remove(rpcMessage.getId());
            if (mf != null) {
                mf.setResultMessage(future.cause());
            }
            destroyChannel(future.channel());
        }
    });
    try {
        // Block here for the result; this is the key step that turns the asynchronous call into a synchronous one
        Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        doAfterRpcHooks(remoteAddr, rpcMessage, result);
        return result;
    } catch (Exception exx) {
        LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
        if (exx instanceof TimeoutException) {
            throw (TimeoutException) exx;
        } else {
            throw new RuntimeException(exx);
        }
    }
}

This method also relies on a key field, futures:

/**
 * Obtain the return result through MessageFuture blocking.
 *
 * @see AbstractNettyRemoting#sendSync
 */
protected final ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();

It stores the RPC requests that have not completed yet. The key of the map is the id that uniquely identifies each request/response pair.

With that groundwork, a synchronous call proceeds as follows, taking a server request and a client response as the example:

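  • First, the server builds the RPC request with a unique request id, wraps it in a MessageFuture, and stores the id and the MessageFuture in futures.
  • Next, the server calls channel.writeAndFlush to send the request. This step is still asynchronous.
  • The key to turning async into sync is that the server thread then calls MessageFuture.get() and blocks; under the hood it blocks in CompletableFuture.get().
  • Once the client has processed the request, it builds and sends a response whose id is the same as the request id.
  • When the server receives the response, some thread sets the result on the MessageFuture. Because the request and response share the same id, the matching MessageFuture can be looked up in the hash map above.
  • Once the result is set, the thread blocked in get resumes, which gives the effect of a synchronous call.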

So Seata's solution essentially uses a CompletableFuture as a container for the result.
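The flow described above can be condensed into a minimal, self-contained sketch. It is not Seata's code: the class SyncOverAsyncSketch, the asyncTransport callback, and the onResponse method are hypothetical names chosen for illustration, and the only thing assumed about the transport is that it is non-blocking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

class SyncOverAsyncSketch {
    // Pending requests keyed by request id; plays the role of the `futures` map.
    private final ConcurrentHashMap<Integer, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    /** Sends through a (hypothetical) async transport, then blocks until the reply or the timeout. */
    Object sendSync(Object body, BiConsumer<Integer, Object> asyncTransport, long timeoutMillis)
            throws TimeoutException, InterruptedException {
        int id = nextId.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);          // register before sending, so a fast reply is not lost
        asyncTransport.accept(id, body);  // non-blocking send
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS); // async becomes sync here
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(id);           // also cleans up after a timeout
        }
    }

    /** Called by the receiving thread when a response carrying the same id arrives. */
    boolean onResponse(int id, Object result) {
        CompletableFuture<Object> future = pending.remove(id);
        return future != null && future.complete(result);
    }

    public static void main(String[] args) throws Exception {
        SyncOverAsyncSketch rpc = new SyncOverAsyncSketch();
        // Fake peer: replies from another thread and echoes the request id.
        Object reply = rpc.sendSync(
                "ping", (id, body) -> new Thread(() -> rpc.onResponse(id, "pong:" + body)).start(), 1000);
        System.out.println(reply);
    }
}
```

The important detail is the ordering: the future is stored in the map before the message is written, so the response can never arrive before there is something to complete.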

In addition, the server handles client responses in the ServerOnResponseProcessor class, and the work runs on threads from branchResultMessageExecutor.

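Why am I sure that the request and response of one RPC always carry the same id? First, it has to be so logically; otherwise there would be no way to set the result on the right MessageFuture. Second, the code that builds requests and responses shows it: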

protected RpcMessage buildRequestMessage(Object msg, byte messageType) {
    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setId(getNextMessageId());
    rpcMessage.setMessageType(messageType);
    rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC);
    rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);
    rpcMessage.setBody(msg);
    return rpcMessage;
}

protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) {
    RpcMessage rpcMsg = new RpcMessage();
    rpcMsg.setMessageType(messageType);
    rpcMsg.setCodec(rpcMessage.getCodec());
    rpcMsg.setCompressor(rpcMessage.getCompressor());
    rpcMsg.setBody(msg);
    rpcMsg.setId(rpcMessage.getId()); // the request and response of one RPC always share the same id
    return rpcMsg;
}

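Message processing

In Netty, message processing immediately brings inbound and outbound handlers to mind.

On the Seata Server side, apart from the usual codec handlers, the one that matters is ServerHandler: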

// org.apache.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler
@ChannelHandler.Sharable
class ServerHandler extends ChannelDuplexHandler {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        // A decoder runs earlier in the pipeline, so the message here is already an RpcMessage
        if (msg instanceof RpcMessage) {
            processMessage(ctx, (RpcMessage) msg);
        } else {
            LOGGER.error("rpcMessage type error");
        }
    }

    // ...
}

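The method that carries business meaning here is channelRead: every message sent to the Server arrives at this method after decoding, and it in turn calls processMessage: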

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    Object body = rpcMessage.getBody();
    RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
    // If the client is not version 2.3.0 or higher, splitting MergedWarpMessage will result in the client’s
    // mergeMsgMap not being cleared
    if (body instanceof MergedWarpMessage
            && (StringUtils.isNotBlank(rpcContext.getVersion())
                    && Version.isAboveOrEqualVersion230(rpcContext.getVersion()))) {
        MergedWarpMessage mergedWarpMessage = (MergedWarpMessage) body;
        for (int i = 0; i < mergedWarpMessage.msgs.size(); i++) {
            RpcMessage rpcMsg =
                    buildRequestMessage(mergedWarpMessage.msgs.get(i), rpcMessage, mergedWarpMessage.msgIds.get(i));
            super.processMessage(ctx, rpcMsg);
        }
    } else {
        super.processMessage(ctx, rpcMessage);
    }
}

The super.processMessage call here is the business dispatch method in AbstractNettyRemoting:

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // During Server startup a set of processors is registered in processorTable; look up the processor and executor by the message type code
        final Pair<RemotingProcessor, ExecutorService> pair =
                this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // If a thread pool is configured, run the processor on it
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            // Run the matching processor
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    LOGGER.error(
                            FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                            "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                try {
                    // No thread pool is configured for this processor, so run it on the current thread, which is normally the EventLoop thread
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

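The logic of this method is straightforward.

During Server startup, Seata registers a set of processors in processorTable, so this method can look up the processor and thread pool by the message type code.

If a thread pool is configured, the processor runs on that pool; otherwise it runs on the calling thread, which is normally the EventLoop thread.

The Client works the same way.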
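To make the lookup-then-dispatch idea concrete, here is a small sketch of a processor table. It only mirrors the shape of the logic: ProcessorTableSketch, Entry, register, and dispatch are hypothetical names, and a plain Consumer stands in for a real processor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

class ProcessorTableSketch {
    /** A processor together with the optional executor it should run on. */
    static final class Entry {
        final Consumer<Object> processor;
        final ExecutorService executor; // null means "run on the calling thread"

        Entry(Consumer<Object> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new ConcurrentHashMap<>();

    /** Registration happens once, at startup. */
    void register(int typeCode, Consumer<Object> processor, ExecutorService executor) {
        processorTable.put(typeCode, new Entry(processor, executor));
    }

    /** Returns false when no processor is registered for the type code. */
    boolean dispatch(int typeCode, Object body) {
        Entry entry = processorTable.get(typeCode);
        if (entry == null) {
            return false;
        }
        if (entry.executor != null) {
            // Hand the work to the dedicated pool so the I/O thread is not blocked.
            entry.executor.execute(() -> entry.processor.accept(body));
        } else {
            // No pool configured: run inline on the caller (the EventLoop thread in Netty terms).
            entry.processor.accept(body);
        }
        return true;
    }
}
```

Keeping the executor next to the processor lets each message type choose its own threading policy, which is why cheap messages can stay on the I/O thread while expensive ones are moved off it.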

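Batch sending

Network programs sometimes need to send messages in batches. In Seata, the only batch-sending scenario at present is the client sending requests to the server.

Recall the mergeSendExecutorService thread pool mentioned in the Client startup section. If batch sending is enabled, the Client submits a MergedSendRunnable task at startup. Let's first see what this task does: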

private class MergedSendRunnable implements Runnable {

    @Override
    public void run() {
        while (true) {
            synchronized (mergeLock) {
                try {
                    mergeLock.wait(MAX_MERGE_SEND_MILLS); // bounded wait, introduced to fix a high CPU load problem
                } catch (InterruptedException ignore) {
                    // ignore
                }
            }
            isSending = true;
            // basketMap: the key is a serverAddress, the value is the queue (a blocking queue) of messages bound for that address
            basketMap.forEach((address, basket) -> {
                if (basket.isEmpty()) {
                    return;
                }
                MergedWarpMessage mergeMessage = new MergedWarpMessage();
                while (!basket.isEmpty()) {
                    // Merge all RpcMessages from the same blocking queue
                    RpcMessage msg = basket.poll();
                    mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                    mergeMessage.msgIds.add(msg.getId());
                }
                if (mergeMessage.msgIds.size() > 1) {
                    printMergeMessageLog(mergeMessage);
                }
                Channel sendChannel = null;
                try {
                    sendChannel = clientChannelManager.acquireChannel(address);
                    // Internally, mergeMessage is wrapped in an ordinary RpcMessage and sent
                    AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                } catch (FrameworkException e) {
                    if (e.getErrorCode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                        destroyChannel(address, sendChannel);
                    }
                    // fast fail
                    for (Integer msgId : mergeMessage.msgIds) {
                        MessageFuture messageFuture = futures.remove(msgId);
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(new RuntimeException(String.format("%s is unreachable", address), e));
                        }
                    }
                    LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                }
            });
            isSending = false;
        }
    }
}

The sending code that feeds this task is as follows:

public Object sendSyncRequest(Object msg) throws TimeoutException {
    String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
    long timeoutMillis = this.getRpcRequestTimeout();
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

    // send batch message
    // put message into basketMap, @see MergedSendRunnable
    if (this.isEnableClientBatchSendRequest()) {

        // send batch message is sync request, needs to create messageFuture and put it in futures.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        // put message into basketMap
        BlockingQueue<RpcMessage> basket =
                CollectionUtils.computeIfAbsent(basketMap, serverAddress, key -> new LinkedBlockingQueue<>());
        // Add rpcMessage to the queue and let mergeSendExecutorService do the actual sending
        if (!basket.offer(rpcMessage)) {
            LOGGER.error(
                    "put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                    serverAddress,
                    rpcMessage);
            return null;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: {}", rpcMessage.getBody());
        }
        
        if (!isSending) {
            // Wake the sender thread as soon as the queue has data, so the batch goes out
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }

        try {
            Object response = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return response;
        } catch (Exception exx) {
            LOGGER.error(
                    "wait response error:{},ip:{},request:{}",
                    exx.getMessage(),
                    serverAddress,
                    rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    } else {
        // Plain send: acquire the channel and call the parent's sendSync
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        return super.sendSync(channel, rpcMessage, timeoutMillis);
    }
}

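As you can see, this also uses wait/notify on an object lock. The resulting behavior is:

  1. basketMap is traversed and messages are sent at intervals of at most 1 ms.
  2. While the thread in mergeSendExecutorService is blocked in mergeLock.wait, a newly queued message wakes the thread waiting on mergeLock so that sending continues.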
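The producer/sender interplay can be sketched without any networking. The class below is an illustration under simplifying assumptions: messages are plain strings, MergeSendSketch and its methods are hypothetical names, and drainOnce returns the batches instead of writing them to a Channel.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class MergeSendSketch {
    private static final long MAX_WAIT_MILLIS = 1;
    private final Object mergeLock = new Object();
    // One basket (queue) per target address.
    private final Map<String, BlockingQueue<String>> basketMap = new ConcurrentHashMap<>();
    private volatile boolean sending;

    /** Producer side: enqueue the message, then wake the sender if it is idle. */
    void offer(String address, String message) {
        basketMap.computeIfAbsent(address, k -> new LinkedBlockingQueue<>()).offer(message);
        if (!sending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }
    }

    /** One pass of the sender: drain every basket into one batch per address. */
    Map<String, List<String>> drainOnce() {
        Map<String, List<String>> batches = new LinkedHashMap<>();
        sending = true;
        try {
            basketMap.forEach((address, basket) -> {
                List<String> batch = new ArrayList<>();
                basket.drainTo(batch);
                if (!batch.isEmpty()) {
                    batches.put(address, batch);
                }
            });
        } finally {
            sending = false;
        }
        return batches;
    }

    /** Sender loop body: sleep for at most MAX_WAIT_MILLIS (or until notified), then drain. */
    Map<String, List<String>> waitAndDrain() throws InterruptedException {
        synchronized (mergeLock) {
            mergeLock.wait(MAX_WAIT_MILLIS);
        }
        return drainOnce();
    }
}
```

A real sender would call waitAndDrain in a loop and write each batch to the connection for its address. The timed wait bounds the latency even if a notification is missed, and the notification keeps latency low when the sender is idle.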

How does the Server handle it? Look at the TypeCode of MergedWarpMessage, which is TYPE_SEATA_MERGE, and then at which processor the Server registers for that code at startup, which is ServerOnRequestProcessor.

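ServerOnRequestProcessor handles a MergedWarpMessage in one of two ways:

  1. Process all the individual requests in the MergedWarpMessage, then send a single MergeResultMessage.
  2. Hand the sending to the batchResponseExecutorService thread pool. This guarantees two things. First, a response goes out as soon as a result is available: even if the thread is waiting, it gets notified. Second, responses are flushed at least once every 1 ms, because the thread running in batchResponseExecutorService waits for at most 1 ms. I originally assumed this 1 ms was there to keep responses timely; only after reading the commit history did I learn that it was introduced to fix high CPU load.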

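Note that the two approaches respond with different message types: the first with MergeResultMessage, the second with BatchResultMessage. The Client handles them differently as well.

The core handling method in ServerOnRequestProcessor is: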

private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
    Object message = rpcMessage.getBody();
    RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
    if (!(message instanceof AbstractMessage)) {
        LOGGER.error("unrecognized message:{}", message);
        return;
    }
    // the batch send request message
    // TC server batch responses are enabled && client version >= 1.5.0
    if (message instanceof MergedWarpMessage) {
        if (NettyServerConfig.isEnableTcServerBatchSendResponse()
                && StringUtils.isNotBlank(rpcContext.getVersion())
                && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
            // Handled via batchResponseExecutorService; no need to wait for the whole batch to finish
            List<AbstractMessage> msgs = ((MergedWarpMessage) message).msgs;
            List<Integer> msgIds = ((MergedWarpMessage) message).msgIds;
            for (int i = 0; i < msgs.size(); i++) {
                AbstractMessage msg = msgs.get(i);
                int msgId = msgIds.get(i);
                if (PARALLEL_REQUEST_HANDLE) {
                    CompletableFuture.runAsync(
                            () -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                } else {
                    handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                }
            }
        } else {
            // Every request must finish before a response is sent to the client
            List<AbstractResultMessage> results = new ArrayList<>();
            List<CompletableFuture<AbstractResultMessage>> completableFutures = null;
            for (int i = 0; i < ((MergedWarpMessage) message).msgs.size(); i++) {
                if (PARALLEL_REQUEST_HANDLE) {
                    if (completableFutures == null) {
                        completableFutures = new ArrayList<>();
                    }
                    int finalI = i;
                    completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage(
                            ((MergedWarpMessage) message).msgs.get(finalI), rpcContext)));
                } else {
                    results.add(
                            i,
                            handleRequestsByMergedWarpMessage(
                                    ((MergedWarpMessage) message).msgs.get(i), rpcContext));
                }
            }
            if (CollectionUtils.isNotEmpty(completableFutures)) {
                try {
                    for (CompletableFuture<AbstractResultMessage> completableFuture : completableFutures) {
                        // Wait for each request to finish; only applies when PARALLEL_REQUEST_HANDLE is enabled
                        results.add(completableFuture.get());
                    }
                } catch (InterruptedException | ExecutionException e) {
                    LOGGER.error("handle request error: {}", e.getMessage(), e);
                }
            }
            MergeResultMessage resultMessage = new MergeResultMessage();
            resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
        }
    } else {
        // ...
    }
}

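The difference between handleRequestsByMergedWarpMessage and handleRequestsByMergedWarpMessageBy150 is that the latter wraps the result in a QueueItem and adds it to a blocking queue, from which a thread in batchResponseExecutorService does the actual sending, whereas the former simply returns the result.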

private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage subMessage, RpcContext rpcContext) {
    if (LOGGER.isInfoEnabled()) {
        String receiveMsgLog = String.format(
                "receive msg[merged]: %s, clientIp: %s, vgroup: %s",
                subMessage,
                NetUtil.toIpAddress(rpcContext.getChannel().remoteAddress()),
                rpcContext.getTransactionServiceGroup());
        BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
    }
    AbstractResultMessage resultMessage = transactionMessageHandler.onRequest(subMessage, rpcContext);
    if (LOGGER.isInfoEnabled()) {
        String resultMsgLog = String.format(
                "result msg[merged]: %s, clientIp: %s, vgroup: %s",
                resultMessage,
                NetUtil.toIpAddress(rpcContext.getChannel().remoteAddress()),
                rpcContext.getTransactionServiceGroup());
        BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
    }
    return resultMessage;
}

private void handleRequestsByMergedWarpMessageBy150(AbstractMessage msg, int msgId, RpcMessage rpcMessage,
                                                    ChannelHandlerContext ctx, RpcContext rpcContext) {
    AbstractResultMessage resultMessage = transactionMessageHandler.onRequest(msg, rpcContext);
    BlockingQueue<QueueItem> msgQueue = CollectionUtils.computeIfAbsent(basketMap, ctx.channel(), key -> new LinkedBlockingQueue<>());
    if (!msgQueue.offer(new QueueItem(resultMessage, msgId, rpcMessage))) {
        LOGGER.error("put message into basketMap offer failed, channel: {}, rpcMessage: {}, resultMessage: {}", ctx.channel(), rpcMessage, resultMessage);
    }
    if (!isResponding) {
        // Wake the responder thread as soon as the queue has data, so the batch goes out
        synchronized (batchResponseLock) {
            batchResponseLock.notifyAll();
        }
    }
}

private void handleRequestsByMergedWarpMessageBy150(
        AbstractMessage msg, int msgId, RpcMessage rpcMessage, ChannelHandlerContext ctx, RpcContext rpcContext) {
    if (LOGGER.isInfoEnabled()) {
        String receiveMsgLog = String.format(
                "receive msg[merged]: %s, clientIp: %s, vgroup: %s",
                msg, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
        BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
    }
    AbstractResultMessage resultMessage = transactionMessageHandler.onRequest(msg, rpcContext);
    // Get the send queue for this channel
    BlockingQueue<QueueItem> msgQueue = computeIfAbsentMsgQueue(ctx.channel());
    // Add the result to the queue and let the batchResponseExecutorService thread pool do the actual sending
    offerMsg(msgQueue, rpcMessage, resultMessage, msgId, ctx.channel());
    notifyBatchRespondingThread();
    if (LOGGER.isInfoEnabled()) {
        String resultMsgLog = String.format(
                "result msg[merged]: %s, clientIp: %s, vgroup: %s",
                resultMessage,
                NetUtil.toIpAddress(ctx.channel().remoteAddress()),
                rpcContext.getTransactionServiceGroup());
        BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
    }
}

Now let's see how the batchResponseExecutorService thread pool handles the batch-sending task:

private class BatchResponseRunnable implements Runnable {
    @Override
    public void run() {
        while (true) {
            synchronized (batchResponseLock) {
                try {
                    batchResponseLock.wait(MAX_BATCH_RESPONSE_MILLS);
                } catch (InterruptedException e) {
                    LOGGER.error("BatchResponseRunnable Interrupted error", e);
                }
            }
            isResponding = true;
            basketMap.forEach((channel, msgQueue) -> {
                if (msgQueue.isEmpty()) {
                    return;
                }
                // Wrap the queued responses in BatchResultMessage. Note that the responses are not all sent in one message:
                // they are grouped by [serialization, compressor, rpcMessageId, headMap] and each group is sent asynchronously
                Map<ClientRequestRpcInfo, BatchResultMessage> batchResultMessageMap = new HashMap<>();
                while (!msgQueue.isEmpty()) {
                    QueueItem item = msgQueue.poll();
                    BatchResultMessage batchResultMessage = CollectionUtils.computeIfAbsent(
                            batchResultMessageMap,
                            new ClientRequestRpcInfo(item.getRpcMessage()),
                            key -> new BatchResultMessage());
                    batchResultMessage.getResultMessages().add(item.getResultMessage());
                    batchResultMessage.getMsgIds().add(item.getMsgId());
                }
                batchResultMessageMap.forEach(
                        (clientRequestRpcInfo, batchResultMessage) -> remotingServer.sendAsyncResponse(
                                buildRpcMessage(clientRequestRpcInfo), channel, batchResultMessage));
            });
            isResponding = false;
        }
    }
}
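The grouping step inside BatchResponseRunnable can be pictured with a reduced model. In the sketch below, RpcInfo is a hypothetical stand-in for ClientRequestRpcInfo that keeps only three of the grouping attributes, and results are reduced to their message ids; the point is only that responses sharing the same key end up in the same batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;

class BatchGroupingSketch {
    /** Hypothetical grouping key: attributes that responses must share to travel together. */
    static final class RpcInfo {
        final int parentRpcId;
        final byte codec;
        final byte compressor;

        RpcInfo(int parentRpcId, byte codec, byte compressor) {
            this.parentRpcId = parentRpcId;
            this.codec = codec;
            this.compressor = compressor;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof RpcInfo)) {
                return false;
            }
            RpcInfo other = (RpcInfo) o;
            return parentRpcId == other.parentRpcId && codec == other.codec && compressor == other.compressor;
        }

        @Override
        public int hashCode() {
            return Objects.hash(parentRpcId, codec, compressor);
        }
    }

    /** One queued result: the key of the request it answers plus the sub-request id. */
    static final class Item {
        final RpcInfo info;
        final int msgId;

        Item(RpcInfo info, int msgId) {
            this.info = info;
            this.msgId = msgId;
        }
    }

    /** Drains the queue and returns, per key, the msgIds that belong in one batch response. */
    static Map<RpcInfo, List<Integer>> group(Queue<Item> queue) {
        Map<RpcInfo, List<Integer>> batches = new LinkedHashMap<>();
        Item item;
        while ((item = queue.poll()) != null) {
            batches.computeIfAbsent(item.info, k -> new ArrayList<>()).add(item.msgId);
        }
        return batches;
    }
}
```

Because the key is used in a hash map, it needs value-based equals and hashCode; otherwise every queued item would form its own batch.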

Finally, let's see how the Client handles the batch responses sent by the Server. The type code of MergeResultMessage is TYPE_SEATA_MERGE_RESULT and that of BatchResultMessage is TYPE_BATCH_RESULT_MSG, but both are handled by ClientOnResponseProcessor:

public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (rpcMessage.getBody() instanceof MergeResultMessage) {
        // Handle MergeResultMessage
        MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();
        MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());
        for (int i = 0; i < mergeMessage.msgs.size(); i++) {
            int msgId = mergeMessage.msgIds.get(i);
            MessageFuture future = futures.remove(msgId);
            // The old version of the server will return MergeResultMessage, so it is necessary to remove the msgId
            // from the childToParentMap.
            childToParentMap.remove(msgId);
            if (future == null) {
                LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, results.getMsgs()[i]);
            } else {
                future.setResultMessage(results.getMsgs()[i]);
            }
        }
    } else if (rpcMessage.getBody() instanceof BatchResultMessage) {
        // Handle BatchResultMessage
        BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody();
        for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {
            int msgId = batchResultMessage.getMsgIds().get(i);
            MessageFuture future = futures.remove(msgId);
            // The old version of the server will return BatchResultMessage, so it is necessary to remove the msgId
            // from the childToParentMap.
            Integer parentId = childToParentMap.remove(msgId);
            if (parentId != null) {
                mergeMsgMap.remove(parentId);
            }
            if (future == null) {
                LOGGER.error(
                        "msg: {} is not found in futures, result message: {}",
                        msgId,
                        batchResultMessage.getResultMessages().get(i));
            } else {
                future.setResultMessage(
                        batchResultMessage.getResultMessages().get(i));
            }
        }
    } else {
        // Handle non-batch response messages
        Integer id = rpcMessage.getId();
        try {
            MessageFuture messageFuture = futures.remove(id);
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());
            } else {
                if (rpcMessage.getBody() instanceof AbstractResultMessage) {
                    if (transactionMessageHandler != null) {
                        transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
                    }
                }
            }
        } finally {
            // In version 2.3.0, the server does not return MergeResultMessage and BatchResultMessage
            // so it is necessary to clear childToParentMap and mergeMsgMap here.
            Integer parentId = childToParentMap.remove(id);
            if (parentId != null) {
                mergeMsgMap.remove(parentId);
            }
        }
    }
}

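The logic here is simple: put each result into its MessageFuture, so that the thread that sent the request and has been blocked since can pick up the result. That completes one round of batch sending and response.

Finally, a further question: why does Seata have two batch response modes, and which one is better?

With MergeResultMessage, the response is sent only after every request has been processed, so its latency is bounded by the slowest request, even if the others finished quickly.

BatchResultMessage is different. Combined with CompletableFuture for parallel processing, a result can be sent as soon as its request finishes, without waiting for the others, so responses generally arrive sooner.

The latter mode was only introduced in Seata 1.5, and it is clearly the better approach.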
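The latency difference between the two modes can be shown with plain CompletableFuture objects. This is a conceptual sketch, not Seata code: mergeStyle and batchStyle are hypothetical helpers, and an in-memory outbox list stands in for the network.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ResponseStrategySketch {
    /** Merge style: a single reply that exists only after every sub-request has finished. */
    static CompletableFuture<List<String>> mergeStyle(List<CompletableFuture<String>> subResults) {
        return CompletableFuture.allOf(subResults.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<String> all = new ArrayList<>();
                    for (CompletableFuture<String> f : subResults) {
                        all.add(f.join()); // already complete at this point, so join does not block
                    }
                    return all;
                });
    }

    /** Batch style: each result is handed to the outbox as soon as it is ready. */
    static void batchStyle(List<CompletableFuture<String>> subResults, List<String> outbox) {
        for (CompletableFuture<String> f : subResults) {
            f.thenAccept(result -> {
                synchronized (outbox) {
                    outbox.add(result);
                }
            });
        }
    }
}
```

If one sub-request completes early and another late, the outbox of batchStyle already contains the early result while the future returned by mergeStyle is still pending. That is the head-of-line blocking the second mode avoids.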

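How Channels are managed

Throughout the communication among TC, TM, and RM, the Channel is a crucial component. The most natural way to find out how Seata manages Channels is to look at where the Server and the Client get a Channel when they send a message.

In sendSyncRequest of the AbstractNettyRemotingClient class, we find the following code: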

public Object sendSyncRequest(Object msg) throws TimeoutException {
    // ...
    // The Client gets the Channel from NettyClientChannelManager
    Channel channel = clientChannelManager.acquireChannel(serverAddress);
    return super.sendSync(channel, rpcMessage, timeoutMillis);
}

And in sendSyncRequest of the AbstractNettyRemotingServer class, we find:

public Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) throws TimeoutException {
    // The Server gets the Channel from ChannelManager
    Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp);
    if (channel == null) {
        throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
    }
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
    return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}

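So the client gets its Channel from NettyClientChannelManager, while the server looks up the Channel in ChannelManager by resourceId and clientId.

The rest of this section therefore focuses on these two classes and the logic around them.

Client Channel

Let's start with how the Client manages Channels. The core class is NettyClientChannelManager.

First, a quick look at its fields: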

// serverAddress -> lock
private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<>();
// serverAddress -> NettyPoolKey
private final ConcurrentMap<String, NettyPoolKey> poolKeyMap = new ConcurrentHashMap<>();
// serverAddress -> Channel
private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();
// object pool, NettyPoolKey -> Channel
private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;
// functional interface that encapsulates how to obtain a NettyPoolKey from a serverAddress
private final Function<String, NettyPoolKey> poolKeyFunction;

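Core classes of the object pool

Seata uses GenericKeyedObjectPool as the object pool that manages Channels.

GenericKeyedObjectPool is an implementation from the Apache Commons Pool library. It manages a set of sub-pools, one per unique key, so it can serve pooling needs for several kinds of objects.

Using GenericKeyedObjectPool usually also means configuring a KeyedPoolableObjectFactory, which defines how pooled objects are created, validated, activated, passivated, and destroyed.

When GenericKeyedObjectPool needs to create an object, it calls the factory's makeObject method; when it needs to destroy one, it calls destroyObject; and so on.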
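To show how a keyed pool and its factory cooperate, here is a deliberately tiny model written with the JDK only. It is not Commons Pool and does not reproduce its API: KeyedPoolSketch, Factory, borrow, and giveBack are hypothetical names, and only three lifecycle hooks (make, validate, destroy) are modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class KeyedPoolSketch<K, V> {
    /** Hypothetical, trimmed-down factory covering three of the lifecycle hooks. */
    interface Factory<K, V> {
        V make(K key);

        boolean validate(K key, V value);

        void destroy(K key, V value);
    }

    private final Factory<K, V> factory;
    // One queue of idle objects per key.
    private final Map<K, Deque<V>> idle = new HashMap<>();

    KeyedPoolSketch(Factory<K, V> factory) {
        this.factory = factory;
    }

    /** Reuses a valid idle object for the key, destroying stale ones; otherwise creates a new one. */
    synchronized V borrow(K key) {
        Deque<V> queue = idle.computeIfAbsent(key, k -> new ArrayDeque<>());
        V candidate;
        while ((candidate = queue.poll()) != null) {
            if (factory.validate(key, candidate)) {
                return candidate;
            }
            factory.destroy(key, candidate);
        }
        return factory.make(key);
    }

    /** Returns an object to the idle queue of its key. */
    synchronized void giveBack(K key, V value) {
        idle.computeIfAbsent(key, k -> new ArrayDeque<>()).push(value);
    }
}
```

In the Channel case, the key would carry the server address, make would open and register a connection, validate would check that the connection is still active, and destroy would close it.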

How Channels are pooled

The pooled object is the Channel, and the corresponding key is NettyPoolKey:

public class NettyPoolKey {

    private TransactionRole transactionRole;
    private String address;
    private AbstractMessage message;

    // ...
}

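NettyPoolKey holds three pieces of information: the transaction role (TM_ROLE, RM_ROLE, SERVER_ROLE), the address of the target TC Server, and the RPC message the Client sends when it connects to the Server.

How is a NettyPoolKey created? A Seata client can take one of two roles, TM or RM, and the creation logic differs between them. So Seata declares an abstract method in AbstractNettyRemotingClient whose return value is a functional interface that encapsulates how to create a NettyPoolKey from a serverAddress.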

// org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#getPoolKeyFunction
protected abstract Function<String, NettyPoolKey> getPoolKeyFunction();

For example, the TM implementation is:

protected Function<String, NettyPoolKey> getPoolKeyFunction() {
    return severAddress -> {
        RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup, getExtraData());
        return new NettyPoolKey(NettyPoolKey.TransactionRole.TM_ROLE, severAddress, message);
    };
}

And the RM implementation is:

protected Function<String, NettyPoolKey> getPoolKeyFunction() {
    return serverAddress -> {
        String resourceIds = getMergedResourceKeys();
        if (resourceIds != null && LOGGER.isInfoEnabled()) {
            LOGGER.info("RM will register: {}", resourceIds);
        }
        RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
        message.setResourceIds(resourceIds);
        return new NettyPoolKey(NettyPoolKey.TransactionRole.RM_ROLE, serverAddress, message);
    };
}

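This shows that after connecting to the Server, the TM sends a RegisterTMRequest, while the RM sends a RegisterRMRequest.

When this function is actually invoked is covered later.

As mentioned above, an object pool comes with an object factory, KeyedPoolableObjectFactory. In Seata, NettyPoolableFactory implements KeyedPoolableObjectFactory for this purpose.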

/**
 * Factory for Netty Channels: creates a Channel from a NettyPoolKey. The methods of this class must be thread-safe
 */
public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {

    // ...
    
    /**
     * Called whenever a new instance is needed
     */
    @Override
    public Channel makeObject(NettyPoolKey key) {
        InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
        // Create the Channel; in essence this connects to the Seata Server via bootstrap.connect and returns the Channel
        Channel tmpChannel = clientBootstrap.getNewChannel(address);
        long start = System.currentTimeMillis();
        Object response;
        Channel channelToServer = null;
        if (key.getMessage() == null) {
            throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
        }
        try {
            // Send a synchronous request: RegisterTMRequest for a TM, RegisterRMRequest for an RM
            response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
            // Use the response to decide whether registration succeeded
            if (!isRegisterSuccess(response, key.getTransactionRole())) {
                rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
            } else {
                // Registration succeeded
                channelToServer = tmpChannel;
                // Put the Channel into NettyClientChannelManager.channels, keyed by serverAddress
                // An RM may also need to register its resources with the Server
                rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
            }
        } catch (Exception exx) {
            if (tmpChannel != null) {
                tmpChannel.close();
            }
            throw new FrameworkException("register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
        }
        return channelToServer;
    }

    // ...

    @Override
    public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {
        if (channel != null) {
            channel.disconnect();
            channel.close();
        }
    }

    /**
     * Called when an object is borrowed, to validate it (optional)
     */
    @Override
    public boolean validateObject(NettyPoolKey key, Channel obj) {
        if (obj != null && obj.isActive()) {
            return true;
        }
        return false;
    }

    /**
     * Called when an object is borrowed, to activate it
     */
    @Override
    public void activateObject(NettyPoolKey key, Channel obj) throws Exception {}

    /**
     * Called when an object is returned, to passivate it
     */
    @Override
    public void passivateObject(NettyPoolKey key, Channel obj) throws Exception {}
}

Acquiring a Channel

On the Seata client there are three entry points for acquiring a Channel: initialization, scheduled reconnection, and sending a message.

// Entry point 1
private void initConnection() {
    boolean failFast = ConfigurationFactory.getInstance()
            .getBoolean(
                    ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
                    DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
    getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
}

// Entry point 2
public void init() {
    // Reconnect on a schedule: by default a 60s initial delay, then every 10s
    timerExecutor.scheduleAtFixedRate(
            () -> {
                try {
                    clientChannelManager.reconnect(getTransactionServiceGroup());
                } catch (Exception ex) {
                    LOGGER.warn("reconnect server failed. {}", ex.getMessage());
                }
            },
            SCHEDULE_DELAY_MILLS,
            SCHEDULE_INTERVAL_MILLS,
            TimeUnit.MILLISECONDS);
    // ...
}

// Entry point 3
public Object sendSyncRequest(Object msg) throws TimeoutException {
    // ...
    // The client obtains the Channel through NettyClientChannelManager
    Channel channel = clientChannelManager.acquireChannel(serverAddress);
    return super.sendSync(channel, rpcMessage, timeoutMillis);
}

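All three entry points, however, end up calling clientChannelManager's acquireChannel method to obtain the Channel.

/**
 * Look up the Channel for serverAddress; if it does not exist or the connection is dead, establish a new connection
 */
Channel acquireChannel(String serverAddress) {
    // Look up the Channel in channels by serverAddress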
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null) {
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
            return channelToServer;
        }
    }
    // If channels has no Channel for this address, or the Channel is disconnected, connect to the address
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) {
        // Establish the connection
        return doConnect(serverAddress);
    }
}

private Channel doConnect(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null && channelToServer.isActive()) {
        return channelToServer;
    }
    Channel channelFromPool;
    try {
        // The functional interface is invoked here
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        poolKeyMap.put(serverAddress, currentPoolKey);
        // borrowObject from the object pool; if an object has to be created, the factory's makeObject method is called,
        // which connects to the Server and sends the message held in currentPoolKey.message
        channelFromPool = nettyClientKeyPool.borrowObject(currentPoolKey);
        channels.put(serverAddress, channelFromPool);
    } catch (Exception exx) {
        LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new FrameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}
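
The acquireChannel and doConnect pair above follows a common pattern: a lock-free fast path, one lock object per server address, and a second check once the lock is held. The following is a minimal sketch of that pattern in plain Java, not Seata's code. FakeConnection, ConnectionCache, and the connector function are hypothetical names introduced only for illustration, and a simple `active` flag stands in for Channel.isActive().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical stand-in for a Netty Channel; only liveness matters here. */
final class FakeConnection {
    final String address;
    volatile boolean active = true;

    FakeConnection(String address) {
        this.address = address;
    }
}

/** Simplified sketch of the acquire/connect pattern; not Seata's NettyClientChannelManager. */
final class ConnectionCache {
    private final ConcurrentMap<String, FakeConnection> connections = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
    private final Function<String, FakeConnection> connector;

    ConnectionCache(Function<String, FakeConnection> connector) {
        this.connector = connector;
    }

    FakeConnection acquire(String address) {
        // Fast path: reuse a live cached connection without taking any lock.
        FakeConnection cached = connections.get(address);
        if (cached != null && cached.active) {
            return cached;
        }
        // Slow path: one lock object per address, so different servers can connect in parallel.
        Object lock = locks.computeIfAbsent(address, key -> new Object());
        synchronized (lock) {
            // Re-check under the lock: another thread may have connected while we waited.
            FakeConnection current = connections.get(address);
            if (current != null && current.active) {
                return current;
            }
            FakeConnection created = connector.apply(address);
            connections.put(address, created);
            return created;
        }
    }
}
```

The second check inside the synchronized block is what keeps two threads that miss the cache at the same time from dialing the same address twice.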

Server Channel

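On the Server side, practically all of the core Channel management logic lives in ChannelManager. So where do the Server's Channels come from? Recall that when a Client first connects to the Server, it sends a TM or RM registration request once the connection is established.

Let's first look at how the Server handles these registration requests.

Handling Client Registration

The relevant processors are RegRmProcessor and RegTmProcessor. The core logic in each is a call to ChannelManager's registerRMChannel or registerTMChannel method, respectively.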

public static void registerTMChannel(RegisterTMRequest request, Channel channel)
        throws IncompatibleVersionException {
    // Build the RpcContext, which holds the connection context of the client
    RpcContext rpcContext = buildChannelHolder(
            NettyPoolKey.TransactionRole.TMROLE,
            request.getVersion(),
            request.getApplicationId(),
            request.getTransactionServiceGroup(),
            null,
            channel);
    // Put the entry into IDENTIFIED_CHANNELS, with the Channel as key and rpcContext as value
    rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    // applicationId:clientIp
    String clientIdentified = rpcContext.getApplicationId()
            + Constants.CLIENT_ID_SPLIT_CHAR
            + ChannelUtil.getClientIpFromChannel(channel);
    // Store the Channel information in TM_CHANNELS
    ConcurrentMap<Integer, RpcContext> clientIdentifiedMap =
            CollectionUtils.computeIfAbsent(TM_CHANNELS, clientIdentified, key -> new ConcurrentHashMap<>());
    rpcContext.holdInClientChannels(clientIdentifiedMap);
}

public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
        throws IncompatibleVersionException {
    Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
    RpcContext rpcContext;
    if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
        // Build the RpcContext and store it in IDENTIFIED_CHANNELS
        rpcContext = buildChannelHolder(
                NettyPoolKey.TransactionRole.RMROLE,
                resourceManagerRequest.getVersion(),
                resourceManagerRequest.getApplicationId(),
                resourceManagerRequest.getTransactionServiceGroup(),
                resourceManagerRequest.getResourceIds(),
                channel);
        rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    } else {
        rpcContext = IDENTIFIED_CHANNELS.get(channel);
        rpcContext.addResources(dbkeySet);
    }
    if (dbkeySet == null || dbkeySet.isEmpty()) {
        return;
    }
    for (String resourceId : dbkeySet) {
        String clientIp;
        // Maintain the RM_CHANNELS entries
        ConcurrentMap<Integer, RpcContext> portMap = CollectionUtils.computeIfAbsent(
                        RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>())
                .computeIfAbsent(
                        clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());

        rpcContext.holdInResourceManagerChannels(resourceId, portMap);
        updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
    }
}

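Both methods are simple: they build an RpcContext from the registration request and the Channel, then maintain the Server's internal maps IDENTIFIED_CHANNELS, RM_CHANNELS, and TM_CHANNELS.

That said, these maps are nested quite deeply, and it is fair to ask whether the structure could be simplified.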

/**
 * Channel -> RpcContext
 */
private static final ConcurrentMap<Channel, RpcContext> IDENTIFIED_CHANNELS = new ConcurrentHashMap<>();

/**
 * resourceId -> applicationId -> ip -> port -> RpcContext
 */
//                               resourceId          applicationId               ip
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String,
        //             port    RpcContext
        ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS = new ConcurrentHashMap<>();

/**
 * applicationId:clientIp -> port -> RpcContext
 */
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS = new ConcurrentHashMap<>();
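
To make the four-level shape of RM_CHANNELS concrete, here is a small self-contained sketch of how such a structure is filled with chained computeIfAbsent calls and read back level by level. It is only an illustration under simplifying assumptions: NestedRegistry is a hypothetical class, and a plain String stands in for RpcContext.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Simplified sketch of a resourceId -> applicationId -> ip -> port -> context registry. */
final class NestedRegistry {
    // A String stands in for RpcContext in this sketch.
    private final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, String>>>>
            rmChannels = new ConcurrentHashMap<>();

    void register(String resourceId, String applicationId, String ip, int port, String context) {
        // Each computeIfAbsent creates the next level atomically if it is missing.
        rmChannels.computeIfAbsent(resourceId, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(applicationId, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(ip, key -> new ConcurrentHashMap<>())
                .put(port, context);
    }

    String lookup(String resourceId, String applicationId, String ip, int port) {
        // Reading requires a null check at every level, which is where the verbosity comes from.
        ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, String>>> apps =
                rmChannels.get(resourceId);
        if (apps == null) {
            return null;
        }
        ConcurrentMap<String, ConcurrentMap<Integer, String>> ips = apps.get(applicationId);
        if (ips == null) {
            return null;
        }
        ConcurrentMap<Integer, String> ports = ips.get(ip);
        return ports == null ? null : ports.get(port);
    }
}
```

The nesting is what allows a lookup to stop at any level and iterate over the siblings below it, for example all ports on one IP.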

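Acquiring a Channel

On the Server side, the logic for acquiring a Channel is very long; read through it if you are interested. In essence it pulls a usable Channel out of the maps, falling back step by step: the exact port first, then other ports on the same IP, then other IPs of the same application, and finally, if tryOtherApp is set, other applications.

public static Channel getChannel(String resourceId, String clientId, boolean tryOtherApp) {
    Channel resultChannel = null;
    // Parse the clientId, which has three parts: applicationId, clientIp, and clientPort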
    String[] clientIdInfo = parseClientId(clientId);
    if (clientIdInfo == null || clientIdInfo.length != 3) {
        throw new FrameworkException("Invalid Client ID: " + clientId);
    }
    if (StringUtils.isBlank(resourceId)) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("No channel is available, resourceId is null or empty");
        }
        return null;
    }
    // applicationId
    String targetApplicationId = clientIdInfo[0];
    // clientIp
    String targetIP = clientIdInfo[1];
    // clientPort
    int targetPort = Integer.parseInt(clientIdInfo[2]);
    // From here on, keep unwrapping the inner ConcurrentHashMaps
    ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);
    if (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("No channel is available for resource[{}]", resourceId);
        }
        return null;
    }
    ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);
    if (ipMap != null && !ipMap.isEmpty()) {
        // Firstly, try to find the original channel through which the branch was registered.
        // port -> RpcContext
        ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
        /**
         * Try to get a Channel on targetIP
         */
        if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {
            RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
            if (exactRpcContext != null) {
                Channel channel = exactRpcContext.getChannel();
                if (channel.isActive()) {
                    // The Channel is active: every fallback branch below is skipped and this Channel is returned
                    resultChannel = channel;
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Just got exactly the one {} for {}", channel, clientId);
                    }
                } else {
                    if (portMapOnTargetIP.remove(targetPort, exactRpcContext)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Removed inactive {}", channel);
                        }
                    }
                }
            }
            // The original channel was broken, try another one.
            if (resultChannel == null) {
                // Try the other ports on the same node
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP.entrySet()) {
                    Channel channel = portMapOnTargetIPEntry.getValue().getChannel();
                    if (channel.isActive()) {
                        resultChannel = channel;
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info(
                                    "Choose {} on the same IP[{}] as alternative of {}", channel, targetIP, clientId);
                        }
                        break;
                    } else {
                        if (portMapOnTargetIP.remove(portMapOnTargetIPEntry.getKey(),
                                portMapOnTargetIPEntry.getValue())) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Removed inactive {}", channel);
                            }
                        }
                    }
                }
            }
        }
        /**
         * Try to get a Channel from another node of targetApplicationId
         */
        // No channel on the app node, try another one.
        if (resultChannel == null) {
            for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap.entrySet()) {
                if (ipMapEntry.getKey().equals(targetIP)) {
                    continue;
                }
                ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
                if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
                    continue;
                }
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
                    Channel channel = portMapOnOtherIPEntry.getValue().getChannel();
                    if (channel.isActive()) {
                        resultChannel = channel;
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Choose {} on the same application[{}] as alternative of {}", channel, targetApplicationId, clientId);
                        }
                        break;
                    } else {
                        if (portMapOnOtherIP.remove(portMapOnOtherIPEntry.getKey(), portMapOnOtherIPEntry.getValue())) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Removed inactive {}", channel);
                            }
                        }
                    }
                }
                if (resultChannel != null) {
                    break;
                }
            }
        }
    }
    if (resultChannel == null && tryOtherApp) {
        // Try other applicationIds
        resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);
        if (resultChannel == null) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("No channel is available for resource[{}] as alternative of {}", resourceId, clientId);
            }
        } else {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Choose {} on the same resource[{}] as alternative of {}", resultChannel, resourceId, clientId);
            }
        }
    }
    return resultChannel;
}

private static Channel tryOtherApp(ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap, String myApplicationId) {
    Channel chosenChannel = null;
    for (ConcurrentMap.Entry<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMapEntry : applicationIdMap.entrySet()) {
        if (!StringUtils.isNullOrEmpty(myApplicationId) && applicationIdMapEntry.getKey().equals(myApplicationId)) {
            continue;
        }
        ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> targetIPMap = applicationIdMapEntry.getValue();
        if (targetIPMap == null || targetIPMap.isEmpty()) {
            continue;
        }
        for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> targetIPMapEntry : targetIPMap.entrySet()) {
            ConcurrentMap<Integer, RpcContext> portMap = targetIPMapEntry.getValue();
            if (portMap == null || portMap.isEmpty()) {
                continue;
            }
            for (ConcurrentMap.Entry<Integer, RpcContext> portMapEntry : portMap.entrySet()) {
                Channel channel = portMapEntry.getValue().getChannel();
                if (channel.isActive()) {
                    chosenChannel = channel;
                    break;
                } else {
                    if (portMap.remove(portMapEntry.getKey(), portMapEntry.getValue())) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Removed inactive {}", channel);
                        }
                    }
                }
            }
            if (chosenChannel != null) {
                break;
            }
        }
        if (chosenChannel != null) {
            break;
        }
    }
    return chosenChannel;
}
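
Stripped of logging and of the removal of inactive entries, getChannel and tryOtherApp boil down to a four-step fallback. The sketch below shows only that order. It is a simplified illustration, not Seata's ChannelManager: FallbackLookup and Endpoint are hypothetical names, and an `active` flag stands in for Channel.isActive().

```java
import java.util.Map;

/** Simplified sketch of the lookup order; it does not evict inactive entries. */
final class FallbackLookup {

    /** Hypothetical stand-in for an RpcContext and its Channel. */
    static final class Endpoint {
        final String name;
        final boolean active;

        Endpoint(String name, boolean active) {
            this.name = name;
            this.active = active;
        }
    }

    /** apps is applicationId -> ip -> port -> endpoint, i.e. the value stored under one resourceId. */
    static Endpoint choose(
            Map<String, Map<String, Map<Integer, Endpoint>>> apps,
            String app, String ip, int port, boolean tryOtherApp) {
        Map<String, Map<Integer, Endpoint>> ipMap = apps.get(app);
        if (ipMap != null) {
            Map<Integer, Endpoint> portMap = ipMap.get(ip);
            if (portMap != null) {
                // Step 1: the exact ip:port the branch was registered through.
                Endpoint exact = portMap.get(port);
                if (exact != null && exact.active) {
                    return exact;
                }
                // Step 2: any other active port on the same IP.
                Endpoint sameIp = firstActive(portMap);
                if (sameIp != null) {
                    return sameIp;
                }
            }
            // Step 3: any active endpoint on another IP of the same application.
            for (Map.Entry<String, Map<Integer, Endpoint>> entry : ipMap.entrySet()) {
                if (entry.getKey().equals(ip)) {
                    continue;
                }
                Endpoint sameApp = firstActive(entry.getValue());
                if (sameApp != null) {
                    return sameApp;
                }
            }
        }
        if (tryOtherApp) {
            // Step 4: any active endpoint of another application that registered the same resource.
            for (Map.Entry<String, Map<String, Map<Integer, Endpoint>>> entry : apps.entrySet()) {
                if (entry.getKey().equals(app)) {
                    continue;
                }
                for (Map<Integer, Endpoint> portMap : entry.getValue().values()) {
                    Endpoint other = firstActive(portMap);
                    if (other != null) {
                        return other;
                    }
                }
            }
        }
        return null;
    }

    private static Endpoint firstActive(Map<Integer, Endpoint> portMap) {
        for (Endpoint endpoint : portMap.values()) {
            if (endpoint.active) {
                return endpoint;
            }
        }
        return null;
    }
}
```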

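Summary in One Diagram

Finally, a sequence diagram summarizes the Channel management process.

How the Protocol Is Designed

Every network program needs a communication protocol, and Seata is no exception. Here we look at how Seata's V1 protocol is implemented.

The main classes involved are ProtocolEncoderV1 and ProtocolDecoderV1.

First, recall which handlers the Server added to Netty at startup: IdleStateHandler and ProtocolDetectHandler.

IdleStateHandler is a ChannelHandler built into Netty that detects idle connections. It can monitor reader idle, writer idle, and all idle states; when one is detected it fires an IdleStateEvent, which has to be handled in userEventTriggered.

The focus here is naturally ProtocolDetectHandler, which is a ByteToMessageDecoder. Its decode method is as follows: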

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    for (ProtocolDetector protocolDetector : supportedProtocolDetectors) {
        if (protocolDetector.detect(in)) {
            ChannelHandler[] protocolHandlers = protocolDetector.getHandlers();
            ctx.pipeline().addLast(protocolHandlers);
            ctx.pipeline().remove(this);

            in.resetReaderIndex();
            return;
        }

        in.resetReaderIndex();
    }

    byte[] preface = new byte[in.readableBytes()];
    in.readBytes(preface);
    LOGGER.error(
            "Can not recognize protocol from remote {}, preface = {}",
            ctx.channel().remoteAddress(),
            preface);
    in.clear();
    ctx.close();
}

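The first time ProtocolDetectHandler decodes incoming bytes, it iterates over supportedProtocolDetectors to find a ProtocolDetector that recognizes them; the recognition itself is implemented by the ProtocolDetector.detect method.

The protocols currently supported are the Seata protocol, HTTP/1.1, and HTTP/2. Once a protocol is recognized, the corresponding protocolHandlers are added to the pipeline of the current Channel, and ProtocolDetectHandler is removed from the pipeline.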
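
The idea behind detection is to peek at the first bytes of a connection and compare them with a known prefix for each protocol. The sketch below illustrates this on plain byte arrays. It is not the code of Seata's detectors: ProtocolSniffer is a hypothetical class, the Seata magic code 0xda 0xda is an assumption taken from ProtocolConstants.MAGIC_CODE_BYTES, and the HTTP/1.1 check is reduced to a short list of request methods.

```java
import java.nio.charset.StandardCharsets;

/** Simplified sketch of prefix-based protocol detection. */
final class ProtocolSniffer {
    // Assumption: the Seata magic code is 0xda 0xda.
    private static final byte[] SEATA_MAGIC = {(byte) 0xda, (byte) 0xda};
    // The HTTP/2 client connection preface.
    private static final byte[] HTTP2_PREFACE =
            "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
    // A deliberately short list of HTTP/1.1 request methods for this sketch.
    private static final String[] HTTP1_METHODS = {"GET ", "POST ", "PUT ", "DELETE ", "HEAD ", "OPTIONS "};

    static String detect(byte[] firstBytes) {
        if (startsWith(firstBytes, SEATA_MAGIC)) {
            return "seata";
        }
        if (startsWith(firstBytes, HTTP2_PREFACE)) {
            return "http2";
        }
        for (String method : HTTP1_METHODS) {
            if (startsWith(firstBytes, method.getBytes(StandardCharsets.US_ASCII))) {
                return "http1";
            }
        }
        return "unknown";
    }

    private static boolean startsWith(byte[] data, byte[] prefix) {
        if (data.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (data[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}
```

Because the check only reads the bytes and does not consume them, the chosen handlers can decode the same bytes from the start, which is why the real handler calls in.resetReaderIndex() after detection.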

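Below we focus on SeataDetector. The ChannelHandler it returns is MultiProtocolDecoder, which is again a ByteToMessageDecoder (by way of its parent class LengthFieldBasedFrameDecoder). Its decode method is as follows: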

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    ByteBuf frame;
    Object decoded;
    byte version;
    try {
        if (isV0(in)) {
            decoded = in;
            version = ProtocolConstants.VERSION_0;
        } else {
            // The parent class is LengthFieldBasedFrameDecoder
            decoded = super.decode(ctx, in);
            version = decideVersion(decoded);
        }

        if (decoded instanceof ByteBuf) {
            frame = (ByteBuf) decoded;
            // MultiProtocolDecoder identifies the protocol version,
            // then picks the matching encoder and decoder by version
            ProtocolDecoder decoder = protocolDecoderMap.get(version);
            if (decoder == null) {
                LOGGER.error(
                        "Decoder not found, version={}, use current version({})",
                        version,
                        ProtocolConstants.VERSION);
                decoder = protocolDecoderMap.get(ProtocolConstants.VERSION);
            }
            ProtocolEncoder encoder = protocolEncoderMap.get(version);
            if (encoder == null) {
                LOGGER.error(
                        "Encoder not found, version: {}, use current version({})",
                        version,
                        ProtocolConstants.VERSION);
                encoder = protocolEncoderMap.get(ProtocolConstants.VERSION);
            }
            try {
                if (decoder == null || encoder == null) {
                    throw new UnsupportedOperationException("Unsupported version: " + version);
                }
                return decoder.decodeFrame(frame);
            } finally {
                if (version != ProtocolConstants.VERSION_0) {
                    frame.release();
                }
                // Add the version-specific decoder and encoder
                ctx.pipeline().addLast((ChannelHandler) decoder);
                ctx.pipeline().addLast((ChannelHandler) encoder);
                if (channelHandlers != null) {
                    ctx.pipeline().addLast(channelHandlers);
                }
                // Remove this MultiProtocolDecoder
                ctx.pipeline().remove(this);
            }
        }
    } catch (Exception exx) {
        LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
        throw new DecodeException(exx);
    }
    return decoded;
}

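The V1 Protocol

Seata's protocol design is thorough and general-purpose. It uses the mainstream solution to the sticky-packet and half-packet problem: message length plus message content.

The protocol format is as follows:

As you can see, it consists of a magic code, protocol version, full length, head length, message type, serialization algorithm, compression algorithm, request id, an optional head map extension, and the body.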
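
The fixed part of the header is 16 bytes: 2 (magic code) + 1 (version) + 4 (full length) + 2 (head length) + 1 (message type) + 1 (serializer) + 1 (compressor) + 4 (request id). The following sketch writes and reads that layout with java.nio.ByteBuffer. It is an illustration only: V1FrameSketch is a hypothetical class, the magic code 0xda 0xda is an assumption, the head map is omitted, and the body is written as raw bytes without serialization or compression.

```java
import java.nio.ByteBuffer;

/** Simplified sketch of the V1 frame layout, without a head map. */
final class V1FrameSketch {
    static final int HEAD_LENGTH = 16; // 2 + 1 + 4 + 2 + 1 + 1 + 1 + 4

    static byte[] encode(byte messageType, byte codec, byte compressor, int requestId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEAD_LENGTH + body.length); // big-endian by default
        buf.put((byte) 0xda).put((byte) 0xda); // magic code (assumed value)
        buf.put((byte) 1);                     // protocol version
        int lengthPos = buf.position();
        buf.position(lengthPos + 6);           // reserve full length (4B) and head length (2B)
        buf.put(messageType).put(codec).put(compressor);
        buf.putInt(requestId);
        buf.put(body);
        // Back-fill the two reserved length fields with absolute writes.
        buf.putInt(lengthPos, HEAD_LENGTH + body.length);
        buf.putShort(lengthPos + 4, (short) HEAD_LENGTH);
        return buf.array();
    }

    static byte[] decodeBody(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.get() != (byte) 0xda || buf.get() != (byte) 0xda) {
            throw new IllegalArgumentException("Unknown magic code");
        }
        buf.get();                          // version, ignored in this sketch
        int fullLength = buf.getInt();
        short headLength = buf.getShort();
        buf.position(headLength);           // skip the rest of the header
        byte[] body = new byte[fullLength - headLength];
        buf.get(body);
        return body;
    }
}
```

Reserving the length fields and filling them in afterwards avoids having to know the size of the head map and body before writing starts.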

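How Encoding and Decoding Work

Seata's decoder uses Netty's built-in LengthFieldBasedFrameDecoder; if you are not familiar with it, it is worth reading up on.

Encoding and decoding are not difficult, so the code is given below without much explanation.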

package org.apache.seata.core.rpc.netty.v1;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.seata.core.rpc.netty.ProtocolEncoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.compressor.Compressor;
import org.apache.seata.core.compressor.CompressorFactory;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.serializer.SerializerServiceLoader;
import org.apache.seata.core.serializer.SerializerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * <pre>
 * 0     1     2     3     4     5     6     7     8     9    10     11    12    13    14    15    16
 * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
 * |   magic   |proto|     full length       |    head   | Msg |Seria|Compr|      RequestId        |
 * |   code    |versi|     (head+body)       |   length  |Type |lizer|ess  |                       |
 * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
 * |                                   Head Map [Optional]                                         |
 * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
 * |                                         body                                                  |
 * +-----------------------------------------------------------------------------------------------+
 * </pre>
 * <p>
 * <li>Full Length: include all data </li>
 * <li>Head Length: include head data from magic code to head map. </li>
 * <li>Body Length: Full Length - Head Length</li>
 * </p>
 */
public class ProtocolEncoderV1 extends MessageToByteEncoder implements ProtocolEncoder {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolEncoderV1.class);

    public void encode(RpcMessage message, ByteBuf out) {
        try {
            ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
            rpcMessage.rpcMsgToProtocolMsg(message);
            int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
            int headLength = ProtocolConstants.V1_HEAD_LENGTH;
            byte messageType = rpcMessage.getMessageType();
            out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
            out.writeByte(ProtocolConstants.VERSION_1);
            // full Length(4B) and head length(2B) will fix in the end.
            out.writerIndex(out.writerIndex() + 6); // skip the full length and head length slots for now; they are back-filled at the end
            out.writeByte(messageType);
            out.writeByte(rpcMessage.getCodec());
            out.writeByte(rpcMessage.getCompressor());
            out.writeInt(rpcMessage.getId());
            // direct write head with zero-copy
            Map<String, String> headMap = rpcMessage.getHeadMap();
            if (headMap != null && !headMap.isEmpty()) {
                int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);
                headLength += headMapBytesLength;
                fullLength += headMapBytesLength;
            }
            byte[] bodyBytes = null;
            // heartbeat don't have body
            if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
                Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
                bodyBytes = serializer.serialize(rpcMessage.getBody());
                Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
                bodyBytes = compressor.compress(bodyBytes);
                fullLength += bodyBytes.length;
            }
            if (bodyBytes != null) {
                out.writeBytes(bodyBytes);
            }
            // fix fullLength and headLength
            int writeIndex = out.writerIndex();
            // skip magic code(2B) + version(1B)
            out.writerIndex(writeIndex - fullLength + 3);
            out.writeInt(fullLength);
            out.writeShort(headLength);
            out.writerIndex(writeIndex);
        } catch (Throwable e) {
            LOGGER.error("Encode request error!", e);
            throw e;
        }
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        try {
            if (msg instanceof RpcMessage) {
                this.encode((RpcMessage) msg, out);
            } else {
                throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
            }
        } catch (Throwable e) {
            LOGGER.error("Encode request error!", e);
        }
    }
}

package org.apache.seata.core.rpc.netty.v1;

import java.util.List;
import java.util.Map;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.seata.core.compressor.Compressor;
import org.apache.seata.core.compressor.CompressorFactory;
import org.apache.seata.core.exception.DecodeException;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.rpc.netty.ProtocolDecoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.serializer.SerializerServiceLoader;
import org.apache.seata.core.serializer.SerializerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <pre>
 * 0     1     2     3     4     5     6     7     8     9    10     11    12    13    14    15    16
 * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
 * |   magic   |proto|     full length       |    head   | Msg |Seria|Compr|      RequestId        |
 * |   code    |versi|     (head+body)       |   length  |Type |lizer|ess  |                       |
 * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
 * |                                   Head Map [Optional]                                         |
 * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
 * |                                         body                                                  |
 * +-----------------------------------------------------------------------------------------------+
 * </pre>
 * <p>
 * <li>Full Length: include all data </li>
 * <li>Head Length: include head data from magic code to head map. </li>
 * <li>Body Length: Full Length - Head Length</li>
 * </p>
 */
public class ProtocolDecoderV1 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV1.class);

    private final List<SerializerType> supportDeSerializerTypes;

    public ProtocolDecoderV1() {
        /**
         *  int maxFrameLength,
         *  int lengthFieldOffset,  magic code is 2B and version is 1B, so the length field offset is 3
         *  int lengthFieldLength,  FullLength is int(4B). so values is 4
         *  int lengthAdjustment,   FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7
         *  int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0
         */
        super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0);
        supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers();
        if (supportDeSerializerTypes.isEmpty()) {
            throw new IllegalArgumentException("No serializer found");
        }
    }

    @Override
    public RpcMessage decodeFrame(ByteBuf frame) {
        byte b0 = frame.readByte();
        byte b1 = frame.readByte();
        if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
            throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
        }
        byte version = frame.readByte();
        int fullLength = frame.readInt();
        short headLength = frame.readShort();
        byte messageType = frame.readByte();
        byte codecType = frame.readByte();
        byte compressorType = frame.readByte();
        int requestId = frame.readInt();
        ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
        rpcMessage.setCodec(codecType);
        rpcMessage.setId(requestId);
        rpcMessage.setCompressor(compressorType);
        rpcMessage.setMessageType(messageType);
        // direct read head with zero-copy
        int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
        if (headMapLength > 0) {
            Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
            rpcMessage.getHeadMap().putAll(map);
        }
        // read body
        if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {
            rpcMessage.setBody(HeartbeatMessage.PING);
        } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
            rpcMessage.setBody(HeartbeatMessage.PONG);
        } else {
            int bodyLength = fullLength - headLength;
            if (bodyLength > 0) {
                byte[] bs = new byte[bodyLength];
                frame.readBytes(bs);
                Compressor compressor = CompressorFactory.getCompressor(compressorType);
                bs = compressor.decompress(bs);
                SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec());
                if (this.supportDeSerializerTypes.contains(protocolType)) {
                    Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1);
                    rpcMessage.setBody(serializer.deserialize(bs));
                } else {
                    throw new IllegalArgumentException("SerializerType not match");
                }
            }
        }
        return rpcMessage.protocolMsgToRpcMsg();
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        Object decoded;
        try {
            decoded = super.decode(ctx, in);
            if (decoded instanceof ByteBuf) {
                ByteBuf frame = (ByteBuf) decoded;
                try {
                    return decodeFrame(frame);
                } finally {
                    frame.release();
                }
            }
        } catch (Exception exx) {
            LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
            throw new DecodeException(exx);
        }
        return decoded;
    }
}
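
The constructor arguments (3, 4, -7, 0) passed to LengthFieldBasedFrameDecoder above can be checked by hand. Netty computes the frame length as the value of the length field, plus lengthAdjustment, plus the end offset of the length field (3 + 4 = 7). Here that gives fullLength - 7 + 7 = fullLength, so one frame is exactly one message. The sketch below redoes this arithmetic in plain Java to split a byte stream into frames. LengthFieldFraming is a hypothetical class and only an approximation of what the Netty decoder does; in particular it drops trailing partial bytes instead of keeping them buffered.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Simplified sketch of length-field framing with offset 3, length 4, adjustment -7. */
final class LengthFieldFraming {
    static final int LENGTH_FIELD_OFFSET = 3;  // magic code (2B) + version (1B)
    static final int LENGTH_FIELD_LENGTH = 4;  // full length is an int
    static final int LENGTH_ADJUSTMENT = -7;   // full length already counts the first 7 bytes

    /** Returns every complete frame in the stream; a trailing half packet is left out. */
    static List<byte[]> split(byte[] stream) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        int lengthFieldEnd = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;
        while (buf.remaining() >= lengthFieldEnd) {
            // Absolute read: peek at the length field without moving the position.
            int fullLength = buf.getInt(buf.position() + LENGTH_FIELD_OFFSET);
            int frameLength = lengthFieldEnd + fullLength + LENGTH_ADJUSTMENT;
            if (frameLength < lengthFieldEnd) {
                throw new IllegalArgumentException("Invalid frame length: " + frameLength);
            }
            if (buf.remaining() < frameLength) {
                break; // half packet: wait for more bytes
            }
            byte[] frame = new byte[frameLength];
            buf.get(frame);
            frames.add(frame);
        }
        return frames;
    }
}
```

Two messages that arrive glued together come out as two frames, and an incomplete tail produces no frame until the rest of its bytes arrive.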