Seata源码—4.全局事务拦截与开启事务处理
大纲
1.Seata Server的启动入口的源码
2.Seata Server的网络服务器启动的源码
3.全局事务拦截器的核心变量
4.全局事务拦截器的初始化源码
5.全局事务拦截器的AOP切面拦截方法
6.通过全局事务执行模版来执行全局事务
7.获取xid构建全局事务实例与全局事务的传播级别
8.全局事务执行模版根据传播级别来执行业务
9.全局事务执行模版开启事务+提交事务+回滚事务
10.Seata Server集群的负载均衡机制实现源码
11.Seata Client向Seata Server发送请求的源码
12.Client将RpcMessage对象编码成字节数组
13.Server将字节数组解码成RpcMessage对象
14.Server处理已解码的RpcMessage对象的流程
15.Seata Server开启全局事务的流程源码
1.Seata Server的启动入口的源码
代码位于seata-server模块下:
@SpringBootApplication(scanBasePackages = {"io.seata"}) public class ServerApplication { public static void main(String[] args) throws IOException { //run the spring-boot application SpringApplication.run(ServerApplication.class, args); } } @Component public class ServerRunner implements CommandLineRunner, DisposableBean { private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class); private boolean started = Boolean.FALSE; private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>(); public static void addDisposable(Disposable disposable) { DISPOSABLE_LIST.add(disposable); } @Override public void run(String... args) { try { long start = System.currentTimeMillis(); Server.start(args); started = true; long cost = System.currentTimeMillis() - start; LOGGER.info("seata server started in {} millSeconds", cost); } catch (Throwable e) { started = Boolean.FALSE; LOGGER.error("seata server start error: {} ", e.getMessage(), e); System.exit(-1); } } public boolean started() { return started; } @Override public void destroy() throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug("destoryAll starting"); } for (Disposable disposable : DISPOSABLE_LIST) { disposable.destroy(); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("destoryAll finish"); } } } public class Server { //The entry point of application. public static void start(String[] args) { //create logger final Logger logger = LoggerFactory.getLogger(Server.class); //initialize the parameter parser //Note that the parameter parser should always be the first line to execute. //Because, here we need to parse the parameters needed for startup. ParameterParser parameterParser = new ParameterParser(args); //initialize the metrics //Seata Server是支持metric指标采集功能的 MetricsManager.get().init(); System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode()); //Seata Server里的Netty服务器的IO线程池,最小50个,最大500个 ThreadPoolExecutor workingThreads = new ThreadPoolExecutor( NettyServerConfig.getMinServerPoolSize(), NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy() ); //创建一个Netty网络通信服务器 NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); UUIDGenerator.init(parameterParser.getServerNode()); //log store mode : file, db, redis SessionHolder.init(parameterParser.getSessionStoreMode()); LockerManagerFactory.init(parameterParser.getLockStoreMode()); //启动定时调度线程 DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer); coordinator.init(); nettyRemotingServer.setHandler(coordinator); //let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028 ServerRunner.addDisposable(coordinator); //127.0.0.1 and 0.0.0.0 are not valid here. if (NetUtil.isValidIp(parameterParser.getHost(), false)) { XID.setIpAddress(parameterParser.getHost()); } else { String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS); if (StringUtils.isNotBlank(preferredNetworks)) { XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR))); } else { XID.setIpAddress(NetUtil.getLocalIp()); } } //初始化Netty服务器 nettyRemotingServer.init(); } }
2.Seata Server的网络服务器启动的源码
创建和启动Seata的网络服务器:
public class NettyRemotingServer extends AbstractNettyRemotingServer { ... //Instantiates a new Rpc remoting server. 创建Seata Server public NettyRemotingServer(ThreadPoolExecutor messageExecutor) { super(messageExecutor, new NettyServerConfig()); } //启动Seata Server @Override public void init() { //registry processor registerProcessor(); if (initialized.compareAndSet(false, true)) { super.init(); } } private void registerProcessor() { //1.registry on request message processor ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler()); ShutdownHook.getInstance().addDisposable(onRequestProcessor); super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor); //2.registry on response message processor ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures()); super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor); super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor); //3.registry rm message processor RegRmProcessor regRmProcessor = new RegRmProcessor(this); super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor); //4.registry tm message processor RegTmProcessor regTmProcessor = new RegTmProcessor(this); super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null); //5.registry heartbeat message processor ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this); super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null); } ... } public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer { private final NettyServerBootstrap serverBootstrap; ... public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) { super(messageExecutor); //创建Netty Server serverBootstrap = new NettyServerBootstrap(nettyServerConfig); serverBootstrap.setChannelHandlers(new ServerHandler()); } @Override public void init() { super.init(); //启动Netty Server serverBootstrap.start(); } ... } public abstract class AbstractNettyRemoting implements Disposable { //The Timer executor. 由单个线程进行调度的线程池 protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true)); //The Message executor. protected final ThreadPoolExecutor messageExecutor; ... public void init() { //启动一个定时任务,每隔3秒检查发送的请求是否响应超时 timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) { MessageFuture future = entry.getValue(); if (future.isTimeout()) { futures.remove(entry.getKey()); RpcMessage rpcMessage = future.getRequestMessage(); future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString()))); if (LOGGER.isDebugEnabled()) { LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody()); } } } nowMills = System.currentTimeMillis(); } }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS); } ... } public class NettyServerBootstrap implements RemotingBootstrap { private final NettyServerConfig nettyServerConfig; private final EventLoopGroup eventLoopGroupBoss; private final EventLoopGroup eventLoopGroupWorker; private final ServerBootstrap serverBootstrap = new ServerBootstrap(); private ChannelHandler[] channelHandlers; private int listenPort; private final AtomicBoolean initialized = new AtomicBoolean(false); public NettyServerBootstrap(NettyServerConfig nettyServerConfig) { this.nettyServerConfig = nettyServerConfig; if (NettyServerConfig.enableEpoll()) { this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize())); this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads())); } else { this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize())); this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads())); } } //Sets channel handlers. protected void setChannelHandlers(final ChannelHandler... handlers) { if (handlers != null) { channelHandlers = handlers; } } @Override public void start() { this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ) .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark())) .localAddress(new InetSocketAddress(getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) .addLast(new ProtocolV1Decoder()) .addLast(new ProtocolV1Encoder()); if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); } } } ); try { this.serverBootstrap.bind(getListenPort()).sync(); XID.setPort(getListenPort()); LOGGER.info("Server started, service listen port: {}", getListenPort()); RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort())); initialized.set(true); } catch (SocketException se) { throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se); } catch (Exception exx) { throw new RuntimeException("Server start failed", exx); } } ... } public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { private RemotingServer remotingServer; private final DefaultCore core; private static volatile DefaultCoordinator instance; private final ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_ROLLBACKING, 1)); private final ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_COMMITTING, 1)); private final ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(ASYNC_COMMITTING, 1)); private final ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(TX_TIMEOUT_CHECK, 1)); private final ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(UNDOLOG_DELETE, 1)); ... public static DefaultCoordinator getInstance(RemotingServer remotingServer) { if (null == instance) { synchronized (DefaultCoordinator.class) { if (null == instance) { instance = new DefaultCoordinator(remotingServer); } } } return instance; } private DefaultCoordinator(RemotingServer remotingServer) { if (remotingServer == null) { throw new IllegalArgumentException("RemotingServer not allowed be null."); } this.remotingServer = remotingServer; this.core = new DefaultCore(remotingServer); } public void init() { retryRollbacking.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS); retryCommitting.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); asyncCommitting.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); timeoutCheck.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS); undoLogDelete.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete), UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS); } ... }
Seata Client的ClientHandler和Seata Server的ServerHandler:
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer { ... @ChannelHandler.Sharable class ServerHandler extends ChannelDuplexHandler { //Channel read. @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof RpcMessage)) { return; } //此时会把解码完毕的RpcMessage来进行处理 processMessage(ctx, (RpcMessage) msg); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) { synchronized (lock) { if (ctx.channel().isWritable()) { lock.notifyAll(); } } ctx.fireChannelWritabilityChanged(); } //Channel inactive. @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { debugLog("inactive:{}", ctx); if (messageExecutor.isShutdown()) { return; } handleDisconnect(ctx); super.channelInactive(ctx); } private void handleDisconnect(ChannelHandlerContext ctx) { final String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress()); RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel()); if (LOGGER.isInfoEnabled()) { LOGGER.info(ipAndPort + " to server channel inactive."); } if (rpcContext != null && rpcContext.getClientRole() != null) { rpcContext.release(); if (LOGGER.isInfoEnabled()) { LOGGER.info("remove channel:" + ctx.channel() + "context:" + rpcContext); } } else { if (LOGGER.isInfoEnabled()) { LOGGER.info("remove unused channel:" + ctx.channel()); } } } //Exception caught. @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { try { if (cause instanceof DecoderException && null == ChannelManager.getContextFromIdentified(ctx.channel())) { return; } LOGGER.error("exceptionCaught:{}, channel:{}", cause.getMessage(), ctx.channel()); super.exceptionCaught(ctx, cause); } finally { ChannelManager.releaseRpcContext(ctx.channel()); } } //User event triggered. @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { debugLog("idle:{}", evt); IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.READER_IDLE) { if (LOGGER.isInfoEnabled()) { LOGGER.info("channel:" + ctx.channel() + " read idle."); } handleDisconnect(ctx); try { closeChannelHandlerContext(ctx); } catch (Exception e) { LOGGER.error(e.getMessage()); } } } } @Override public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { if (LOGGER.isInfoEnabled()) { LOGGER.info(ctx + " will closed"); } super.close(ctx, future); } } ... } public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient { ... @Sharable class ClientHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof RpcMessage)) { return; } processMessage(ctx, (RpcMessage) msg); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) { synchronized (lock) { if (ctx.channel().isWritable()) { lock.notifyAll(); } } ctx.fireChannelWritabilityChanged(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (messageExecutor.isShutdown()) { return; } if (LOGGER.isInfoEnabled()) { LOGGER.info("channel inactive: {}", ctx.channel()); } clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress())); super.channelInactive(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.READER_IDLE) { if (LOGGER.isInfoEnabled()) { LOGGER.info("channel {} read idle.", ctx.channel()); } try { String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress()); clientChannelManager.invalidateObject(serverAddress, ctx.channel()); } catch (Exception exx) { LOGGER.error(exx.getMessage()); } finally { clientChannelManager.releaseChannel(ctx.channel(), getAddressFromContext(ctx)); } } if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) { try { if (LOGGER.isDebugEnabled()) { LOGGER.debug("will send ping msg,channel {}", ctx.channel()); } AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING); } catch (Throwable throwable) { LOGGER.error("send request error: {}", throwable.getMessage(), throwable); } } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(), NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause); clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel())); if (LOGGER.isInfoEnabled()) { LOGGER.info("remove exception rm channel:{}", ctx.channel()); } super.exceptionCaught(ctx, cause); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { if (LOGGER.isInfoEnabled()) { LOGGER.info(ctx + " will closed"); } super.close(ctx, future); } } ... }
3.全局事务拦截器的核心变量
全局事务注解扫描器GlobalTransactionScanner的wrapIfNecessary()方法,如果发现Spring的Bean含有Seata的注解,就会为该Bean创建动态代理。
比如Spring的Bean添加了@GlobalTransactional注解,那么GlobalTransactionScanner类为这个Bean创建动态代理时,会使用全局事务拦截器GlobalTransactionalInterceptor来进行创建。
这样后续调用到这个Spring Bean的方法时,就会先调用GlobalTransactionInterceptor拦截器。
GlobalTransactionalInterceptor这个全局事务注解拦截器的核心变量如下:
一.TransactionalTemplate全局事务执行模版
二.GlobalLockTemplate全局锁管理模版
三.FailureHandler全局事务异常处理器
//全局事务注解拦截器 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class); //默认的全局事务异常处理组件 //如果全局事务出现开启、回滚、提交、重试异常时,就可以回调这个DefaultFailureHandlerImpl进行异常处理 private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl(); //全局事务执行模版,用来管理全局事务的执行 private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate(); //全局锁执行模版,用来实现不同全局事务间的写隔离 private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate(); //真正的全局事务异常处理组件 private final FailureHandler failureHandler; //是否禁用全局事务 private volatile boolean disable; //全局事务拦截器的顺序 private int order; //AOP切面全局事务核心配置,来自于全局事务注解 protected AspectTransactional aspectTransactional; //全局事务降级检查的时间周期 private static int degradeCheckPeriod; //是否开启全局事务的降级检查 private static volatile boolean degradeCheck; //降级检查允许时间 private static int degradeCheckAllowTimes; //降级次数 private static volatile Integer degradeNum = 0; //reach达标次数 private static volatile Integer reachNum = 0; //Guava提供的事件总线 private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true); //定时调度线程池 private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true)); //默认的全局事务超时时间 private static int defaultGlobalTransactionTimeout = 0; ... }
4.全局事务拦截器的初始化源码
全局事务拦截器GlobalTransactionalInterceptor进行初始化时,会设置全局事务的异常处理组件,设置默认的全局事务超时时间为60秒。
//全局事务注解扫描器 public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean { ... //Spring AOP里对方法进行拦截的拦截器 private MethodInterceptor interceptor; @Override protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { if (!doCheckers(bean, beanName)) { return bean; } try { synchronized (PROXYED_SET) { if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { //init tcc fence clean task if enable useTccFence TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext); //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor); } else { //获取目标class的接口 Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); //existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解 if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } if (globalTransactionalInterceptor == null) { //创建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器 globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook); ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } interceptor = globalTransactionalInterceptor; } LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName()); if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理 //接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理 //这样后续调用到目标Bean的方法,就会调用到GlobalTransactionInterceptor拦截器 bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); int pos; for (Advisor avr : advisor) { // Find the position based on the advisor's order, and add to advisors by pos pos = findAddSeataAdvisorPosition(advised, avr); advised.addAdvisor(pos, avr); } } PROXYED_SET.add(beanName); return bean; } } catch (Exception exx) { throw new RuntimeException(exx); } } ... } //全局事务拦截器 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor { //真正的全局事务异常处理组件 private final FailureHandler failureHandler; //是否禁用全局事务 private volatile boolean disable; //全局事务拦截器的顺序 private int order; //是否开启全局事务的降级检查 private static volatile boolean degradeCheck; //全局事务降级检查的时间周期 private static int degradeCheckPeriod; //降级检查允许时间 private static int degradeCheckAllowTimes; //默认的全局事务超时时间 private static int defaultGlobalTransactionTimeout = 0; //Guava提供的事件总线 private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true); ... //Instantiates a new Global transactional interceptor. //实例化一个新的全局事务拦截器 public GlobalTransactionalInterceptor(FailureHandler failureHandler) { this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler; this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION); this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER); degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK); if (degradeCheck) { ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this); degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD); degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES); EVENT_BUS.register(this); if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) { startDegradeCheck(); } } this.initDefaultGlobalTransactionTimeout(); } //初始化默认的全局事务超时时间,60s=1min private void initDefaultGlobalTransactionTimeout() { if (GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout <= 0) { int defaultGlobalTransactionTimeout; try { defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT); } catch (Exception e) { LOGGER.error("Illegal global transaction timeout value: " + e.getMessage()); defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT; } if (defaultGlobalTransactionTimeout <= 0) { LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT); defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT; } GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout; } } ... }
5.全局事务拦截器的AOP切面拦截方法
如果调用添加了@GlobalTransactional注解的方法,就会执行GlobalTransactionalInterceptor的invoke()方法。
//全局事务拦截器 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor { //是否禁用全局事务 private volatile boolean disable; //是否开启全局事务的降级检查 private static volatile boolean degradeCheck; //降级次数 private static volatile Integer degradeNum = 0; //降级检查允许时间 private static int degradeCheckAllowTimes; //AOP切面全局事务核心配置,来自于全局事务注解 protected AspectTransactional aspectTransactional; ... //如果调用添加了@GlobalTransactional注解的方法,就会执行如下invoke()方法 @Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { //methodInvocation是一次方法调用 //通过methodInvocation的getThis()方法可以获取到被调用方法的对象 //通过AopUtils.getTargetClass()方法可以获取到对象对应的Class Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null; //通过反射,获取到目标class中被调用的method方法 Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); //如果调用的目标method不为null if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { //尝试寻找桥接方法bridgeMethod final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); //通过反射,获取被调用的目标方法的@GlobalTransactional注解 final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); //通过反射,获取被调用目标方法的@GlobalLock注解 final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); //如果禁用了全局事务,或者开启了降级检查,同时降级次数大于了降级检查允许次数,那么localDisable就为true //localDisable为true则表示全局事务被禁用了,此时就不可以开启全局事务了 boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes); //如果全局事务没有禁用 if (!localDisable) { //全局事务注解不为空,或者是AOP切面全局事务核心配置不为空 if (globalTransactionalAnnotation != null || this.aspectTransactional != null) { AspectTransactional transactional; if (globalTransactionalAnnotation != null) { //创建全局事务AOP切面的核心配置AspectTransactional,配置数据会从全局事务注解里提取出来 transactional = new AspectTransactional( globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes() ); } else { transactional = this.aspectTransactional; } //真正处理全局事务的入口 return handleGlobalTransaction(methodInvocation, transactional); } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation, globalLockAnnotation); } } } //直接运行目标方法 return methodInvocation.proceed(); } //获取注解 public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) { return Optional.ofNullable(method).map(m -> m.getAnnotation(annotationClass)).orElse(Optional.ofNullable(targetClass).map(t -> t.getAnnotation(annotationClass)).orElse(null)); } ... }
6.通过全局事务执行模版来执行全局事务
GlobalTransactionInterceptor全局事务拦截器中会有一个全局事务执行模版的实例变量,这个全局事务执行模版TransactionalTemplate实例就是用来执行全局事务的。执行全局事务时,就会调用TransactionalTemplate的execute()方法。
//全局事务拦截器 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor { //全局事务执行模版,用来管理全局事务的执行 private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate(); ... //真正进行全局事务的处理 Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable { boolean succeed = true; try { //基于全局事务执行模版TransactionalTemplate,来执行全局事务 return transactionalTemplate.execute(new TransactionalExecutor() { //真正执行目标方法 @Override public Object execute() throws Throwable { return methodInvocation.proceed(); } //根据全局事务注解可以获取到一个name,可以对目标方法进行格式化 public String name() { String name = aspectTransactional.getName(); if (!StringUtils.isNullOrEmpty(name)) { return name; } return formatMethod(methodInvocation.getMethod()); } //获取全局事务的信息 @Override public TransactionInfo getTransactionInfo() { //reset the value of timeout int timeout = aspectTransactional.getTimeoutMills(); if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) { timeout = defaultGlobalTransactionTimeout; } //封装一个全局事务信息实例TransactionInfo TransactionInfo transactionInfo = new TransactionInfo(); transactionInfo.setTimeOut(timeout);//全局事务超时时间 transactionInfo.setName(name());//全局事务名称 transactionInfo.setPropagation(aspectTransactional.getPropagation());//全局事务传播级别 transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//全局锁获取重试间隔 transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//全局锁重试次数 //全局事务回滚规则 Set<RollbackRule> rollbackRules = new LinkedHashSet<>(); for (Class<?> rbRule : aspectTransactional.getRollbackFor()) { rollbackRules.add(new RollbackRule(rbRule)); } for (String rbRule : aspectTransactional.getRollbackForClassName()) { rollbackRules.add(new RollbackRule(rbRule)); } for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) { rollbackRules.add(new NoRollbackRule(rbRule)); } for (String rbRule : aspectTransactional.getNoRollbackForClassName()) { rollbackRules.add(new NoRollbackRule(rbRule)); } transactionInfo.setRollbackRules(rollbackRules); return transactionInfo; } }); } catch (TransactionalExecutor.ExecutionException e) { ... } finally { if (degradeCheck) { EVENT_BUS.post(new DegradeCheckEvent(succeed)); } } } ... }
7.获取xid构建全局事务实例与全局事务的传播级别
(1)从RootContext获取xid来构建全局事务实例
(2)全局事务的传播级别
(1)从RootContext获取xid来构建全局事务实例
RootContext会通过SPI机制加载ContextCore实例,比如FastThreadLocalContextCore实例、ThreadLocalContextCore实例。
而xid又会通过RootContext的bind()方法被put()到ContextCore实例中,也就是xid会被put()到ThreadLocal<Map<String, Object>>中,或者被put()到FastThreadLocal<Map<String, Object>>中。因此,通过RootContext的get()方法可以从ContextCore实例中获取当前线程的xid。
//全局事务执行模版 public class TransactionalTemplate { private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class); //Execute object. public Object execute(TransactionalExecutor business) throws Throwable { //1.Get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. GlobalTransaction tx = GlobalTransactionContext.getCurrent(); //1.2 Handle the transaction propagation. Propagation propagation = txInfo.getPropagation(); ... } ... } //全局事务上下文 public class GlobalTransactionContext { private GlobalTransactionContext() { } //Get GlobalTransaction instance bind on current thread. public static GlobalTransaction getCurrent() { String xid = RootContext.getXID(); if (xid == null) { return null; } return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant); } ... } public class RootContext { //通过SPI机制加载ContextCore实例,比如FastThreadLocalContextCore、ThreadLocalContextCore //所以可以认为,xid是存放在ThreadLocal<Map<String, Object>>中的 private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load(); ... private RootContext() { } //Gets xid. @Nullable public static String getXID() { return (String) CONTEXT_HOLDER.get(KEY_XID); } //Bind xid. public static void bind(@Nonnull String xid) { if (StringUtils.isBlank(xid)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("xid is blank, switch to unbind operation!"); } unbind(); } else { MDC.put(MDC_KEY_XID, xid); if (LOGGER.isDebugEnabled()) { LOGGER.debug("bind {}", xid); } CONTEXT_HOLDER.put(KEY_XID, xid); } } ... }
(2)全局事务的传播级别
全局事务的传播级别分别有:REQUIRED、REQUIRES_NEW、NOT_SUPPORTED、NEVER、SUPPORTS、MANDATORY。
//Propagation level of global transactions. //全局事务的传播级别 public enum Propagation { //如果全局事务已经存在,此时会直接在当前的全局事务里继续去运行下去,后续运行的都是全局事务里的分支事务 //如果全局事务此时还不存在,就会开启一个新的全局事务来运行 //这种全局事务传播级别,就是REQUIRED //The logic is similar to the following code: // if (tx == null) { // try { // tx = beginNewTransaction(); // begin new transaction, is not existing // Object rs = business.execute(); // execute with new transaction // commitTransaction(tx); // return rs; // } catch (Exception ex) { // rollbackTransaction(tx); // throw ex; // } // } else { // return business.execute(); // execute with current transaction // } REQUIRED, //如果全局事务已经存在,则先暂停该事务,然后开启一个新的全局事务来执行业务 //The logic is similar to the following code: // try { // if (tx != null) { // suspendedResource = suspendTransaction(tx); // suspend current transaction // } // try { // tx = beginNewTransaction(); // begin new transaction // Object rs = business.execute(); // execute with new transaction // commitTransaction(tx); // return rs; // } catch (Exception ex) { // rollbackTransaction(tx); // throw ex; // } // } finally { // if (suspendedResource != null) { // resumeTransaction(suspendedResource); // resume transaction // } // } REQUIRES_NEW, //如果全局事务已经存在,则先暂停该事务,然后不要使用全局事务来执行业务 //The logic is similar to the following code: // try { // if (tx != null) { // suspendedResource = suspendTransaction(tx); // suspend current transaction // } // return business.execute(); // execute without transaction // } finally { // if (suspendedResource != null) { // resumeTransaction(suspendedResource); // resume transaction // } // } NOT_SUPPORTED, //如果全局事务不存在,则不要使用全局事务来执行业务 //如果全局事务存在,则使用全局事务来执行业务 //The logic is similar to the following code: // if (tx != null) { // return business.execute(); // execute with current transaction // } else { // return business.execute(); // execute without transaction // } SUPPORTS, //如果全局事务存在,则抛异常 //如果全局事务不存在,则执行业务 //The logic is similar to the following code: // if (tx != null) { // throw new TransactionException("existing transaction"); // } // return business.execute(); // execute without transaction NEVER, //如果全局事务不存在,则抛异常 //如果全局事务存在,则使用全局事务去执行业务 //The logic is similar to the following code: // if (tx == null) { // throw new TransactionException("not existing transaction"); // } // return business.execute(); // execute with current transaction MANDATORY }
8.全局事务执行模版根据传播级别来执行业务
//全局事务执行模版 public class TransactionalTemplate { ... //Execute object. //通过全局事务生命周期管理组件执行全局事务 public Object execute(TransactionalExecutor business) throws Throwable { //1.Get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. //根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务 //刚开始在开启一个全局事务的时候,是没有全局事务的 GlobalTransaction tx = GlobalTransactionContext.getCurrent(); //1.2 Handle the transaction propagation. //从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED //也就是如果存在一个全局事务,就直接执行业务; //如果不存在一个全局事务,就开启一个新的全局事务; Propagation propagation = txInfo.getPropagation(); //不同的全局事务传播级别,会采取不同的处理方式 //比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid //可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别 SuspendedResourcesHolder suspendedResourcesHolder = null; try { switch (propagation) { case NOT_SUPPORTED: //If transaction is existing, suspend it. if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); } //Execute without transaction and return. return business.execute(); case REQUIRES_NEW: //If transaction is existing, suspend it, and then begin new transaction. if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); tx = GlobalTransactionContext.createNew(); } //Continue and execute with new transaction break; case SUPPORTS: //If transaction is not existing, execute without transaction. if (notExistingTransaction(tx)) { return business.execute(); } //Continue and execute with new transaction break; case REQUIRED: //If current transaction is existing, execute with current transaction, else continue and execute with new transaction. break; case NEVER: //If transaction is existing, throw exception. if (existingTransaction(tx)) { throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid())); } else { //Execute without transaction and return. return business.execute(); } case MANDATORY: //If transaction is not existing, throw exception. if (notExistingTransaction(tx)) { throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'"); } //Continue and execute with current transaction. break; default: throw new TransactionException("Not Supported Propagation:" + propagation); } //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'. if (tx == null) { //如果xid为null,则会创建一个新的全局事务 tx = GlobalTransactionContext.createNew(); } //set current tx config to holder GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); try { //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, //else do nothing. Of course, the hooks will still be triggered. //开启一个全局事务 beginTransaction(txInfo, tx); Object rs; try { //Do Your Business //执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并执行一个一个分支事务 rs = business.execute(); } catch (Throwable ex) { //3. The needed business exception to rollback. //发生异常时需要完成的事务 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } //4. everything is fine, commit. //如果一切执行正常就会在这里提交全局事务 commitTransaction(tx); return rs; } finally { //5. clear //执行一些全局事务完成后的回调,比如清理等工作 resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); } } finally { //If the transaction is suspended, resume it. if (suspendedResourcesHolder != null) { //如果之前挂起了一个全局事务,此时可以恢复这个全局事务 tx.resume(suspendedResourcesHolder); } } } ... }
9.全局事务执行模版开启事务+提交事务+回滚事务
(1)事务执行模版的开启事务+提交事务+回滚事务
(2)默认的全局事务和默认的事务管理器对事务的开启+提交+回滚的处理
(1)事务执行模版的开启事务+提交事务+回滚事务
事务执行模版TransactionalTemplate在开启、提交、回滚事务时,会通过默认的全局事务DefaultGlobalTransaction来进行开启、提交、回滚事务。
//全局事务上下文 public class GlobalTransactionContext { private GlobalTransactionContext() { } //Try to create a new GlobalTransaction. //如果xid为null,则会创建一个新的全局事务 public static GlobalTransaction createNew() { return new DefaultGlobalTransaction(); } ... } //默认的全局事务 public class DefaultGlobalTransaction implements GlobalTransaction { private TransactionManager transactionManager; private String xid; private GlobalStatus status; private GlobalTransactionRole role; ... //Instantiates a new Default global transaction. DefaultGlobalTransaction() { //全局事务角色是全局事务发起者 this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher); } //Instantiates a new Default global transaction. DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) { this.transactionManager = TransactionManagerHolder.get();//全局事务管理者 this.xid = xid; this.status = status; this.role = role; } ... } //全局事务执行模版 public class TransactionalTemplate { ... //开启事务 private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { //开启全局事务之前有一个回调的一个钩子名为triggerBeforeBegin() triggerBeforeBegin(); //真正去开启一个全局事务 tx.begin(txInfo.getTimeOut(), txInfo.getName()); //开启全局事务之后还有一个回调钩子名为triggerAfterBegin() triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } } private void triggerBeforeBegin() { for (TransactionHook hook : getCurrentHooks()) { try { hook.beforeBegin(); } catch (Exception e) { LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e); } } } private void triggerAfterBegin() { for (TransactionHook hook : getCurrentHooks()) { try { hook.afterBegin(); } catch (Exception e) { LOGGER.error("Failed execute afterBegin in hook {}", e.getMessage(), e); } } } ... //提交事务 private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeCommit(); tx.commit(); triggerAfterCommit(); } catch (TransactionException txe) { // 4.1 Failed to commit throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); } } private void triggerBeforeCommit() { for (TransactionHook hook : getCurrentHooks()) { try { hook.beforeCommit(); } catch (Exception e) { LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e); } } } private void triggerAfterCommit() { for (TransactionHook hook : getCurrentHooks()) { try { hook.afterCommit(); } catch (Exception e) { LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e); } } } private void triggerAfterCompletion() { for (TransactionHook hook : getCurrentHooks()) { try { hook.afterCompletion(); } catch (Exception e) { LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e); } } } ... //回滚事务 private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException { //roll back if (txInfo != null && txInfo.rollbackOn(originalException)) { try { rollbackTransaction(tx, originalException); } catch (TransactionException txe) { //Failed to rollback throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException); } } else { //not roll back on this exception, so commit commitTransaction(tx); } } private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException { triggerBeforeRollback(); tx.rollback(); triggerAfterRollback(); //3.1 Successfully rolled back throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus()) ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException); } private void triggerBeforeRollback() { for (TransactionHook hook : getCurrentHooks()) { try { hook.beforeRollback(); } catch (Exception e) { LOGGER.error("Failed execute beforeRollback in hook {}", e.getMessage(), e); } } } private void triggerAfterRollback() { for (TransactionHook hook : getCurrentHooks()) { try { hook.afterRollback(); } catch (Exception e) { LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e); } } } ... }
(2)默认的全局事务和默认的事务管理器对事务的开启+提交+回滚的处理
默认的全局事务DefaultGlobalTransaction在进行开启、提交、回滚事务时,会由默认的事务管理器DefaultTransactionManager来开启、提交、回滚事务。
而默认的事务管理器DefaultTransactionManager在开启、提交、回滚事务时,最终都会执行其syncCall()方法发起一个同步调用,也就是通过TmNettyRemotingClient向Seata Server发送一个Netty请求。
//默认的全局事务 public class DefaultGlobalTransaction implements GlobalTransaction { private TransactionManager transactionManager; private String xid; private GlobalStatus status; private GlobalTransactionRole role; ... @Override public void begin() throws TransactionException { begin(DEFAULT_GLOBAL_TX_TIMEOUT); } @Override public void begin(int timeout) throws TransactionException { begin(timeout, DEFAULT_GLOBAL_TX_NAME); } @Override public void begin(int timeout, String name) throws TransactionException { if (role != GlobalTransactionRole.Launcher) { assertXIDNotNull(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid); } return; } assertXIDNull(); String currentXid = RootContext.getXID(); if (currentXid != null) { throw new IllegalStateException("Global transaction already exists," + " can't begin a new global transaction, currentXid = " + currentXid); } //通过全局事务管理器去真正开启全局事务,一旦开启成功,就可以获取到一个xid xid = transactionManager.begin(null, null, name, timeout); status = GlobalStatus.Begin; //把xid绑定到RootContext的线程本地变量副本里去 RootContext.bind(xid); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction [{}]", xid); } } @Override public void commit() throws TransactionException { if (role == GlobalTransactionRole.Participant) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid); } return; } assertXIDNotNull(); int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT; try { while (retry > 0) { try { retry--; status = transactionManager.commit(xid); break; } catch (Throwable ex) { LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage()); if (retry == 0) { throw new TransactionException("Failed to report global commit", ex); } } } } finally { if (xid.equals(RootContext.getXID())) { suspend(); } } if (LOGGER.isInfoEnabled()) { LOGGER.info("[{}] commit status: {}", xid, status); } } @Override public void rollback() throws TransactionException { if (role == GlobalTransactionRole.Participant) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid); } return; } assertXIDNotNull(); int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT; try { while (retry > 0) { try { retry--; status = transactionManager.rollback(xid); break; } catch (Throwable ex) { LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage()); if (retry == 0) { throw new TransactionException("Failed to report global rollback", ex); } } } } finally { if (xid.equals(RootContext.getXID())) { suspend(); } } if (LOGGER.isInfoEnabled()) { LOGGER.info("[{}] rollback status: {}", xid, status); } } ... } public class RootContext { private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load(); public static final String KEY_XID = "TX_XID"; ... //Gets xid. @Nullable public static String getXID() { return (String) CONTEXT_HOLDER.get(KEY_XID); } //Bind xid. public static void bind(@Nonnull String xid) { if (StringUtils.isBlank(xid)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("xid is blank, switch to unbind operation!"); } unbind(); } else { MDC.put(MDC_KEY_XID, xid); if (LOGGER.isDebugEnabled()) { LOGGER.debug("bind {}", xid); } CONTEXT_HOLDER.put(KEY_XID, xid); } } ... } //默认的全局事务管理器 public class DefaultTransactionManager implements TransactionManager { @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { //构建一个全局事务开启请求GlobalBeginRequest GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); //发起一个同步调用 GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); if (response.getResultCode() == ResultCode.Failed) { throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg()); } return response.getXid(); } @Override public GlobalStatus commit(String xid) throws TransactionException { GlobalCommitRequest globalCommit = new GlobalCommitRequest(); globalCommit.setXid(xid); GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit); return response.getGlobalStatus(); } @Override public GlobalStatus rollback(String xid) throws TransactionException { GlobalRollbackRequest globalRollback = new GlobalRollbackRequest(); globalRollback.setXid(xid); GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback); return response.getGlobalStatus(); } ... private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException { try { //TMNettyRemotingClient会和Seata Server基于Netty建立长连接 return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request); } catch (TimeoutException toe) { throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe); } } } //GlobalBeginRequest会根据Seata的通信协议序列化成字节数组,然后通过Netty被发送到Seata Server中去 public class GlobalBeginRequest extends AbstractTransactionRequestToTC { private int timeout = 60000; private String transactionName; ... @Override public short getTypeCode() { return MessageType.TYPE_GLOBAL_BEGIN; } @Override public AbstractTransactionResponse handle(RpcContext rpcContext) { return handler.handle(this, rpcContext); } ... }
10.Seata Server集群的负载均衡机制实现源码
(1)通过负载均衡选择Seata Server节点
(2)Seata提供的负载均衡算法
(1)通过负载均衡选择Seata Server节点
默认的事务管理器DefaultTransactionManager在开启、提交、回滚事务时,最终都会执行其syncCall()方法发起一个同步调用,也就是通过TmNettyRemotingClient向Seata Server发送一个Netty请求。
syncCall()方法在调用TmNettyRemotingClient实例的sendSyncRequest()方法发送请求时,其实调用的是TmNettyRemotingClient的抽象父类AbstractNettyRemotingClient的sendSyncRequest()方法。
在sendSyncRequest()方法中,首先会调用AbstractNettyRemotingClient的loadBalance()方法进行负载均衡,也就是首先会调用AbstractNettyRemotingClient.doSelect()方法。
AbstractNettyRemotingClient的doSelect()方法会先通过LoadBalanceFactory工厂 + SPI来获取一个LoadBalance实例,然后再调用LoadBalance实例的select()方法来进行负载均衡。
负载均衡,其实就是从Seata Server节点中选择其中一个节点发送请求。
//默认的全局事务管理器 public class DefaultTransactionManager implements TransactionManager { ... private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException { try { //TMNettyRemotingClient会和Seata Server基于Netty建立长连接 return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request); } catch (TimeoutException toe) { throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe); } } ... } public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient { ... @Override public Object sendSyncRequest(Object msg) throws TimeoutException { //因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡 String serverAddress = loadBalance(getTransactionServiceGroup(), msg); //获取RPC调用的超时时间 long timeoutMillis = this.getRpcRequestTimeout(); //构建一个RPC消息 RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); //send batch message //put message into basketMap, @see MergedSendRunnable //默认是不开启批量消息发送 if (this.isEnableClientBatchSendRequest()) { ... } else { //通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel //然后通过网络连接Channel把RpcMessage发送给Seata Server Channel channel = clientChannelManager.acquireChannel(serverAddress); return super.sendSync(channel, rpcMessage, timeoutMillis); } } protected String loadBalance(String transactionServiceGroup, Object msg) { InetSocketAddress address = null; try { @SuppressWarnings("unchecked") List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup); address = this.doSelect(inetSocketAddressList, msg); } catch (Exception ex) { LOGGER.error(ex.getMessage()); } if (address == null) { throw new FrameworkException(NoAvailableService); } return NetUtil.toStringAddress(address); } protected InetSocketAddress doSelect(List<InetSocketAddress> list, Object msg) throws Exception { if (CollectionUtils.isNotEmpty(list)) { if (list.size() > 1) { return LoadBalanceFactory.getInstance().select(list, getXid(msg)); } else { return list.get(0); } } return null; } ... } public class LoadBalanceFactory { ... public static LoadBalance getInstance() { //根据SPI机制获取LoadBalance实例 String config = ConfigurationFactory.getInstance().getConfig(LOAD_BALANCE_TYPE, DEFAULT_LOAD_BALANCE); return EnhancedServiceLoader.load(LoadBalance.class, config); } }
(2)Seata提供的负载均衡算法
轮询选择算法、随机选择算法、最少使用算法、一致性哈希算法。
一.轮询选择算法
@LoadLevel(name = ROUND_ROBIN_LOAD_BALANCE) public class RoundRobinLoadBalance implements LoadBalance { private final AtomicInteger sequence = new AtomicInteger(); @Override public <T> T select(List<T> invokers, String xid) { int length = invokers.size(); //通过轮询选择Seata Server的节点 return invokers.get(getPositiveSequence() % length); } private int getPositiveSequence() { for (;;) { int current = sequence.get(); int next = current >= Integer.MAX_VALUE ? 0 : current + 1; if (sequence.compareAndSet(current, next)) { return current; } } } }
二.随机选择算法
@LoadLevel(name = RANDOM_LOAD_BALANCE) public class RandomLoadBalance implements LoadBalance { @Override public <T> T select(List<T> invokers, String xid) { int length = invokers.size(); return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
三.最少使用算法
@LoadLevel(name = LEAST_ACTIVE_LOAD_BALANCE) public class LeastActiveLoadBalance implements LoadBalance { @Override public <T> T select(List<T> invokers, String xid) { int length = invokers.size(); long leastActive = -1; int leastCount = 0; int[] leastIndexes = new int[length]; for (int i = 0; i < length; i++) { long active = RpcStatus.getStatus(invokers.get(i).toString()).getActive(); if (leastActive == -1 || active < leastActive) { leastActive = active; leastCount = 1; leastIndexes[0] = i; } else if (active == leastActive) { leastIndexes[leastCount++] = i; } } if (leastCount == 1) { return invokers.get(leastIndexes[0]); } return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
四.一致性哈希算法
@LoadLevel(name = CONSISTENT_HASH_LOAD_BALANCE) public class ConsistentHashLoadBalance implements LoadBalance { public static final String LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES = LOAD_BALANCE_PREFIX + "visualNodes"; private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance().getInt(LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES, VIRTUAL_NODES_DEFAULT); @Override public <T> T select(List<T> invokers, String xid) { //通过一致性哈希选择节点 return new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM).select(xid); } private static final class ConsistentHashSelector<T> { private final SortedMap<Long, T> virtualInvokers = new TreeMap<>(); private final HashFunction hashFunction = new MD5Hash(); ConsistentHashSelector(List<T> invokers, int virtualNodes) { for (T invoker : invokers) { for (int i = 0; i < virtualNodes; i++) { virtualInvokers.put(hashFunction.hash(invoker.toString() + i), invoker); } } } public T select(String objectKey) { SortedMap<Long, T> tailMap = virtualInvokers.tailMap(hashFunction.hash(objectKey)); Long nodeHashVal = tailMap.isEmpty() ? virtualInvokers.firstKey() : tailMap.firstKey(); return virtualInvokers.get(nodeHashVal); } } @SuppressWarnings("lgtm[java/weak-cryptographic-algorithm]") private static class MD5Hash implements HashFunction { MessageDigest instance; public MD5Hash() { try { instance = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } } @Override public long hash(String key) { instance.reset(); instance.update(key.getBytes()); byte[] digest = instance.digest(); long h = 0; for (int i = 0; i < 4; i++) { h <<= 8; h |= ((int) digest[i]) & 0xFF; } return h; } } public interface HashFunction { long hash(String key); } }
11.Seata Client向Seata Server发送请求的源码
首先Seata Client会通过网络连接管理器ClientChannelManager获取与指定Seata Server建立的网络连接Channel。
然后通过Netty的Channel把RpcMessage请求消息发送给Seata Server,也就是执行Channel的writeAndFlush()方法将RpcMessage请求消息异步发送给Seata Server。
其中,Seata Client会将发送的请求消息封装在一个MessageFuture实例中。并且,Seata Client会通过MessageFuture同步等待Seata Server返回该请求的响应。而MessageFuture请求响应组件是通过CompletableFuture实现同步等待的。
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient { ... @Override public Object sendSyncRequest(Object msg) throws TimeoutException { //因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡 String serverAddress = loadBalance(getTransactionServiceGroup(), msg); //获取RPC调用的超时时间 long timeoutMillis = this.getRpcRequestTimeout(); //构建一个RPC消息 RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); //send batch message //put message into basketMap, @see MergedSendRunnable //默认是不开启批量消息发送 if (this.isEnableClientBatchSendRequest()) { ... } else { //通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel //然后通过网络连接Channel把RpcMessage发送给Seata Server Channel channel = clientChannelManager.acquireChannel(serverAddress); return super.sendSync(channel, rpcMessage, timeoutMillis); } } ... } public abstract class AbstractNettyRemoting implements Disposable { ... protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException { if (timeoutMillis <= 0) { throw new FrameworkException("timeout should more than 0ms"); } if (channel == null) { LOGGER.warn("sendSync nothing, caused by null channel."); return null; } //把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里 MessageFuture messageFuture = new MessageFuture(); messageFuture.setRequestMessage(rpcMessage); messageFuture.setTimeout(timeoutMillis); futures.put(rpcMessage.getId(), messageFuture); channelWritableCheck(channel, rpcMessage.getBody()); //获取远程地址 String remoteAddr = ChannelUtil.getAddressFromChannel(channel); doBeforeRpcHooks(remoteAddr, rpcMessage); //通过Netty的Channel异步化发送数据,同时对发送结果添加监听器 //如果发送失败,则会对网络连接Channel进行销毁处理 channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { MessageFuture messageFuture1 = futures.remove(rpcMessage.getId()); if (messageFuture1 != null) { messageFuture1.setResultMessage(future.cause()); } destroyChannel(future.channel()); } }); try { //然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应 Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); doAfterRpcHooks(remoteAddr, rpcMessage, result); return result; } catch (Exception exx) { LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody()); if (exx instanceof TimeoutException) { throw (TimeoutException) exx; } else { throw new RuntimeException(exx); } } } ... } public class MessageFuture { private transient CompletableFuture<Object> origin = new CompletableFuture<>(); ... public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException { Object result = null; try { result = origin.get(timeout, unit); if (result instanceof TimeoutException) { throw (TimeoutException)result; } } catch (ExecutionException e) { throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e); } catch (TimeoutException e) { throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start)); } if (result instanceof RuntimeException) { throw (RuntimeException)result; } else if (result instanceof Throwable) { throw new RuntimeException((Throwable)result); } return result; } ... }
12.Client将RpcMessage对象编码成字节数组
Seata Client在调用Channel的writeAndFlush()方法将RpcMessage对象发送给Seata Server时,会先将RpcMessage对象交给NettyClientBootstrap的ChannelPipeline进行处理。其中,RpcMessage对象会被ProtocolV1Encoder编码成字节数组。
public class NettyClientBootstrap implements RemotingBootstrap { ... @Override public void start() { if (this.defaultEventExecutorGroup == null) { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads())); } //基于Netty API构建一个Bootstrap //设置好对应的NioEventLoopGroup线程池组,默认1个线程就够了 this.bootstrap.group(this.eventLoopGroupWorker) .channel(nettyClientConfig.getClientChannelClazz()) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); if (nettyClientConfig.enableNative()) { if (PlatformDependent.isOsx()) { if (LOGGER.isInfoEnabled()) { LOGGER.info("client run on macOS"); } } else { bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true); } } //对Netty网络通信数据处理组件pipeline进行初始化 bootstrap.handler( new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); //IdleStateHandler,空闲状态检查Handler //如果有数据通过就记录一下时间 //如果超过很长时间没有数据通过,即处于空闲状态,那么就会触发一个user triggered event出去给ClientHandler来进行处理 pipeline.addLast(new IdleStateHandler( nettyClientConfig.getChannelMaxReadIdleSeconds(), nettyClientConfig.getChannelMaxWriteIdleSeconds(), nettyClientConfig.getChannelMaxAllIdleSeconds() )) .addLast(new ProtocolV1Decoder())//基于Seata通信协议的编码器 .addLast(new ProtocolV1Encoder());//基于Seata通信协议的解码器 if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); } } } ); if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) { LOGGER.info("NettyClientBootstrap has started"); } } ... } public class ProtocolV1Encoder extends MessageToByteEncoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Encoder.class); @Override public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { try { if (msg instanceof RpcMessage) { RpcMessage rpcMessage = (RpcMessage) msg; //完整的消息长度 int fullLength = ProtocolConstants.V1_HEAD_LENGTH; //消息头的长度 int headLength = ProtocolConstants.V1_HEAD_LENGTH; //获取消息类型 byte messageType = rpcMessage.getMessageType(); //先写入魔数MagicNumber,通过魔数代表一条消息的开始 out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES); //然后写入版本号 out.writeByte(ProtocolConstants.VERSION); //full Length(4B) and head length(2B) will fix in the end. //接着标记写入index的位置:当前写入的字节数 + 6,就是标记的writerIndex //可以理解为直接让writerIndex跳过了6字节,这6个字节的内容先空出来不写 //最后写完具体的消息后,再把这6个字节代表的消息长度和消息头长度补回来 //空出来的6个字节 = 4个字节的消息长度 + 2个字节的消息头长度 out.writerIndex(out.writerIndex() + 6); //此时消息长度和消息头长度,还没统计出来,所以先跳过6个字节 //也就是从版本号之后的第6个字节开始写:消息类型、codec、compressor out.writeByte(messageType); out.writeByte(rpcMessage.getCodec()); out.writeByte(rpcMessage.getCompressor()); //接着写入4个字节的消息ID out.writeInt(rpcMessage.getId()); //direct write head with zero-copy //获取消息头 Map<String, String> headMap = rpcMessage.getHeadMap(); if (headMap != null && !headMap.isEmpty()) { //对消息头进行编码,把Map转换为字节数据写入到out里面,此时才是在写消息头 //写完消息头之后,便可以获取到消息头长度headLength了 int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out); headLength += headMapBytesLength; fullLength += headMapBytesLength; } byte[] bodyBytes = null; //根据消息类型对消息体进行序列化 if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { //heartbeat has no body //根据RpcMessage对象的codec属性通过SPI机制获取serializer序列化组件 Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec())); //通过serializer对消息体进行序列化 bodyBytes = serializer.serialize(rpcMessage.getBody()); //根据RpcMessage对象的compressor属性通过SPI机制获取compressor压缩组件 Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); //通过compressor对字节数组进行压缩 bodyBytes = compressor.compress(bodyBytes); fullLength += bodyBytes.length; } if (bodyBytes != null) { out.writeBytes(bodyBytes); } //fix fullLength and headLength int writeIndex = out.writerIndex(); //skip magic code(2B) + version(1B) out.writerIndex(writeIndex - fullLength + 3); out.writeInt(fullLength); out.writeShort(headLength); out.writerIndex(writeIndex); } else { throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); } } catch (Throwable e) { LOGGER.error("Encode request error!", e); } } }
13.Server将字节数组解码成RpcMessage对象
Seata Server收到Seata Client发来的字节数组时,会先将字节数组交给NettyServerBootstrap的ChannelPipeline进行处理。其中,字节数组会被ProtocolV1Decoder解码成RpcMessage对象。
public class NettyServerBootstrap implements RemotingBootstrap { ... @Override public void start() { this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ) .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark())) .localAddress(new InetSocketAddress(getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) .addLast(new ProtocolV1Decoder()) .addLast(new ProtocolV1Encoder()); if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); } } } ); try { this.serverBootstrap.bind(getListenPort()).sync(); XID.setPort(getListenPort()); LOGGER.info("Server started, service listen port: {}", getListenPort()); RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort())); initialized.set(true); } catch (SocketException se) { throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se); } catch (Exception exx) { throw new RuntimeException("Server start failed", exx); } } ... } public class ProtocolV1Decoder extends LengthFieldBasedFrameDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Decoder.class); //为了解决粘包和拆包的问题,这里基于LengthFieldBasedFrameDecoder按照整帧来进行解码 public ProtocolV1Decoder() { // default is 8M this(ProtocolConstants.MAX_FRAME_LENGTH); } public ProtocolV1Decoder(int maxFrameLength) { //最大的帧长度是8M,所以一个消息数据不能超过8M //开头是2个字节的魔数、1个字节的版本号、然后第4个字节开始是4个字节的FullLength super(maxFrameLength, 3, 4, -7, 0); } //每一个整帧解出来之后,就可以通过decode()方法,把字节数组转为RpcMessage对象 @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { Object decoded; try { //调用decode()方法进行解帧 decoded = super.decode(ctx, in); if (decoded instanceof ByteBuf) { ByteBuf frame = (ByteBuf)decoded; try { return decodeFrame(frame); } finally { frame.release(); } } } catch (Exception exx) { LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); throw new DecodeException(exx); } return decoded; } public Object decodeFrame(ByteBuf frame) { //开头两个byte是魔数 byte b0 = frame.readByte(); byte b1 = frame.readByte(); if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) { throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1); } //获取到version版本号 byte version = frame.readByte(); int fullLength = frame.readInt(); short headLength = frame.readShort(); byte messageType = frame.readByte(); byte codecType = frame.readByte(); byte compressorType = frame.readByte(); int requestId = frame.readInt(); RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setCodec(codecType); rpcMessage.setId(requestId); rpcMessage.setCompressor(compressorType); rpcMessage.setMessageType(messageType); //direct read head with zero-copy int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH; if (headMapLength > 0) { Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength); rpcMessage.getHeadMap().putAll(map); } //read body if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) { rpcMessage.setBody(HeartbeatMessage.PING); } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { rpcMessage.setBody(HeartbeatMessage.PONG); } else { int bodyLength = fullLength - headLength; if (bodyLength > 0) { byte[] bs = new byte[bodyLength]; frame.readBytes(bs); //先获取到压缩组件,对消息体字节数组进行解压缩 Compressor compressor = CompressorFactory.getCompressor(compressorType); bs = compressor.decompress(bs); //然后对解压缩完的数据,根据序列化类型进行反序列化,获取到消息体对象 Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec())); rpcMessage.setBody(serializer.deserialize(bs)); } } return rpcMessage; } }
14.Server处理已解码的RpcMessage对象的流程
Seata Server将收到的网络请求字节数组解码成RpcMessage对象后,便会将RpcMessage对象交给NettyServerBootstrap的ServerHandler进行处理,也就是交给ServerHandler的channelRead()方法进行处理。
ServerHandler的channelRead()方法会调用AbstractNettyRemoting的processMessage()方法,也就是调用ServerOnRequestProcessor的process()方法来实现对RpcMessage对象的处理。
在ServerOnRequestProcessor的process()方法的处理过程中,会调用TransactionMessageHandler的onRequest()方法处理RpcMessage对象。
由于Server.start()初始化NettyRemotingServer时,设置了TransactionMessageHandler为DefaultCoordinator,所以最终就会调用DefaultCoordinator的onRequest()方法来处理RpcMessage对象。
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer { ... @ChannelHandler.Sharable class ServerHandler extends ChannelDuplexHandler { //Channel read. @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof RpcMessage)) { return; } //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理 processMessage(ctx, (RpcMessage) msg); } ... } ... } public abstract class AbstractNettyRemoting implements Disposable { ... protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody())); } Object body = rpcMessage.getBody(); if (body instanceof MessageTypeAware) { MessageTypeAware messageTypeAware = (MessageTypeAware) body; //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的 //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的 //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理 final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode()); if (pair != null) { if (pair.getSecond() != null) { try { pair.getSecond().execute(() -> { try { pair.getFirst().process(ctx, rpcMessage); } catch (Throwable th) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } finally { MDC.clear(); } }); } catch (RejectedExecutionException e) { LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), "thread pool is full, current max pool size is " + messageExecutor.getActiveCount()); if (allowDumpStack) { String name = ManagementFactory.getRuntimeMXBean().getName(); String pid = name.split("@")[0]; long idx = System.currentTimeMillis(); try { String jstackFile = idx + ".log"; LOGGER.info("jstack command will dump to " + jstackFile); Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile)); } catch (IOException exx) { LOGGER.error(exx.getMessage()); } allowDumpStack = false; } } } else { try { pair.getFirst().process(ctx, rpcMessage); } catch (Throwable th) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } } } else { LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode()); } } else { LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body); } } ... } public class NettyRemotingServer extends AbstractNettyRemotingServer { ... private void registerProcessor() { //1. registry on request message processor ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler()); ShutdownHook.getInstance().addDisposable(onRequestProcessor); super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor); //2. registry on response message processor ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures()); super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor); super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor); //3. registry rm message processor RegRmProcessor regRmProcessor = new RegRmProcessor(this); super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor); //4. registry tm message processor RegTmProcessor regTmProcessor = new RegTmProcessor(this); super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null); //5. registry heartbeat message processor ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this); super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null); } ... } public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer { ... @Override public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) { Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor); this.processorTable.put(messageType, pair); } ... } public abstract class AbstractNettyRemoting implements Disposable { ... //This container holds all processors. protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32); ... } public class ServerOnRequestProcessor implements RemotingProcessor, Disposable { private final TransactionMessageHandler transactionMessageHandler; ... @Override public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { if (ChannelManager.isRegistered(ctx.channel())) { onRequestMessage(ctx, rpcMessage); } else { try { if (LOGGER.isInfoEnabled()) { LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel()); } ctx.disconnect(); ctx.close(); } catch (Exception exx) { LOGGER.error(exx.getMessage()); } if (LOGGER.isInfoEnabled()) { LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString())); } } } private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { Object message = rpcMessage.getBody(); //RpcContext线程本地变量副本 RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup()); } else { try { BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup()); } catch (InterruptedException e) { LOGGER.error("put message to logQueue error: {}", e.getMessage(), e); } } if (!(message instanceof AbstractMessage)) { return; } //the batch send request message if (message instanceof MergedWarpMessage) { ... } else { //the single send request message final AbstractMessage msg = (AbstractMessage) message; //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext); remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result); } } ... } //Server端的全局事务处理逻辑组件 //其中包含了:开启很多后台线程、处理开启全局事务、处理提交全局事务、处理回滚全局事务、处理全局事务状态的上报、处理分支事务的注册、 //本地检查、超时检查、重试回滚、重试提交、异步提交、Undo Log的删除 public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { ... @Override public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) { if (!(request instanceof AbstractTransactionRequestToTC)) { throw new IllegalArgumentException(); } AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request; transactionRequest.setTCInboundHandler(this); return transactionRequest.handle(context); } ... }
15.Seata Server开启全局事务的流程源码
注意:创建一个全局事务会话后,会通过slf4j的MDC把xid放入线程本地变量副本里。
//Server端的全局事务处理逻辑组件 //其中包含了:开启很多后台线程、处理开启全局事务、处理提交全局事务、处理回滚全局事务、处理全局事务状态的上报、处理分支事务的注册、 //本地检查、超时检查、重试回滚、重试提交、异步提交、Undo Log的删除 public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { ... @Override public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) { if (!(request instanceof AbstractTransactionRequestToTC)) { throw new IllegalArgumentException(); } AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request; transactionRequest.setTCInboundHandler(this); return transactionRequest.handle(context); } ... } public class GlobalBeginRequest extends AbstractTransactionRequestToTC { ... @Override public AbstractTransactionResponse handle(RpcContext rpcContext) { return handler.handle(this, rpcContext); } ... } //The type Abstract tc inbound handler. public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler { ... @Override public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) { GlobalBeginResponse response = new GlobalBeginResponse(); exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() { @Override public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException { try { //开启全局事务 doGlobalBegin(request, response, rpcContext); } catch (StoreException e) { throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e); } } }, request, response); return response; } //Do global begin. protected abstract void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException; ... } public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { private final DefaultCore core; ... @Override protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { //接下来才真正处理开启全局事务的业务逻辑 //其中会调用DefaultCore来真正开启一个全局事务,即拿到xid并设置到响应里去 response.setXid(core.begin( rpcContext.getApplicationId(),//应用程序id rpcContext.getTransactionServiceGroup(),//事务服务分组 request.getTransactionName(),//事务名称 request.getTimeout())//超时时间 ); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid()); } } ... } public class DefaultCore implements Core { ... @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { //创建一个全局事务会话 GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); //通过slf4j的MDC把xid放入线程本地变量副本里去 MDC.put(RootContext.MDC_KEY_XID, session.getXid()); //添加一个全局事务会话的生命周期监听器 session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); //打开Session session.begin(); //transaction start event,发布会话开启事件 MetricsPublisher.postSessionDoingEvent(session, false); //返回全局事务会话的xid return session.getXid(); } ... } public class GlobalSession implements SessionLifecycle, SessionStorable { ... public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) { GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false); return session; } public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) { //全局事务id是通过UUIDGenerator来生成的 this.transactionId = UUIDGenerator.generateUUID(); this.status = GlobalStatus.Begin; this.lazyLoadBranch = lazyLoadBranch; if (!lazyLoadBranch) { this.branchSessions = new ArrayList<>(); } this.applicationId = applicationId; this.transactionServiceGroup = transactionServiceGroup; this.transactionName = transactionName; this.timeout = timeout; //根据UUIDGenerator生成的transactionId + XID工具生成最终的xid this.xid = XID.generateXID(transactionId); } ... }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等