





















































In this article by Wagner Roberto dos Santos, author of the book Infinispan Data Grid Platform Definitive Guide, we introduce the Map/Reduce API in Infinispan and show how to use it.
According to Gartner, in-memory data grids and in-memory computing are racing towards mainstream adoption, and the market for this kind of technology is expected to reach 1 billion by 2016. With this in mind, Infinispan already provides a MapReduce API for distributed computing, which means that we can use an Infinispan cache to process, in parallel, all the data stored in heap memory across all Infinispan instances.
If you're new to MapReduce, don't worry, we're going to describe it in the next section in a way that gets you up to speed quickly.
MapReduce is a programming model introduced by Google, which allows for massive scalability across hundreds or thousands of servers in a data grid. It's a simple concept to understand for those who are familiar with distributed computing and clustered environments for data processing solutions.
You can find the MapReduce paper at the following link:
http://research.google.com/archive/mapreduce.html
MapReduce has two distinct computational phases, which, as the name states, are map and reduce.
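To make the two phases concrete, here is a minimal plain-Java sketch of a word count, the classic MapReduce example. It does not use Infinispan at all; the class name WordCountSketch and its methods are hypothetical names chosen for this illustration, and the whole thing runs in a single JVM rather than across a grid.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of the map and reduce phases (no Infinispan involved). */
final class WordCountSketch {

    /** Map phase: turn one input line into a list of (word, 1) pairs. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    /** Reduce phase: collapse all values emitted for one key into a single result. */
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int count : counts) {
            sum += count;
        }
        return sum;
    }

    /** Driver: maps every line, groups the emitted pairs by key, then reduces each group. */
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {and=1, be=2, map=1, not=1, or=1, reduce=1, to=4}
        System.out.println(run(Arrays.asList("to be or not to be", "to map and to reduce")));
    }
}
```

In a real data grid, the map calls would run in parallel on the nodes that hold the data, and only the grouped intermediate results would travel over the network to be reduced.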
The Infinispan MapReduce model is an adaptation of Google's original MapReduce model. Each map reduce task has four main components: Mapper, Reducer, Collator, and MapReduceTask.
The following image shows that, in a distributed environment, an Infinispan MapReduceTask is responsible for starting the process for a given cache. Unless you specify an onKeys(Object...) filter, all available key/value pairs of the cache will be used as input data for the map reduce task:
In the preceding image, the Map/Reduce processes are performing the following steps:
Now that we have seen what map and reduce are, and how the Infinispan model works, let's create a Find Destination application that illustrates the concepts we have discussed.
In the last section, to demonstrate how CDI works, we created a web service that provides weather information. Now, based on this same weather information service, let's create a map/reduce engine that finds the best destination according to simple business rules, such as the destination type (sun destination, golf, skiing, and so on).
So, the first step is to create the WeatherInfo cache object that will hold information about the weather:
import java.io.Serializable;
import java.util.Date;

public class WeatherInfo implements Serializable {
    private static final long serialVersionUID = -3479816816724167384L;

    private String country;
    private String city;
    private Date day;
    private Double temp;
    private Double tempMax;
    private Double tempMin;

    // Convenience constructor: derives max/min as temp plus/minus 5 degrees
    public WeatherInfo(String country, String city, Date day, Double temp) {
        this(country, city, day, temp, temp + 5, temp - 5);
    }

    public WeatherInfo(String country, String city, Date day, Double temp,
            Double tempMax, Double tempMin) {
        this.country = country;
        this.city = city;
        this.day = day;
        this.temp = temp;
        this.tempMax = tempMax;
        this.tempMin = tempMin;
    }

    // Temperature getter used when comparing destinations
    public Double getTemp() {
        return temp;
    }

    // Other getters and setters omitted

    @Override
    public String toString() {
        return "{WeatherInfo:{ country:" + country + ", city:" + city
                + ", day:" + day + ", temp:" + temp
                + ", tempMax:" + tempMax + ", tempMin:" + tempMin + "}";
    }
}
Now, let's create an enum to define the types of destination a user can select and the rule associated with each one. To keep it simple, we are going to have only two destinations, sun and skiing.
The temperature value is the threshold used to decide whether a place qualifies as the corresponding type of destination:
public enum DestinationTypeEnum {
    SUN(18d, "Sun Destination"), SKIING(-5d, "Skiing Destination");

    // Threshold temperature for this type of destination
    private final Double temperature;
    private final String description;

    DestinationTypeEnum(Double temperature, String description) {
        this.temperature = temperature;
        this.description = description;
    }

    public Double getTemperature() {
        return temperature;
    }

    public String getDescription() {
        return description;
    }
}
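As a quick sanity check of these rules, the following plain-Java sketch classifies a few temperatures. It assumes that a place qualifies as a sun destination at or above the SUN threshold and as a skiing destination at or below the SKIING threshold, which is the reading the mapper in this article uses. The names DestinationRulesSketch, DestinationType, and classify are hypothetical and exist only for this illustration.

```java
import java.util.Optional;

/** Plain-Java sketch of the destination rules (hypothetical names, no Infinispan). */
final class DestinationRulesSketch {

    /** Stand-in for DestinationTypeEnum, carrying the same two thresholds. */
    enum DestinationType {
        SUN(18d), SKIING(-5d);

        final double threshold;

        DestinationType(double threshold) {
            this.threshold = threshold;
        }
    }

    /** Returns the matching destination type, or empty if the temperature fits neither rule. */
    static Optional<DestinationType> classify(double temp) {
        if (temp >= DestinationType.SUN.threshold) {
            return Optional.of(DestinationType.SUN);
        }
        if (temp <= DestinationType.SKIING.threshold) {
            return Optional.of(DestinationType.SKIING);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(classify(21d));   // prints Optional[SUN]
        System.out.println(classify(12d));   // prints Optional.empty
        System.out.println(classify(-12d));  // prints Optional[SKIING]
    }
}
```

Note that any temperature strictly between -5 and 18 matches neither type, so such places produce no output at all in the map phase.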
Now it's time to create the Mapper class. This class is responsible for checking whether each cache entry fits the requirements of a destination type. To define the DestinationMapper class, just implement the Mapper<KIn, VIn, KOut, VOut> interface and put your algorithm in the map method:
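import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.Mapper;

public class DestinationMapper implements
        Mapper<String, WeatherInfo, DestinationTypeEnum, WeatherInfo> {
    private static final long serialVersionUID = -3418976303227050166L;

    @Override
    public void map(String key, WeatherInfo weather,
            Collector<DestinationTypeEnum, WeatherInfo> c) {
        if (weather.getTemp() >= DestinationTypeEnum.SUN.getTemperature()) {
            // Warm enough to count as a sun destination
            c.emit(DestinationTypeEnum.SUN, weather);
        } else if (weather.getTemp() <= DestinationTypeEnum.SKIING.getTemperature()) {
            // Cold enough to count as a skiing destination
            c.emit(DestinationTypeEnum.SKIING, weather);
        }
        // Entries that match neither rule emit nothing
    }
}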
The role of the Reducer class in our application is to pick, from the destinations returned by the mapping phase, the best one for each type: the highest temperature for sun destinations and the lowest temperature for skiing destinations. To create the Reducer class, you'll need to implement the Reducer<KOut, VOut> interface:
import java.util.Iterator;

import org.infinispan.distexec.mapreduce.Reducer;

public class DestinationReducer implements
        Reducer<DestinationTypeEnum, WeatherInfo> {
    private static final long serialVersionUID = 7711240429951976280L;

    @Override
    public WeatherInfo reduce(DestinationTypeEnum key,
            Iterator<WeatherInfo> it) {
        WeatherInfo bestPlace = null;
        if (key == DestinationTypeEnum.SUN) {
            // Best for sun: keep the highest temperature
            while (it.hasNext()) {
                WeatherInfo w = it.next();
                if (bestPlace == null || w.getTemp() > bestPlace.getTemp()) {
                    bestPlace = w;
                }
            }
        } else {
            // Best for skiing: keep the lowest temperature
            while (it.hasNext()) {
                WeatherInfo w = it.next();
                if (bestPlace == null || w.getTemp() < bestPlace.getTemp()) {
                    bestPlace = w;
                }
            }
        }
        return bestPlace;
    }
}
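Before wiring these classes into Infinispan, it can help to trace the same logic by hand. The following plain-Java sketch mimics the map and reduce steps above on a handful of cities, in a single JVM and without any Infinispan classes. The names FindDestinationSketch, Destination, City, and sampleCities are hypothetical, and the thresholds 18 and -5 are hardcoded for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Single-JVM sketch of the Find Destination map and reduce steps (hypothetical names). */
final class FindDestinationSketch {

    enum Destination { SUN, SKIING }

    /** Minimal stand-in for WeatherInfo: just a city name and a temperature. */
    static final class City {
        final String name;
        final double temp;

        City(String name, double temp) {
            this.name = name;
            this.temp = temp;
        }
    }

    /** Map step: emit the city under SUN or SKIING, or emit nothing at all. */
    static void map(City city, Map<Destination, List<City>> collector) {
        if (city.temp >= 18d) {
            collector.computeIfAbsent(Destination.SUN, k -> new ArrayList<>()).add(city);
        } else if (city.temp <= -5d) {
            collector.computeIfAbsent(Destination.SKIING, k -> new ArrayList<>()).add(city);
        }
    }

    /** Reduce step: warmest city for SUN, coldest city for SKIING. */
    static City reduce(Destination key, List<City> cities) {
        City best = null;
        for (City c : cities) {
            boolean better = best == null
                    || (key == Destination.SUN ? c.temp > best.temp : c.temp < best.temp);
            if (better) {
                best = c;
            }
        }
        return best;
    }

    /** Driver: maps every city, then reduces each group to the name of its best city. */
    static Map<Destination, String> bestDestinations(List<City> cities) {
        Map<Destination, List<City>> intermediate = new EnumMap<>(Destination.class);
        for (City c : cities) {
            map(c, intermediate);
        }
        Map<Destination, String> result = new EnumMap<>(Destination.class);
        for (Map.Entry<Destination, List<City>> group : intermediate.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()).name);
        }
        return result;
    }

    /** A subset of the temperatures used in the test case of this article. */
    static List<City> sampleCities() {
        return Arrays.asList(
                new City("London", 8d), new City("Rome", 17d), new City("Napoli", 18d),
                new City("Madrid", 19d), new City("Barcelona", 21d),
                new City("Marseille", -8d), new City("Zurich", -12d));
    }

    public static void main(String[] args) {
        // prints {SUN=Barcelona, SKIING=Zurich}
        System.out.println(bestDestinations(sampleCities()));
    }
}
```

London and Rome fall between the two thresholds, so the map step emits nothing for them and they never reach the reduce step.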
Finally, to execute our sample application, we can create a JUnit test case with the MapReduceTask. But first, we have to create a couple of cache entries before executing the task, which we are doing in the setUp() method:
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.util.Date;
import java.util.Map;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distexec.mapreduce.MapReduceTask;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
// Assumption: the logger is Infinispan's own logging facade
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.Before;
import org.junit.Test;

public class WeatherInfoReduceTest {
    private static final Log logger =
            LogFactory.getLog(WeatherInfoReduceTest.class);
    private Cache<String, WeatherInfo> weatherCache;

    @Before
    public void setUp() throws Exception {
        Date today = new Date();
        EmbeddedCacheManager manager = new DefaultCacheManager();
        Configuration config = new ConfigurationBuilder()
                .clustering().cacheMode(CacheMode.LOCAL)
                .build();
        manager.defineConfiguration("weatherCache", config);
        weatherCache = manager.getCache("weatherCache");

        // Populate the cache with sample weather data
        weatherCache.put("1", new WeatherInfo("Germany", "Berlin", today, 12d));
        weatherCache.put("2", new WeatherInfo("Germany", "Stuttgart", today, 11d));
        weatherCache.put("3", new WeatherInfo("England", "London", today, 8d));
        weatherCache.put("4", new WeatherInfo("England", "Manchester", today, 6d));
        weatherCache.put("5", new WeatherInfo("Italy", "Rome", today, 17d));
        weatherCache.put("6", new WeatherInfo("Italy", "Napoli", today, 18d));
        weatherCache.put("7", new WeatherInfo("Ireland", "Belfast", today, 9d));
        weatherCache.put("8", new WeatherInfo("Ireland", "Dublin", today, 7d));
        weatherCache.put("9", new WeatherInfo("Spain", "Madrid", today, 19d));
        weatherCache.put("10", new WeatherInfo("Spain", "Barcelona", today, 21d));
        weatherCache.put("11", new WeatherInfo("France", "Paris", today, 11d));
        weatherCache.put("12", new WeatherInfo("France", "Marseille", today, -8d));
        weatherCache.put("13", new WeatherInfo("Netherlands", "Amsterdam", today, 11d));
        weatherCache.put("14", new WeatherInfo("Portugal", "Lisbon", today, 13d));
        weatherCache.put("15", new WeatherInfo("Switzerland", "Zurich", today, -12d));
    }

    @Test
    public void execute() {
        MapReduceTask<String, WeatherInfo, DestinationTypeEnum, WeatherInfo> task =
                new MapReduceTask<String, WeatherInfo, DestinationTypeEnum, WeatherInfo>(weatherCache);
        task.mappedWith(new DestinationMapper())
                .reducedWith(new DestinationReducer());

        Map<DestinationTypeEnum, WeatherInfo> destination = task.execute();

        assertNotNull(destination);
        // One result per destination type: SUN and SKIING
        assertEquals(2, destination.size());

        logger.info("********** PRINTING RESULTS FOR WEATHER CACHE *************");
        for (Map.Entry<DestinationTypeEnum, WeatherInfo> entry : destination.entrySet()) {
            logger.infof("%s - Best Place: %s\n",
                    entry.getKey().getDescription(), entry.getValue());
        }
    }
}
When you run the test, you should see output similar to the following:
INFO: Skiing Destination
Best Place: {WeatherInfo:{ country:Switzerland, city:Zurich,
day:Mon Jun 02 19:42:22 IST 2014, temp:-12.0, tempMax:-7.0,
tempMin:-17.0}
INFO: Sun Destination
Best Place: {WeatherInfo:{ country:Spain, city:Barcelona, day:Mon
Jun 02 19:42:22 IST 2014, temp:21.0, tempMax:26.0, tempMin:16.0}
In this article, you learned how to work with applications in a modern distributed server architecture using the Map/Reduce API, and how it abstracts parallel programming into two simple primitives, the map and reduce methods. We also walked through a sample use case, Find Destination, that demonstrated how to use map reduce almost in real time.