Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Save more on your purchases! discount-offer-chevron-icon
Savings automatically calculated. No voucher code required.
Arrow left icon
All Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Newsletter Hub
Free Learning
Arrow right icon
timer SALE ENDS IN
0 Days
:
00 Hours
:
00 Minutes
:
00 Seconds

Stream Grouping

Save for later
  • 420 min read
  • 2014-08-26 00:00:00

article-image

In this article, by Ankit Jain and Anand Nalya, the authors of the book Learning Storm, we will cover different types of stream groupings.

(For more resources related to this topic, see here.)

When defining a topology, we create a graph of computation with a number of bolt-processing streams. At a more granular level, each bolt executes as multiple tasks in the topology. A stream will be partitioned into a number of partitions and divided among the bolts' tasks. Thus, each task of a particular bolt will only get a subset of the tuples from the subscribed streams.

Stream grouping in Storm provides complete control over how this partitioning of tuples happens among many tasks of a bolt subscribed to a stream. Grouping for a bolt can be defined on the instance of the backtype.storm.topology.InputDeclarer class returned when defining bolts using the backtype.storm.topology.TopologyBuilder.setBolt method.

Storm supports the following types of stream groupings:

  • Shuffle grouping
  • Fields grouping
  • All grouping
  • Global grouping
  • Direct grouping
  • Local or shuffle grouping
  • Custom grouping

Now, we will look at each of these groupings in detail.

Shuffle grouping

Shuffle grouping distributes tuples in a uniform, random way across the tasks. An equal number of tuples will be processed by each task. This grouping is ideal when you want to distribute your processing load uniformly across the tasks and where there is no requirement of any data-driven partitioning.

Fields grouping

Fields grouping enables you to partition a stream on the basis of some of the fields in the tuples. For example, if you want that all the tweets from a particular user should go to a single task, then you can partition the tweet stream using fields grouping on the username field in the following manner:

builder.setSpout("1", new TweetSpout());
builder.setBolt("2", new TweetCounter()).fieldsGrouping("1", new Fields("username"))

Fields grouping is calculated with the following function:

hash (fields) % (no. of tasks)

Here, hash is a hashing function. It does not guarantee that each task will get tuples to process. For example, if you have applied fields grouping on a field, say X, with only two possible values, A and B, and created two tasks for the bolt, then it might be possible that both hash (A) % 2 and hash (B) % 2 are equal, which will result in all the tuples being routed to a single task and other tasks being completely idle.

Another common usage of fields grouping is to join streams. Since partitioning happens solely on the basis of field values and not the stream type, we can join two streams with any common join fields. The name of the fields do not need to be the same. For example, in order to process domains, we can join the Order and ItemScanned streams when an order is completed:

builder.setSpout("1", new OrderSpout());
builder.setSpout("2", new ItemScannedSpout());
builder.setBolt("joiner", new OrderJoiner())
.fieldsGrouping("1", new Fields("orderId"))
.fieldsGrouping("2", new Fields("orderRefId"));

All grouping

All grouping is a special grouping that does not partition the tuples but replicates them to all the tasks, that is, each tuple will be sent to each of the bolt's tasks for processing.

One common use case of all grouping is for sending signals to bolts. For example, if you are doing some kind of filtering on the streams, then you have to pass the filter parameters to all the bolts. This can be achieved by sending those parameters over a stream that is subscribed by all bolts' tasks with all grouping. Another example is to send a reset message to all the tasks in an aggregation bolt.

The following is an example of all grouping:

builder.setSpout("1", new TweetSpout());
builder.setSpout("signals", new SignalSpout());
builder.setBolt("2", new TweetCounter()).fieldsGrouping("1", 
new Fields("username")).allGrouping("signals");

Here, we are subscribing signals for all the TweetCounter bolt's tasks. Now, we can send different signals to the TweetCounter bolt using SignalSpout.

Global grouping

Global grouping does not partition the stream but sends the complete stream to the bolt's task with the smallest ID. A general use case of this is when there needs to be a reduce phase in your topology where you want to combine results from previous steps in the topology in a single bolt.

Global grouping might seem redundant at first, as you can achieve the same results with defining the parallelism for the bolt as one and setting the number of input streams to one. Though, when you have multiple streams of data coming through different paths, you might want only one of the streams to be reduced and others to be processed in parallel.

Unlock access to the largest independent learning library in Tech for FREE!
Get unlimited access to 7500+ expert-authored eBooks and video courses covering every tech area you can think of.
Renews at $15.99/month. Cancel anytime

For example, consider the following topology. In this topology, you might want to route all the tuples coming from Bolt C to a single Bolt D task, while you might still want parallelism for tuples coming from Bolt E to Bolt D.

stream-grouping-img-0

Global grouping

This can be achieved with the following code snippet:

builder.setSpout("a", new SpoutA());
builder.setSpout("b", new SpoutB());
builder.setBolt("c", new BoltC());
builder.setBolt("e", new BoltE());
builder.setBolt("d", new BoltD())
.globalGrouping("c")
.shuffleGrouping("e");

Direct grouping

In direct grouping, the emitter decides where each tuple will go for processing. For example, say we have a log stream and we want to process each log entry using a specific bolt task on the basis of the type of resource. In this case, we can use direct grouping.

Direct grouping can only be used with direct streams. To declare a stream as a direct stream, use the backtype.storm.topology.OutputFieldsDeclarer.declareStream method that takes a Boolean parameter directly in the following way in your spout:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declareStream("directStream", true, new Fields("field1"));
}

Now, we need the number of tasks for the component so that we can specify the taskId parameter while emitting the tuple. This can be done using the backtype.storm.task.TopologyContext.getComponentTasks method in the prepare method of the bolt. The following snippet stores the number of tasks in a bolt field:

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  this.numOfTasks = context.getComponentTasks("my-stream");
  this.collector = collector;
}

Once you have a direct stream to emit to, use the backtype.storm.task.OutputCollector.emitDirect method instead of the emit method to emit it. The emitDirect method takes a taskId parameter to specify the task. In the following snippet, we are emitting to one of the tasks randomly:

public void execute(Tuple input) {
  collector.emitDirect(new Random().nextInt(this.numOfTasks), process(input));
}

Local or shuffle grouping

If the tuple source and target bolt tasks are running in the same worker, using this grouping will act as a shuffle grouping only between the target tasks running on the same worker, thus minimizing any network hops resulting in increased performance.

In case there are no target bolt tasks running on the source worker process, this grouping will act similar to the shuffle grouping mentioned earlier.

Custom grouping

If none of the preceding groupings fit your use case, you can define your own custom grouping by implementing the backtype.storm.grouping.CustomStreamGrouping interface.

The following is a sample custom grouping that partitions a stream on the basis of the category in the tuples:

public class CategoryGrouping implements CustomStreamGrouping, Serializable {
  // Mapping of category to integer values for grouping
  private static final Map<String, Integer> categories = ImmutableMap.of
  (
    "Financial", 0, 
    "Medical", 1, 
    "FMCG", 2, 
    "Electronics", 3
  );

  // number of tasks, this is initialized in prepare method
  private int tasks = 0;

  public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) 
  {
    // initialize the number of tasks
    tasks = targetTasks.size();
  }

  public List<Integer> chooseTasks(int taskId, List<Object> values) {
    // return the taskId for a given category
    String category = (String) values.get(0);
    return ImmutableList.of(categories.get(category) % tasks);
  }
}

Now, we can use this grouping in our topologies with the following code snippet:

builder.setSpout("a", new SpoutA());
builder.setBolt("b", (IRichBolt)new BoltB())
.customGrouping("a", new CategoryGrouping());

The following diagram represents the Storm groupings graphically:

stream-grouping-img-1

Summary

In this article, we discussed stream grouping in Storm and its types.

Resources for Article:


Further resources on this subject: