我們最常用的或許就是Storm從Kafka中讀取資料轉換成Tuple了,現在我們就將Storm與Kafka來進行整合。
1.pom.xml
<!-- Storm <-> Kafka bridge: provides KafkaSpout, SpoutConfig, ZkHosts, StringScheme. -->
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>1.1.0</version> </dependency>
<!-- Kafka client (Scala 2.10 build). NOTE(review): 0.8.1.1 is very old next to
     storm-kafka 1.1.0 — storm-kafka is built against the 0.8/0.9 consumer API so it
     works, but confirm this matches the broker version actually deployed. -->
<!-- ZooKeeper and log4j are excluded because Storm already ships its own copies;
     keeping both on the classpath causes conflicts. -->
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>
2.程式碼:
2.1Bolt:
package com.storm.kafka;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

/**
 * Bolt that receives one-field tuples from the KafkaSpout and prints the
 * string payload of each tuple to stdout.
 *
 * <p>Declares an output field "word" but currently emits nothing downstream —
 * it is a sink used to demonstrate that Kafka messages arrive as tuples.
 *
 * @author 鄒培賢
 */
public class SplitBolt implements IRichBolt {

    private TopologyContext context;
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.context = topologyContext;
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // KafkaSpout with StringScheme produces single-field tuples whose
        // field 0 is the message payload decoded as a string.
        String line = tuple.getString(0);
        System.out.println(line);
        // FIX: IRichBolt (unlike BaseBasicBolt) requires manual acking.
        // Without this ack every tuple eventually times out and is replayed
        // by the spout, reprocessing the whole topic forever.
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // No component-specific configuration overrides.
        return null;
    }
}
2.2APP:
package com.storm.kafka;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import java.util.UUID;

/**
 * Local-mode driver: wires a KafkaSpout reading topic "test" (via the
 * ZooKeeper ensemble s10/s11/s12) to {@link SplitBolt}, which prints each
 * message, and runs the topology in an in-process LocalCluster.
 *
 * @author 鄒培賢
 */
public class APP {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // ZooKeeper ensemble the Kafka brokers register with; used by the
        // spout for broker discovery and offset storage.
        String zkConnString = "s10:2181,s11:2181,s12:2181";
        BrokerHosts hosts = new ZkHosts(zkConnString);

        // Spout config: topic "test", ZK root "/test" for offsets, and a
        // random consumer id (fresh id each run => offsets are not reused).
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test", UUID.randomUUID().toString());
        // Decode raw Kafka message bytes into single-field string tuples.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        builder.setSpout("kafkaspout", kafkaSpout).setNumTasks(2);
        builder.setBolt("split-bolt", new SplitBolt(), 2).shuffleGrouping("kafkaspout").setNumTasks(2);

        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setDebug(true);

        // Local (in-process) mode.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wc", conf, builder.createTopology());

        // FIX: the original returned immediately after submitTopology, so the
        // local topology had no time to consume anything and the cluster was
        // never shut down. Keep it alive for a demo window, then shut down so
        // the JVM exits cleanly.
        try {
            Thread.sleep(60_000);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for any caller-side handling.
            Thread.currentThread().interrupt();
        } finally {
            cluster.shutdown();
        }
    }
}
3.執行結果:
寫評論
很抱歉,必須登入網站才能發佈留言。