StormのGuaranteeing message processing

コミティア102に参加してきたこととか、中嶋ちずな先生のサイン会に行ってきたことでも書こうかと思った。しかし、前者については多幸状態で本を読み買っていた記憶しかない。同じく後者についてもぼたんちゃんがおもらしを我慢したり、致してしまったあとに現実から逃げる姿がたまらんという意味不明な供述をしており動機は不明って感じという有様である。
なので、今日はStormのGuaranteeing message processingについて書こうと思う。


原典はここ:https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

StormにはSpoutで生成したTupleが最後段のBoltまで正常に処理されたか見ておいてくれるという機能があるんだけど、どうやってるのかはちゃんと理解できていなかった。Word countみたいなごく簡単な例ですら、文を分割して、1単語ずつTupleとしてemitすると入出力のTupleの数は一致しなくなるのに、なぜそういうことができるのか?

実際には完全に自動でやってくれるというウマイ話はなくて、開発者が注意してコードを書く必要がある。

やらなければならないことは、以下の3つ(実質は2つ)

  • SpoutからTupleをemitする際に固有かつ不変なメッセージIDをつける
  • Boltでは、処理が正常に終了すればackメソッドを実行し、失敗したならfailメソッドを実行する。
  • もし、Boltにおける処理で、入力するTupleの数と出力するTupleの数が異なる場合には、入力Tupleと出力Tupleを対応付ける(原典ではAnchoringと呼んでいる)


もう一つ加えるとするならば、Spoutからメッセージキューにアクセスした際に、メッセージを取得しても良いけどそのメッセージをキューから消してはならないということ(peekはいいけどpollはだめ)メッセージキューから消してしまうと、failしてしまった時に再送できなくなってしまう。すべてのBoltから、あるメッセージに対してのackが返ってくると、そのメッセージに付けられたIDを引数にSpoutのackメソッドが実行されるので、その時に初めて消していい。

SpoutのackメソッドはすべてのBoltからackが返ってこなければ実行されないが、failはどこかのBoltでfailメソッドを実行すればすぐに実行される(当然ではあるが)


以降は実際にサンプルを示していく。原典においては、メッセージキューとしてKestrelを使ったサンプルが紹介されているが、いささか混み入っているので、すこし簡単なものを実装してみた。
コードはGithubに置いた。:https://github.com/pengin/storm-sample
名前空間は面倒だったのでStorm-starterのものをそのまま使っていたり。

このサンプルでは、文を15文送信しWord countをこなしつつ、あるTupleに関してすべてのBoltからackが返った際にはそのTupleのIDを出力し、どこかのBoltからfailが返った際にはそのTupleのIDを出力し、送信用のキューに再送するTupleをセットする。あまりイケてない実装だけれども、コード自体は単純なのですぐ理解してもらえると思う。

該当のコードは以下のような感じだ。

@Override
public void nextTuple() {
	Utils.sleep(100);
	if(msgId < 15){
		_collector.emit(new Values(_baseSentence[msgId % _baseSentence.length]), msgId);
		msgId++;
	}else if(!_emitQueue.isEmpty()){
		emitItem item = _emitQueue.poll();
		_collector.emit(new Values(item.getSentence()), item.getMsgId());
	}
}

@Override
public void ack(Object msgId) {
	log.info(String.format("acked message ID : %d", (Integer)msgId));
}

@Override
public void fail(Object msgId) {
	log.info(String.format("failed message ID : %d", (Integer)msgId));
	_emitQueue.add(new emitItem(_baseSentence[(Integer)msgId % _baseSentence.length], (Integer)msgId));
}

Topologyの定義はここでは特に解説しない。このサンプルを動かす際に見てもらいたいのは、Bolt(bolt/SplitSentenceBolt.javaなど)において

_collector.ack(input);

_collector.fail(input);

と変えた時にどういう動きをするかである。