Map/Reduce API

  • 02 Jun 2015


In this article by Wagner Roberto dos Santos, author of the book Infinispan Data Grid Platform Definitive Guide, we introduce the Map/Reduce model and show how to use the Map/Reduce API that Infinispan provides.

Using the Map/Reduce API

According to Gartner, in-memory data grids and in-memory computing are racing towards mainstream adoption, and the market for this kind of technology is expected to reach 1 billion by 2016. In line with this trend, Infinispan provides a MapReduce API for distributed computing, which means that we can use an Infinispan cache to process, in parallel, all the data stored in heap memory across all Infinispan instances.

If you're new to MapReduce, don't worry, we're going to describe it in the next section in a way that gets you up to speed quickly.

An introduction to Map/Reduce

MapReduce is a programming model introduced by Google, which allows for massive scalability across hundreds or thousands of servers in a data grid. It's a simple concept to understand for those who are familiar with distributed computing and clustered environments for data processing solutions.

You can find the MapReduce paper at the following link:
http://research.google.com/archive/mapreduce.html

MapReduce has two distinct computational phases; as the name states, they are map and reduce:

  • In the map phase, a function called Map is executed on all nodes. It takes a set of data in a given cache, performs filtering and sorting operations on it, and outputs another set of data.
  • In the reduce phase, a function called Reduce is executed. It reduces the results of the map phase into a single final output. The reduce function is always performed after the map phase.
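
To make the two phases concrete, here is a minimal sketch in plain Java, with no Infinispan involved. It uses the classic word-count problem; the class name WordCountSketch and its methods are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a single-JVM imitation of the two phases, not a distributed runtime.
class WordCountSketch {

    // Map phase: turn one input text into intermediate (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String text) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : text.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    // Reduce phase: collapse all values emitted for one key into a single result.
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }

    // Runs map over every input, groups the pairs by key, then reduces each group.
    static Map<String, Integer> run(List<String> inputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String input : inputs) {
            for (Map.Entry<String, Integer> pair : map(input)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(run(List.of("to be or not", "to be")));
    }
}
```

In a data grid, the calls to map would run in parallel on the nodes that hold the data, and only the grouped intermediate pairs would travel between nodes.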

Map/Reduce in the Infinispan platform

The Infinispan MapReduce model is an adaptation of Google's original MapReduce model. Each MapReduce task has four main components:

  • MapReduceTask: This is a distributed task allowing a large-scale computation to be transparently parallelized across Infinispan cluster nodes. This class provides a constructor that takes a cache whose data will be used as the input for this task. The MapReduceTask orchestrates the execution of the Mapper and Reducer seamlessly across Infinispan nodes.
  • Mapper: A Mapper is used to process each input cache entry K,V. A Mapper is invoked by MapReduceTask and is migrated to an Infinispan node, to transform the K,V input pair into intermediate keys before emitting them to a Collector.
  • Reducer: A Reducer is used to process a set of intermediate key results from the map phase. Each execution node invokes one instance of the Reducer, and each instance only reduces the intermediate key results that are stored locally on that execution node.
  • Collator: This collates results from reducers executed on the Infinispan cluster and assembles a final result returned to an invoker of MapReduceTask.
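
The way these four roles fit together can be sketched with simplified stand-ins. The interfaces below (Emitter, MapFn, ReduceFn, CollateFn) and the runTask method are hypothetical names invented for this illustration; they are not the Infinispan interfaces, and everything runs in a single JVM.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RolesSketch {
    // Hypothetical stand-ins for the collector, mapper, reducer, and collator roles.
    interface Emitter<K, V> { void emit(K key, V value); }
    interface MapFn<KI, VI, KO, VO> { void map(KI key, VI value, Emitter<KO, VO> out); }
    interface ReduceFn<KO, VO> { VO reduce(KO key, Iterator<VO> values); }
    interface CollateFn<KO, VO, R> { R collate(Map<KO, VO> reduced); }

    // Plays the task role: maps every entry, groups emitted values by key,
    // reduces each group, and hands the reduced map to the collator.
    static <KI, VI, KO, VO, R> R runTask(Map<KI, VI> input,
                                         MapFn<KI, VI, KO, VO> mapper,
                                         ReduceFn<KO, VO> reducer,
                                         CollateFn<KO, VO, R> collator) {
        Map<KO, List<VO>> intermediate = new LinkedHashMap<>();
        input.forEach((k, v) -> mapper.map(k, v,
                (ko, vo) -> intermediate.computeIfAbsent(ko, x -> new ArrayList<>()).add(vo)));
        Map<KO, VO> reduced = new LinkedHashMap<>();
        intermediate.forEach((ko, vos) -> reduced.put(ko, reducer.reduce(ko, vos.iterator())));
        return collator.collate(reduced);
    }

    // Example wiring: which initial letter is the most common among the words?
    static Character mostCommonInitial(Map<Integer, String> words) {
        MapFn<Integer, String, Character, Integer> mapper =
                (id, word, out) -> out.emit(word.charAt(0), 1);
        ReduceFn<Character, Integer> reducer = (letter, counts) -> {
            int sum = 0;
            while (counts.hasNext()) {
                sum += counts.next();
            }
            return sum;
        };
        CollateFn<Character, Integer, Character> collator = reduced -> {
            Character best = null;
            for (Map.Entry<Character, Integer> e : reduced.entrySet()) {
                if (best == null || e.getValue() > reduced.get(best)) {
                    best = e.getKey();
                }
            }
            return best;
        };
        return runTask(words, mapper, reducer, collator);
    }

    public static void main(String[] args) {
        // prints a
        System.out.println(mostCommonInitial(Map.of(1, "apple", 2, "avocado", 3, "banana")));
    }
}
```

The mapper and reducer produce a map of results per intermediate key, while the collator condenses that map into whatever single value the caller needs.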

The following image shows that, in a distributed environment, an Infinispan MapReduceTask is responsible for starting the process for a given cache. Unless you specify an onKeys(Object...) filter, all available key/value pairs of the cache are used as input data for the MapReduce task:

 mapreduce-api-img-0

In the preceding image, the Map/Reduce process performs the following steps:

  1. The MapReduceTask in the Master Task Node starts the Map Phase by hashing the task input keys and grouping them by the execution node they belong to. The Infinispan master node then sends a map function and the input keys to each node. On each destination node, the map function is invoked with the corresponding values, which are loaded locally using the given keys.
  2. The map function is executed on each node, resulting in a Map<KOut, VOut> object on each node.
  3. The Combine Phase is initiated when all results are collected. If a combiner is specified (via the combineWith(Reducer<KOut, VOut> combiner) method), the combiner extracts the KOut keys and invokes the reduce phase on those keys.
  4. Before starting the Reduce Phase, Infinispan executes an intermediate migration phase, where all intermediate keys and values are grouped.
  5. At the end of the Combine Phase, a list of KOut keys is returned to the initial Master Task Node. At this stage, the values (VOut) are not returned, because they are not needed in the master node.
  6. At this point, Infinispan is ready to start the Reduce Phase; the Master Task Node groups the KOut keys by execution node and sends a reduce command to each node where keys are hashed.
  7. The reducer is invoked and, for each KOut key, it grabs a list of VOut values from a temporary cache belonging to MapReduceTask, wraps it with an iterator, and invokes the reduce method on it.
  8. Each reducer returns one map with the KOut/VOut result values. The reduce command returns to the Master Task Node, which in turn combines all resulting maps into a single map and returns it as the result of MapReduceTask.
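
The flow of these phases can be imitated in a single JVM. The following sketch is a simplification under stated assumptions: each "node" is just a map held in a list, ownerOf is a made-up stand-in for the grid's key hashing, and the job itself (summing values by parity) is arbitrary. None of these names come from Infinispan.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PhaseSimulation {

    // Stand-in for the grid's key-to-node hashing (an assumption, not Infinispan's algorithm).
    static int ownerOf(Object key, int nodes) {
        return Math.floorMod(key.hashCode(), nodes);
    }

    // Sums the cache values per parity ("even" or "odd") while imitating the phases.
    static Map<String, Integer> run(Map<String, Integer> cache, int nodes) {
        // Map Phase, first half: the master groups the input keys by owning node.
        List<Map<String, Integer>> inputPerNode = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            inputPerNode.add(new HashMap<>());
        }
        cache.forEach((key, value) -> inputPerNode.get(ownerOf(key, nodes)).put(key, value));

        // Map Phase, second half, plus Combine Phase: each node maps its local
        // entries and pre-reduces them, so one partial value per KOut key remains.
        List<Map<String, Integer>> combinedPerNode = new ArrayList<>();
        for (Map<String, Integer> local : inputPerNode) {
            Map<String, Integer> combined = new HashMap<>();
            local.forEach((key, value) ->
                    combined.merge(value % 2 == 0 ? "even" : "odd", value, Integer::sum));
            combinedPerNode.add(combined);
        }

        // Migration: partial values are regrouped on the node that owns each KOut key.
        List<Map<String, List<Integer>>> migrated = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            migrated.add(new HashMap<>());
        }
        for (Map<String, Integer> combined : combinedPerNode) {
            combined.forEach((kOut, partial) -> migrated.get(ownerOf(kOut, nodes))
                    .computeIfAbsent(kOut, k -> new ArrayList<>()).add(partial));
        }

        // Reduce Phase: each node reduces its own keys; the master merges the maps.
        Map<String, Integer> result = new TreeMap<>();
        for (Map<String, List<Integer>> local : migrated) {
            local.forEach((kOut, partials) -> {
                int sum = 0;
                for (int p : partials) {
                    sum += p;
                }
                result.put(kOut, sum);
            });
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new HashMap<>();
        for (int i = 1; i <= 5; i++) {
            cache.put("k" + i, i);
        }
        // prints {even=6, odd=9}
        System.out.println(run(cache, 3));
    }
}
```

The result does not depend on how many nodes are simulated, which is the property that lets the real task spread the work across the cluster.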

Sample application – find a destination

Now that we have seen what map and reduce are, and how the Infinispan model works, let's create a Find Destination application that illustrates the concepts we have discussed.

In the last section, to demonstrate how CDI works, we created a web service that provides weather information. Now, based on this same weather information service, let's create a map/reduce engine that finds the best destination based on simple business rules, such as destination type (sun destination, golf, skiing, and so on).

So, the first step is to create the WeatherInfo cache object that will hold information about the weather:

import java.io.Serializable;
import java.util.Date;

public class WeatherInfo implements Serializable {

  private static final long serialVersionUID = -3479816816724167384L;

  private String country;
  private String city;
  private Date day;
  private Double temp;
  private Double tempMax;
  private Double tempMin;

  // Convenience constructor: derives max/min as temp plus or minus 5 degrees
  public WeatherInfo(String country, String city, Date day, Double temp) {
    this(country, city, day, temp, temp + 5, temp - 5);
  }

  public WeatherInfo(String country, String city, Date day, Double temp,
      Double tempMax, Double tempMin) {
    super();
    this.country = country;
    this.city = city;
    this.day = day;
    this.temp = temp;
    this.tempMax = tempMax;
    this.tempMin = tempMin;
  }

  // Getters and setters omitted (for example, getTemp())

  @Override
  public String toString() {
    return "{WeatherInfo:{ country:" + country + ", city:" + city
        + ", day:" + day + ", temp:" + temp + ", tempMax:" + tempMax
        + ", tempMin:" + tempMin + "}";
  }
}

Now, let's create an enum object to define the type of destination a user can select and the rules associated with each destination. To keep it simple, we are going to have only two destinations, sun and skiing.

The temperature value of each type is the threshold used to decide whether a destination qualifies for that type:

public enum DestinationTypeEnum {

  SUN(18d, "Sun Destination"), SKIING(-5d, "Skiing Destination");

  // Threshold temperature that a destination must meet for this type
  private final Double temperature;
  private final String description;

  DestinationTypeEnum(Double temperature, String description) {
    this.temperature = temperature;
    this.description = description;
  }

  public Double getTemperature() {
    return temperature;
  }

  public String getDescription() {
    return description;
  }
}

Now it's time to create the Mapper class. This class is responsible for checking whether each cache entry fits the requirements of a destination type. To define the DestinationMapper class, implement the Mapper<KIn, VIn, KOut, VOut> interface and put your algorithm in the map method:

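import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.Mapper;

public class DestinationMapper implements
    Mapper<String, WeatherInfo, DestinationTypeEnum, WeatherInfo> {

  private static final long serialVersionUID = -3418976303227050166L;

  @Override
  public void map(String key, WeatherInfo weather,
      Collector<DestinationTypeEnum, WeatherInfo> c) {
    // Emit the entry under SUN or SKIING; entries that fit neither type are dropped
    if (weather.getTemp() >= DestinationTypeEnum.SUN.getTemperature()) {
      c.emit(DestinationTypeEnum.SUN, weather);
    } else if (weather.getTemp() <= DestinationTypeEnum.SKIING.getTemperature()) {
      c.emit(DestinationTypeEnum.SKIING, weather);
    }
  }
}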

The role of the Reducer class in our application is to return the best destination among all destinations, based on the highest temperature for sun destinations and the lowest temperature for skiing destinations, returned by the mapping phase. To implement the Reducer class, you'll need to implement the Reducer<KOut, VOut> interface:

import java.util.Iterator;

import org.infinispan.distexec.mapreduce.Reducer;

public class DestinationReducer implements
    Reducer<DestinationTypeEnum, WeatherInfo> {

  private static final long serialVersionUID = 7711240429951976280L;

  @Override
  public WeatherInfo reduce(DestinationTypeEnum key, Iterator<WeatherInfo> it) {
    WeatherInfo bestPlace = null;
    if (key == DestinationTypeEnum.SUN) {
      // Best for sun: keep the highest temperature
      while (it.hasNext()) {
        WeatherInfo w = it.next();
        if (bestPlace == null || w.getTemp() > bestPlace.getTemp()) {
          bestPlace = w;
        }
      }
    } else {
      // Best for skiing: keep the lowest temperature
      while (it.hasNext()) {
        WeatherInfo w = it.next();
        if (bestPlace == null || w.getTemp() < bestPlace.getTemp()) {
          bestPlace = w;
        }
      }
    }
    return bestPlace;
  }
}

Finally, to execute our sample application, we can create a JUnit test case with the MapReduceTask. But first, we have to create a couple of cache entries before executing the task, which we are doing in the setUp() method:

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.util.Date;
import java.util.Map;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distexec.mapreduce.MapReduceTask;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
// Assumption: Log and LogFactory are Infinispan's own logging classes
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.Before;
import org.junit.Test;

public class WeatherInfoReduceTest {

  private static final Log logger =
      LogFactory.getLog(WeatherInfoReduceTest.class);

  private Cache<String, WeatherInfo> weatherCache;

  @Before
  public void setUp() throws Exception {
    Date today = new Date();
    EmbeddedCacheManager manager = new DefaultCacheManager();
    Configuration config = new ConfigurationBuilder()
        .clustering().cacheMode(CacheMode.LOCAL)
        .build();
    manager.defineConfiguration("weatherCache", config);
    weatherCache = manager.getCache("weatherCache");

    // Sample entries: only temperatures >= 18 or <= -5 reach the reducer
    weatherCache.put("1", new WeatherInfo("Germany", "Berlin", today, 12d));
    weatherCache.put("2", new WeatherInfo("Germany", "Stuttgart", today, 11d));
    weatherCache.put("3", new WeatherInfo("England", "London", today, 8d));
    weatherCache.put("4", new WeatherInfo("England", "Manchester", today, 6d));
    weatherCache.put("5", new WeatherInfo("Italy", "Rome", today, 17d));
    weatherCache.put("6", new WeatherInfo("Italy", "Napoli", today, 18d));
    weatherCache.put("7", new WeatherInfo("Ireland", "Belfast", today, 9d));
    weatherCache.put("8", new WeatherInfo("Ireland", "Dublin", today, 7d));
    weatherCache.put("9", new WeatherInfo("Spain", "Madrid", today, 19d));
    weatherCache.put("10", new WeatherInfo("Spain", "Barcelona", today, 21d));
    weatherCache.put("11", new WeatherInfo("France", "Paris", today, 11d));
    weatherCache.put("12", new WeatherInfo("France", "Marseille", today, -8d));
    weatherCache.put("13", new WeatherInfo("Netherlands", "Amsterdam", today, 11d));
    weatherCache.put("14", new WeatherInfo("Portugal", "Lisbon", today, 13d));
    weatherCache.put("15", new WeatherInfo("Switzerland", "Zurich", today, -12d));
  }

  @Test
  public void execute() {
    MapReduceTask<String, WeatherInfo, DestinationTypeEnum, WeatherInfo> task =
        new MapReduceTask<String, WeatherInfo, DestinationTypeEnum, WeatherInfo>(weatherCache);
    task.mappedWith(new DestinationMapper()).reducedWith(new DestinationReducer());

    Map<DestinationTypeEnum, WeatherInfo> destination = task.execute();

    assertNotNull(destination);
    // JUnit expects the expected value first, then the actual value
    assertEquals(2, destination.keySet().size());

    logger.info("********** PRINTING RESULTS FOR WEATHER CACHE *************");
    for (DestinationTypeEnum destinationType : destination.keySet()) {
      logger.infof("%s - Best Place: %s\n", destinationType.getDescription(),
          destination.get(destinationType));
    }
  }
}

When we execute the test, we should see output similar to the following:

INFO: Skiing Destination
Best Place: {WeatherInfo:{ country:Switzerland, city:Zurich,
day:Mon Jun 02 19:42:22 IST 2014, temp:-12.0, tempMax:-7.0,
tempMin:-17.0}
INFO: Sun Destination
Best Place: {WeatherInfo:{ country:Spain, city:Barcelona, day:Mon
Jun 02 19:42:22 IST 2014, temp:21.0, tempMax:26.0, tempMin:16.0}
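
As a cross-check that does not need a running Infinispan instance, the following plain Java sketch replays the same selection rules on a subset of the sample temperatures. The class FindDestinationCheck and its method are hypothetical helpers written for this illustration; the thresholds are copied from DestinationTypeEnum.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FindDestinationCheck {
    static final double SUN_MIN = 18d;     // same threshold as DestinationTypeEnum.SUN
    static final double SKIING_MAX = -5d;  // same threshold as DestinationTypeEnum.SKIING

    // Returns the best city per destination type, keyed by "SUN" or "SKIING".
    static Map<String, String> bestDestinations(Map<String, Double> tempByCity) {
        Map<String, String> best = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : tempByCity.entrySet()) {
            double t = e.getValue();
            // Map step: classify the entry, or skip it if it fits neither type.
            String type = t >= SUN_MIN ? "SUN" : (t <= SKIING_MAX ? "SKIING" : null);
            if (type == null) {
                continue;
            }
            // Reduce step: keep the warmest SUN city and the coldest SKIING city.
            String current = best.get(type);
            boolean better = current == null
                    || (type.equals("SUN") ? t > tempByCity.get(current)
                                           : t < tempByCity.get(current));
            if (better) {
                best.put(type, e.getKey());
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Double> temps = new LinkedHashMap<>();
        temps.put("Rome", 17d);
        temps.put("Napoli", 18d);
        temps.put("Madrid", 19d);
        temps.put("Barcelona", 21d);
        temps.put("Marseille", -8d);
        temps.put("Zurich", -12d);
        // prints {SUN=Barcelona, SKIING=Zurich}
        System.out.println(bestDestinations(temps));
    }
}
```

The sketch selects Barcelona for sun and Zurich for skiing, which agrees with the output shown above.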

Summary

In this article, you learned how to work with applications in a modern distributed server architecture using the MapReduce API, and how it abstracts parallel programming into two simple primitives, the map and reduce methods. We also walked through a sample use case, Find Destination, that demonstrated how to use MapReduce almost in real time.
