暂无说说

topology 开发

storm jiajun 11个月前 (11-20) 130次浏览 0个评论 扫描二维码

本文介绍如何创建一个简单的 Storm 项目,只有一个 spout 和一个 bolt,如下图所示:

1、新建 maven 项目,并引进依赖。

<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>0.9.1-incubating</version>
		<scope>provided</scope>
</dependency>

2、编写 spout,这里继承了 BaseRichSpout(实现了 IRichSpout),整个过程:打开 spout——发送 tuple——声明 spout 发出的字段,给下游 bolt 调用

package com.mathslib.com.HelloWorld;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class HelloWorldSpout extends BaseRichSpout {
	private static final long serialVersionUID = 1L;
	private SpoutOutputCollector collector;
	private final static Map<Integer, String> map = new HashMap<Integer, String>();
	static {
		map.put(0, "google");
		map.put(1, "facebook");
		map.put(2, "twiter");
		map.put(3, "youtube");
		map.put(4, "linkedin");
	}

	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		// 初始化 SpoutOutputCollector,打开 spout
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		Random rand = new Random();
		int index = rand.nextInt(5);
		// 发送 tuple
		collector.emit(new Values(map.get(index)));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// 定义 spout 输出的字段,供下游使用。可以有多个
		declarer.declare(new Fields("site"));
	}
}

3、编写 Bolt,这里继承 BaseBasicBolt。直接根据字段名获取上游 spout 发送过来的 tuple,并打印其值。

package com.mathslib.com.HelloWorld;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class HelloWorldBolt extends BaseBasicBolt {
	private static final long serialVersionUID = 1L;

	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		// 获取 tuple 值,然后直接打印,collector 不对下游发送 tuple
		input.getStringByField("site");
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// 不对下游发送 tuple,所以无需定义 spout 发送的字段名
	}

}

4、组件编写完成,定义 app,利用 TopologyBuilder 对象组装并创建 Topology,然后提交给创建的集群

package com.mathslib.com.HelloWorld;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

public class HelloWorldTopology {
	public static void main(String[] args) {
		// 初始化 TopologyBuilder
		TopologyBuilder builder = new TopologyBuilder();
		// 设置 spout,第三个参数表示 task 数量
		builder.setSpout("HelloWorldSpout", new HelloWorldSpout(), 4);
		// 设置 bolt,利用 shuffleGrouping 方式在各个 task 之间统一的、随机分配元组
		builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 2).shuffleGrouping("HelloWorldSpout");
		// 使用设置好 spout 跟 bolt 的 TopologyBuilder 对象来创建 Topology
		StormTopology topology = builder.createTopology();

		// 设置 storm 运行时参数
		Config config = new Config();
		config.setNumWorkers(3);

		try {
			StormSubmitter.submitTopology(args[0], config, topology);
		} catch (Exception e) {
			System.out.println("Thread interrupted exception :" + e);
		}
	}
}

打成 jar 包,注意把 HelloWorldSpout、HelloWorldBolt、HelloWorldTopology 都选上,main class 选择 HelloWorldTopology

喜欢 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址