読者です 読者をやめる 読者になる 読者になる

苦労して覚えるStorm -激闘、Trident編 その1-

夏コミで、Tridentの本出します(しれっと)

んで最近Trident使ってるんですけど、公式のドキュメントが割とファックな感じでキレそう。(Tutorialの割りにやってることはそこそこ難しいし、用語の定義もいまいちはっきりしていないのでいま何やってるかよくわからん)

あまりのイミフっぷりに、日本人は誰も使おうとしていないのか、日本語のリソースもほとんどないんで、ここでも適当に書き下そうかなと。

Tridentとは

Stormの内部DSLです。HadoopにおけるHiveとかPigのようにSQL風のAPIで、StormのTopologyを作ることができます。

TupleとBatch

Tupleとは、Stream(SpoutとBoltをつなげたもの、Topologyのサブセット)に流すメッセージのことを言います。素のStormのBoltでは、これを1つずつ取り扱っていました。一方でTridentではTupleを数個〜数十個にまとめてBatchとして扱います。
Stormドキュメントの図を抜粋すると以下のようなイメージ。

f:id:Xray:20130711235354p:plain

Partition

Tridentのドキュメント見ていると、Partitionという単語がよく出てきます。each partition、とか partition local、とか。色々考えた結果、Partitionとは、つまり以下の図の青い部分を指すという結論に至りました。多分(ドキュメントに明確な定義が無いと思う)

f:id:Xray:20130711235048j:plain:w480

TridentでWordCount

Tutorial内の解説は、RPCガーとかStateガーQueryガーと、けっこうノイズが多い感じなので、極力ノイズを省いたコードが以下になります。

すいません、今回は眠いのでここまでで。ソースコードの解説はまた今度。

public class WordCount {
	public static StormTopology buildTopology(){
		FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, 
					new Values("aikatsu is idol activities"),
					new Values("love Live is a school idol project"),
					new Values("idol master is a robot anime"));
		spout.setCycle(true); // 上記3つの文をひたすら出力する。

		TridentTopology topology = new TridentTopology(); 
		topology.newStream("sentenceSpout", spout)
		      .each(new Fields("sentence"), new SplitSentence(), new Fields("word"))				
                      .groupBy(new Fields("word"))
		      .persistentAggregate(new MemoryMapState.Factory(), new WordCounter(), new Fields("count"));
		
		return topology.build();
	}
	
	public static void main(String [] args){
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("wordcount", conf, buildTopology());

		try {
			Thread.sleep(10000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		cluster.killTopology("wordcount");
	}
	
	public static class WordCounter implements CombinerAggregator<Long> {
	        @Override
	        public Long init(TridentTuple tuple) {
	            return 1L;
	        }
	        @Override
	        public Long combine(Long val1, Long val2) {
	            return val1 + val2;
	        }
    	
                @Override
    	         public Long zero() {
        	    return 0L;
    	        }
	}

	public static class SplitSentence extends BaseFunction implements Function {
		@Override
		public void execute(TridentTuple tuple, TridentCollector collector) {
			String sentence = (String) tuple.get(0);
			for(String word : sentence.split(" ")){
				collector.emit(new Values(word));
			}
		}
	}
}