Storm chap03

拓扑

在本章中,你将看到如何Storm拓扑的不同组件之间传递tuples,以及如何将拓扑部署到运行中的Storm集群中。

Stream Grouping

设计拓扑时最重要的事情之一就是定义数据在不同组件之间是如何交换的(即bolts是如何处理数据流的)。Stream Grouping指明了每个bolt如何处理数据流,以及数据流如何被处理。

一个节点可以发射不止一个数据流。Stream Grouping指明哪些数据流可以被接收。

Stream grouping在拓扑定义的时候指定,正如第二章中所示。

 ...
      builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
 ...

在上述代码块中,拓扑builder指定了一个bolt,然后使用shuffle stream grouping设置了数据源。一般来说,stream grouping接收一个组件的id作为参数,取决于不同的stream grouping类型,也许还有其他的参数。

每个InputDeclarer可能对应多个数据流,每个数据流可以指定不同的stream grouping。

Shuffle Grouping

Shuffle Grouping是使用最广泛的一种分组。它接收一个参数(源组件),将数据源发射的每个tuple发送到一个随机选取的bolt,同时保证每个bolt接收到的tuples数目一致。

Shuffle Grouping适用于原子操作,例如数学计算。然而,如果操作不允许随机分布,例如在第二章数单词的例子中,应该考虑使用其他的分组。

Fields Grouping

Fields Grouping 允许根据tuple的一个或多个域控制tuple发送到符合的bolts。它保证指定域的组合的tuple发送到相同的bolt。回到数单词的例子,如果根据word域来分组数据流,那么word-normalizer bolt总是发送指定单词的tuple到相同的word-counter实例。

 ...
 builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));
 ...

field grouping 中域集合必须在源中声明

All Grouping

All Grouping 会发送tuple的副本到每一个接收bolt的实例。这种分组通常用来发送signal。例如,如果需要刷新缓存,可以发送一个refresh cache signal到所有的bolts。在数单词的例子中,可以使用该分组增加一个清空counter缓存的功能(见示例)。

 public void execute(Tuple input) {
      String str = null;
      try{
           if(input.getSourceStreamId().equals("signals")){
                str = input.getStringByField("action");
                if("refreshCache".equals(str))
                     counters.clear();
           }
      }catch (IllegalArgumentException e) {
 //Do nothing
      }
      ...
 }

我们增加了一个if判断来检查数据流来源。Storm允许声明命名的数据流(如果发送的tuple来自未命名的数据流,那个数据流就是"default");这是确定tuples来源的一种很好的方式,就像这里我们想确定数据流来源是signals

在拓扑定义中,在worl-counter中增加第二个数据流,它会将从signals-spout数据流中产生的tuple发送到所有的bolt实例。

 builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word")).allGrouping("signals-spout","signals");

signal-spout 的实现见git 仓库

Custom Grouping

通过实现接口backtype.storm.grouping.CustomStreamGrouping可以创建自定义的分组。这是的我们可以决定每个tuple会由那个(些)bolts处理。

对数单词的例子做些修改,使得开始字母相同的所有单词由同一个bolt接收。

 public class ModuleGrouping implements CustomStreamGrouping, Serializable{
      int numTasks = 0;
      @Override
      public List<Integer> chooseTasks(List<Object> values) {
           List<Integer> boltIds = new ArrayList();
           if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty())
                     boltIds.add(0);
                else
                     boltIds.add(str.charAt(0) % numTasks);
           }
           return boltIds;
      }
      @Override
      public void prepare(TopologyContext context, Fields outFields,
           List<Integer> targetTasks) {
           numTasks = targetTasks.size();
      }
 }

这里实现了一个简单的CustomStreamGrouping,其中使用任务数对单词的第一个字母相应的数值取余,根据余数来选择哪个bolt会接收当前的tuple。

word-normalizer修改如下即可使用上面的分组。

 builder.setBolt("word-normalizer", new WordNormalizer())
 .customGrouping("word-reader", new ModuleGrouping());

Direct Grouping

这是一种特殊的分组,其中数据源负责决定tuple由哪个bolt接收。跟上一个例子类似,数据源根据单词的第一个字符决定哪个bolt接收tuple。要使用直接分组,在WordNormalizerbolt中,用emitDirect代替emit

 public void execute(Tuple input) {
      ...
      for(String word : words){
           if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
           }
      }
 // Acknowledge the tuple
      collector.ack(input);
 }
 public Integer getWordCountIndex(String word) {
      word = word.trim().toUpperCase();
      if(word.isEmpty())
           return 0;
      else
           return word.charAt(0) % numCounterTasks;
 }

prepare方法中指明目标任务数:

 public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
      this.numCounterTasks = context.getComponentTasks("word-counter");
 }

然后再拓扑定义中指定直接分组

 builder.setBolt("word-counter", new WordCounter(),2)
 .directGrouping("word-normalizer");

Global Grouping

Global Grouping 发送所有源实例产生的tuple到单个目标实例中(具体说,就是id最小的那个task)。

None Grouping

截至本文写作时(Storm 版本 0.7.1),使用该分组跟使用Shuffle Grouping的效果一样。换句话说,使用该分组时,不关系数据流如何分组。

LocalCluster versus StormSubmitter

到目前为止,我们已经使用了LocalCluster这个工具在本地运行拓扑。在自己的电脑上运行Storm可以方便的调试不同的拓扑。但是当你想把拓扑提交到运行中的Storm集群该如何做呢?Storm提供了一个有趣的特性用于将拓扑发送到真正的集群中。将LocalCluster改成StormSubmitter,并实现submitTopology方法,该方法用于发送拓扑到集群中。

修改如下:

 //LocalCluster cluster = new LocalCluster();
 //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf,
 builder.createTopology());
 StormSubmitter.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf,
 builder.createTopology());
 //Thread.sleep(1000);
 //cluster.shutdown();

当使用StormSubmitter的时候,不能像在LocalCluster中那样设置集群的参数。

接下来,把源码打包成jar,jar文件会在运行storm客户端命令提交拓扑的时候发送。由于使用的是maven,只需到源码所在的目录运行下面的命令:

 mvn package

jar文件生成后,使用storm jar命令提交拓扑(附录A中给出了如何安装Storm客户端)。语法如下storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3

在该例子中,在拓扑源工程目录中运行:

 storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt

运行该命令就将拓扑提交到了集群中。

stop/kill拓扑,运行:

 storm kill Count-Word-Topology-With-Refresh-Cache

拓扑名必须是唯一的 安装Storm客户端,见附录A

DRPC Topologies

有一种的特殊拓扑称为Distributed Remote Procedure Call(DRPC),其使用Storm的分布式特性执行Remote Procedure Calls(RPC)(见图 3-1)。Storm提供了一些使用DRPC的工具。首先,DRPC服务器是一个位于客户端和拓扑之间连接器,作为一个拓扑的spouts一直运行。它一个要执行的函数及参数。对于该函数要处理的每一个数据,服务器会分配一个请求id,用于在拓扑中标识RPC请求。当拓扑执完最后一个bolt,它必须发射RPC请求id和结果,这样DRPC服务器就可以将结果返回给相应的客户端。

fig_3_1

一个DRPC服务器可以执行很多函数。每个函数由一个唯一的名字标识。

其次,Storm提供了LinearDRPCTopologyBuilder,一个用来构建DRPC拓扑的抽象。生成的拓扑创建DRPCSpouts——连接DRPC服务器并发射数据到拓扑的其他部分——并且将bolts封装,这样结果会从最后一个bolt返回。所有添加到LinearDRPCTopologyBuilder的bolts会顺序执行。

作为该种类型拓扑的一个实例,不妨创建一个加法的过程。这是一个简单的例子,但是概念可以扩展到复杂的分布式数值计算中去。

bolt的输出declarer如下:

 public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id","result"));
 }

作为拓扑中唯一的bolt,必须发射RPC ID以及结果。

execute方法负责执行加法操作:

 public void execute(Tuple input) {
      String[] numbers = input.getString(1).split("\\+");
      Integer added = 0;
      if(numbers.length<2){
           throw new InvalidParameterException("Should be at least 2 numbers");
      }
      for(String num : numbers){
           added += Integer.parseInt(num);
      }
      collector.emit(new Values(input.getValue(0),added));
 }

包含拓扑bolt的拓扑定义如下:

 public static void main(String[] args) {
      LocalDRPC drpc = new LocalDRPC();
      LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
      builder.addBolt(new AdderBolt(),2);
      Config conf = new Config();
      conf.setDebug(true);
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("drpc-adder-topology", conf,
           builder.createLocalTopology(drpc));
      String result = drpc.execute("add", "1+-1");
      checkResult(result,0);
      result = drpc.execute("add", "1+1+5+10");
      checkResult(result,17);
      cluster.shutdown();
      drpc.shutdown();
 }

这里创建了一个在本地运行DRPC服务器的LocalDRPC。接下来,创建一个拓扑builder并将bolt添加到拓扑中。使用DRPC对象的execute方法测试拓扑。

要连接到远程DRPC服务器,使用类DRPCClient。DRPC服务器暴露了一个Thrift API,这可以在许多语言中使用,在本地或者远程运行DRPC服务器时使用的是同一个API。

要提交拓扑到Storm集群中,使用builder的createRemoteTopology方法而非createLocalTopology方法,它会使用来自Storm配置中的DRPC配置。

留下评论