
Storm Source Code Analysis: How Transaction Ordering Is Guaranteed


Let's walk through the main implementation of the transaction coordinator class, TransactionalSpoutCoordinator.

Start with the most important method, nextTuple:

@Override
    public void nextTuple() {
        sync();
    }

nextTuple simply delegates to the sync() method:

private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
        }
        
        try {
            if(_activeTx.size() < _maxTransactionActive) {
                BigInteger curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
                            && !_activeTx.containsKey(curr)) {
                        TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
                        Object state = _coordinatorState.getState(curr, _initializer);
                        _activeTx.put(curr, new TransactionStatus(attempt));
                        _collector.emit(TRANSACTION_BATCH_STREAM_ID, new Values(attempt, state, previousTransactionId(_currTransaction)), attempt);
                    }
                    curr = nextTransactionId(curr);
                }
            }     
        } catch(FailedException e) {
            LOG.warn("Failed to get metadata for a transaction", e);
        }
    }

Analyzing sync(): suppose this is the very first time we emit. Since nothing is active yet, execution goes straight into emitting transaction-batch tuples (the batch stream), i.e. this part of sync():

if(_activeTx.size() < _maxTransactionActive) {
    BigInteger curr = _currTransaction;
    for(int i=0; i<_maxTransactionActive; i++) {
        if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
                && !_activeTx.containsKey(curr)) {
            TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
            Object state = _coordinatorState.getState(curr, _initializer);
            _activeTx.put(curr, new TransactionStatus(attempt));
            _collector.emit(TRANSACTION_BATCH_STREAM_ID, new Values(attempt, state, previousTransactionId(_currTransaction)), attempt);
        }
        curr = nextTransactionId(curr);
    }
}    

The key in the code above is the _collector.emit() call, which sends the transaction-batch tuple. This stream is transmitted with a broadcast (all) grouping, which means every transaction-attempt tuple emitted by the coordinator spout is received by every task of the batch-emitting bolt.
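The broadcast semantics can be illustrated without any Storm dependency. In this minimal sketch (BoltTask and emitAllGrouping are illustrative names, not Storm APIs), the same tuple is handed to every downstream task, which is exactly why each batch bolt sees every transaction attempt:

```java
import java.util.ArrayList;
import java.util.List;

public class AllGroupingDemo {
    // Stand-in for a downstream bolt task: it just records what it receives.
    static class BoltTask {
        final List<String> received = new ArrayList<>();
        void execute(String tuple) { received.add(tuple); }
    }

    // All grouping: the same tuple is delivered to every task, unlike
    // shuffle or fields grouping, which route each tuple to a single task.
    static void emitAllGrouping(String tuple, List<BoltTask> tasks) {
        for (BoltTask t : tasks) t.execute(tuple);
    }

    public static void main(String[] args) {
        List<BoltTask> tasks =
                List.of(new BoltTask(), new BoltTask(), new BoltTask());
        emitAllGrouping("TransactionAttempt(txid=1)", tasks);
        for (BoltTask t : tasks) System.out.println(t.received);
    }
}
```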

Next, let's look at the source of the batch-emitting bolt node:

public void execute(Tuple tuple) {
        ...
        if(type==TupleType.ID) {
            synchronized(_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);            
        } else if(type==TupleType.COORD) {
            int count = (Integer) tuple.getValue(1);
            synchronized(_tracked) {
                track.reportCount++;
                track.expectedTupleCount+=count;
            }
            checkFinishId(tuple, type);
        } else {            
            synchronized(_tracked) {
                // note: forward the real work to the delegate
                _delegate.execute(tuple);
            }
        }
    }

This is the batch-emitting node's code. The key point in execute is the call to _delegate.execute(tuple), which forwards the tuple. Note that this node is actually a wrapper around a TransactionalSpoutBatchExecutor instance: _delegate's concrete type is TransactionalSpoutBatchExecutor.
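The dispatch logic above can be sketched in plain Java (no Storm types; Delegate and Dispatcher are illustrative stand-ins for TransactionalSpoutBatchExecutor and its coordinated wrapper): coordination tuples only update bookkeeping, while regular tuples are forwarded to the delegate.

```java
import java.util.ArrayList;
import java.util.List;

public class CoordinatedDispatchDemo {
    enum TupleType { ID, COORD, REGULAR }

    // Stand-in for TransactionalSpoutBatchExecutor: records forwarded tuples.
    static class Delegate {
        final List<String> executed = new ArrayList<>();
        void execute(String tuple) { executed.add(tuple); }
    }

    // Mimics the three-way dispatch in the wrapper bolt's execute():
    // coordination tuples touch tracking state, real tuples are delegated.
    static class Dispatcher {
        final Delegate delegate;
        int receivedIds = 0;          // like track.receivedId
        int expectedTupleCount = 0;   // like track.expectedTupleCount
        Dispatcher(Delegate d) { delegate = d; }

        void execute(TupleType type, String tuple, int count) {
            if (type == TupleType.ID) {
                receivedIds++;
            } else if (type == TupleType.COORD) {
                expectedTupleCount += count;
            } else {
                delegate.execute(tuple);   // _delegate.execute(tuple)
            }
        }
    }

    public static void main(String[] args) {
        Delegate d = new Delegate();
        Dispatcher disp = new Dispatcher(d);
        disp.execute(TupleType.REGULAR, "payload-1", 0);
        disp.execute(TupleType.COORD, "coord", 5);
        disp.execute(TupleType.ID, "id", 0);
        System.out.println(d.executed + " expected=" + disp.expectedTupleCount);
    }
}
```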

Next, the execute method of TransactionalSpoutBatchExecutor:

@Override
    public void execute(Tuple input) {
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        // ... omitted
                // note: emit the batch here
                _emitter.emitBatch(attempt, input.getValue(1), _collector);
                _activeTransactions.put(attempt.getTransactionId(), attempt);
                _collector.ack(input);
                BigInteger committed = (BigInteger) input.getValue(2);
                if(committed!=null) {
                    // valid to delete before what's been committed since 
                    // those batches will never be accessed again
                    _activeTransactions.headMap(committed).clear();
                    _emitter.cleanupBefore(committed);
                }
            }
      // ... omitted

The call _emitter.emitBatch(attempt, input.getValue(1), _collector) is the crucial one.

It invokes _emitter, an instance of the user-supplied implementation of ITransactionalSpout's inner interface ITransactionalSpout.Emitter, which in turn runs our own emitBatch implementation to send the batch's tuples to the downstream bolts.
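One property a user's emitBatch implementation should have, given that transactional topologies may replay a failed batch, is determinism: emitting a batch from the same attempt and metadata should reproduce the same tuples. A minimal sketch of that idea, assuming a toy in-memory source and an illustrative BatchMeta type (these are not Storm classes):

```java
import java.util.List;

public class ReplayableEmitterDemo {
    // Toy in-memory source, a stand-in for a replayable queue partition.
    static final List<String> SOURCE = List.of("a", "b", "c", "d", "e", "f");

    // Batch metadata fixed by the coordinator when the batch is first created.
    static class BatchMeta {
        final int start, size;
        BatchMeta(int start, int size) { this.start = start; this.size = size; }
    }

    // Because the slice is pinned in the metadata, replaying the same attempt
    // re-emits exactly the same tuples -- the property emitBatch relies on.
    static List<String> emitBatch(BatchMeta meta) {
        return SOURCE.subList(meta.start,
                Math.min(SOURCE.size(), meta.start + meta.size));
    }

    public static void main(String[] args) {
        BatchMeta tx1 = new BatchMeta(0, 3);
        System.out.println(emitBatch(tx1)); // first attempt
        System.out.println(emitBatch(tx1)); // replay emits the identical batch
    }
}
```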

With that, the batch tuples have been sent. So through what mechanism does a transaction get committed?

Commits are in fact driven by the ack mechanism.

Back in the coordinator class TransactionalSpoutCoordinator, look at its ack method:

 @Override
    public void ack(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt) msgId;
        TransactionStatus status = _activeTx.get(tx.getTransactionId());
        if(status!=null && tx.equals(status.attempt)) {
            if(status.status==AttemptStatus.PROCESSING) {
                status.status = AttemptStatus.PROCESSED;
            } else if(status.status==AttemptStatus.COMMITTING) {
                _activeTx.remove(tx.getTransactionId());
                _coordinatorState.cleanupBefore(tx.getTransactionId());
                _currTransaction = nextTransactionId(tx.getTransactionId());
                _state.setData(CURRENT_TX, _currTransaction);
            }
            sync();
        }
    }

From the ack code above: if a transaction's status is PROCESSING, it is moved to PROCESSED. This is precisely what allows the commit check in sync() to fire.

Look at the relevant part of sync():

private void sync() {
  TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
  if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
      maybeCommit.status = AttemptStatus.COMMITTING;
      _collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
  } 
}

As shown above, sync() checks the current transaction and, if it has been fully processed, emits on the transaction-commit stream (the commit stream) to commit it. So how is ordering guaranteed? It hinges on the _currTransaction field. Surveying all of TransactionalSpoutCoordinator's methods, only two places update it: first in open(), where it is initialized to the very first transaction id; and second in the ack method above, where committing a transaction calls nextTransactionId to advance _currTransaction and removes that transaction's entry from _activeTx. Because only the transaction whose id equals _currTransaction is ever allowed to commit, a transaction can commit only after its predecessor has committed.
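The interplay between ack() and the commit check in sync() can be simulated in a few lines of plain Java (all names here are illustrative; the real coordinator keys transactions by BigInteger and emits a commit-stream tuple rather than committing in place, so the immediate recursive ack below is a simplification):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class OrderedCommitDemo {
    enum Status { PROCESSING, PROCESSED, COMMITTING }

    final TreeMap<Integer, Status> activeTx = new TreeMap<>();
    int currTransaction = 1;
    final List<Integer> committed = new ArrayList<>();

    OrderedCommitDemo(int... active) {
        for (int id : active) activeTx.put(id, Status.PROCESSING);
    }

    // Mirrors TransactionalSpoutCoordinator.ack(): a PROCESSING tx becomes
    // PROCESSED; a COMMITTING tx is finished and currTransaction advances.
    void ack(int txId) {
        Status s = activeTx.get(txId);
        if (s == Status.PROCESSING) {
            activeTx.put(txId, Status.PROCESSED);
        } else if (s == Status.COMMITTING) {
            activeTx.remove(txId);
            committed.add(txId);
            currTransaction = txId + 1;   // nextTransactionId
        }
        sync();
    }

    // Mirrors the commit check in sync(): only the tx whose id equals
    // currTransaction may move from PROCESSED to COMMITTING.
    void sync() {
        if (activeTx.get(currTransaction) == Status.PROCESSED) {
            activeTx.put(currTransaction, Status.COMMITTING);
            ack(currTransaction); // pretend the commit tuple is acked at once
        }
    }

    public static void main(String[] args) {
        OrderedCommitDemo c = new OrderedCommitDemo(1, 2, 3);
        c.ack(2);  // tx 2 finishes first, but nothing commits: tx 1 is pending
        System.out.println(c.committed);
        c.ack(1);  // tx 1 commits, which immediately unblocks tx 2
        System.out.println(c.committed);
    }
}
```

Acking transaction 2 first leaves the committed list empty; only once transaction 1 is acked do both commit, in order.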

The following part of sync() is also important:

if(_activeTx.size() < _maxTransactionActive) {
    BigInteger curr = _currTransaction;
    for(int i=0; i<_maxTransactionActive; i++) {
        if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
                && !_activeTx.containsKey(curr)) {
            TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
            Object state = _coordinatorState.getState(curr, _initializer);
            _activeTx.put(curr, new TransactionStatus(attempt));
            _collector.emit(TRANSACTION_BATCH_STREAM_ID, new Values(attempt, state, previousTransactionId(_currTransaction)), attempt);
        }
        curr = nextTransactionId(curr);
    }
}    

As shown, a batch for a transaction id is emitted only when that id is absent from _activeTx and there is room under _maxTransactionActive; only once the committed transaction has been removed from _activeTx does its slot free up and the batch for the next pending transaction id get emitted.
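The loop's windowing behavior can be modeled outside Storm. In this sketch (batchesToEmit is an illustrative stand-in, with nextTransactionId simplified to add(ONE)), batches are scheduled only for ids not already active, scanning at most _maxTransactionActive ids forward from the current transaction:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class SyncWindowDemo {
    // Mimics the for-loop in sync(): starting from `curr`, schedule a batch
    // for every id not already active, scanning maxActive ids forward.
    static List<BigInteger> batchesToEmit(BigInteger curr,
                                          Set<BigInteger> activeTx,
                                          int maxActive) {
        List<BigInteger> emitted = new ArrayList<>();
        if (activeTx.size() < maxActive) {
            for (int i = 0; i < maxActive; i++) {
                if (!activeTx.contains(curr)) {
                    emitted.add(curr);           // would emit on the batch stream
                }
                curr = curr.add(BigInteger.ONE); // nextTransactionId
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // First call ever: nothing is active, so tx 1, 2, 3 all get batches.
        System.out.println(batchesToEmit(BigInteger.ONE, Set.of(), 3));
        // tx 1 just committed (removed), tx 2 and 3 still active:
        // the freed slot goes to tx 4.
        System.out.println(batchesToEmit(BigInteger.valueOf(2),
                Set.of(BigInteger.valueOf(2), BigInteger.valueOf(3)), 3));
    }
}
```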

In this way, the correct commit of the previous transaction and the correct opening of the next one work together to guarantee that transactions are processed in order.

The next article will analyze in detail how Storm's CoordinatedBolt guarantees that the finishBatch method is invoked correctly.
