





















































This recipe will walk you through creating a MapReduce program to count distinct IPs in weblog data. We will demonstrate the application of a combiner to optimize data transfer overhead between the map and reduce stages. The code is implemented in a generic fashion and can be used to count distinct values in any tab-delimited dataset.
This recipe assumes that you have a basic familiarity with the Hadoop 0.20 MapReduce API. You will need access to the weblog_entries dataset supplied with this book and stored in an HDFS folder at the path /input/weblog.
You will need access to a pseudo-distributed or fully-distributed cluster capable of running MapReduce jobs using the newer MapReduce API introduced in Hadoop 0.20.
You will also need to package this code inside a JAR file to be executed by the Hadoop JAR launcher from the shell. Only the core Hadoop libraries are required to compile and run this example.
Perform the following steps to count distinct IPs using MapReduce:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.regex.Pattern;

public class DistinctCounterJob implements Tool {

    private Configuration conf;
    public static final String NAME = "distinct_counter";
    // Configuration key under which the column position is stored.
    public static final String COL_POS = "col_pos";

    public static void main(String[] args) throws Exception {
        // Pass the job status back to the shell as the process exit code.
        System.exit(ToolRunner.run(new Configuration(),
                new DistinctCounterJob(), args));
    }

    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println(
                "Usage: distinct_counter <input> <output> <element_position>");
            System.exit(1);
        }
        conf.setInt(COL_POS, Integer.parseInt(args[2]));

        Job job = new Job(conf, "Count distinct elements at position");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReducer.class);
        // The reducer only sums, so it can double as the combiner.
        job.setCombinerClass(DistinctReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setJarByClass(DistinctCounterJob.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Return 0 on success and 1 on failure, following shell convention.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public Configuration getConf() {
        return conf;
    }

    public static class DistinctMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static int col_pos;
        // Split each row on tab characters.
        private static final Pattern pattern = Pattern.compile("\\t");
        // Reused for every record to avoid allocating a new Text per write.
        private Text outKey = new Text();
        private static final IntWritable outValue = new IntWritable(1);

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            col_pos = context.getConfiguration()
                    .getInt(DistinctCounterJob.COL_POS, 0);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit (field, 1) for the column at col_pos.
            String field = pattern.split(value.toString())[col_pos];
            outKey.set(field);
            context.write(outKey, outValue);
        }
    }

    public static class DistinctReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable count = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // Sum the counts; values may be 1s or partial sums from a combiner.
            int total = 0;
            for (IntWritable value : values) {
                total += value.get();
            }
            count.set(total);
            context.write(key, count);
        }
    }
}
hadoop jar myJobs.jar distinct_counter /input/weblog/ /output/weblog_distinct_counter 4
First, we set up DistinctCounterJob to implement the Tool interface for remote submission. The static constant NAME is of potential use in the Hadoop Driver class, which supports launching different jobs from the same JAR file. The static constant COL_POS is the configuration key under which we store the third required command-line argument, <element_position>. This value is set within the job configuration and should match the zero-based position of the column whose distinct entries you wish to count. Supplying 4 will match the IP column for the weblog data.
Since we are reading and writing text, we can use the supplied TextInputFormat and TextOutputFormat classes. We set the Mapper and Reducer classes to our DistinctMapper and DistinctReducer implementations respectively. We also supply DistinctReducer as the combiner class. This decision is explained in more detail later in this recipe.
It's also very important to call setJarByClass() so that the TaskTrackers can properly unpack and find the Mapper and Reducer classes. The job uses the static helper methods on FileInputFormat and FileOutputFormat to set the input and output directories respectively. Now we're set up and ready to submit the job.
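The Mapper class sets up a few member variables as follows:
col_pos: This is initialized in setup() from the value stored in the job configuration. It determines which column is parsed and counted.
pattern: This is the precompiled pattern that splits each row on tabs.
outKey: This is a reusable Text instance that holds the output key, which avoids creating a new object for every record written.
outValue: This is a constant IntWritable of 1 that represents one occurrence of the given key.
The map() function splits each incoming line's value and extracts the string located at col_pos. We reset the internal value for outKey to the string found at that position on the line. For our example, this will be the IP value for the row. We emit the newly reset outKey variable along with outValue to mark one occurrence of that given IP address.
Without the assistance of the combiner, the map output would present the reducer with an iterable collection of 1s to be counted.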
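The split-and-extract step can be tried outside Hadoop. The following is a minimal plain-Java sketch, not part of the recipe's job class: the class name ColumnExtractSketch, the method fieldAt(), and the sample row are all hypothetical and exist only for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical stand-alone sketch; it does not use any Hadoop classes.
class ColumnExtractSketch {

    // Precompiled tab delimiter, mirroring the mapper's pattern field.
    private static final Pattern TAB = Pattern.compile("\t");

    // Returns the field at the zero-based position colPos of a tab-delimited line.
    static String fieldAt(String line, int colPos) {
        return TAB.split(line)[colPos];
    }

    public static void main(String[] args) {
        // Made-up sample row; the real weblog_entries layout may differ.
        String row = "id\turl\tdate\ttime\t10.10.1.1";
        System.out.println(fieldAt(row, 4)); // prints "10.10.1.1"
    }
}
```

As in the mapper, a position beyond the last column throws an ArrayIndexOutOfBoundsException, so malformed rows may need extra handling in a real job.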
The following is an example of a reducer {key, value:[]} without a combiner:
{10.10.1.1, [1,1,1,1,1,1]} = six occurrences of the IP "10.10.1.1".
The implementation of the reduce() method will sum the integers and arrive at the correct total, but there's nothing that requires the integer values to be limited to the number 1. We can use a combiner to process the intermediate key-value pairs as they are output from each mapper and help improve the data throughput in the shuffle phase. Since the combiner is applied against the local map output, we may see a performance improvement as the amount of data we need to transfer for an intermediate key/value can be reduced considerably.
Instead of seeing {10.10.1.1, [1,1,1,1,1,1]}, the reducer sees {10.10.1.1, [6]}, because the combiner adds the 1s and replaces the intermediate values for that key with their sum. The reducer can then sum the various combined values for the intermediate key and arrive at the same correct total. This is possible because addition is both a commutative and an associative operation. In other words:
Commutative: a + b = b + a
Associative: (a + b) + c = a + (b + c)
For counting the occurrences of distinct IPs, we can use the same code in our reducer as a combiner for output in the map phase.
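When applied to our problem, the normal output with no combiner from two separate, independently running map tasks might look like the following, where {key, value[]} is the intermediate key-value collection:
Map Task A = {10.10.1.1, [1,1,1]} = three occurrences
Map Task B = {10.10.1.1, [1,1,1,1,1,1]} = six occurrences
Without the aid of a combiner, this will be merged in the shuffle phase and presented to a single reducer as the following key-value collection:
{10.10.1.1, [1,1,1,1,1,1,1,1,1]} = nine total occurrences
Now let's revisit what would happen when using a combiner against the exact same sample output:
Map Task A = {10.10.1.1, [1,1,1]} = three occurrences
Combiner = {10.10.1.1, [3]} = still three occurrences, but reduced for this mapper
Map Task B = {10.10.1.1, [1,1,1,1,1,1]} = six occurrences
Combiner = {10.10.1.1, [6]} = still six occurrences
Now the reducer will see the following for that key-value collection:
{10.10.1.1, [3,6]} = nine total occurrences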
We arrived at the same total count for that IP address, but we used a combiner to limit the amount of network I/O during the MapReduce shuffle phase by pre-reducing the intermediate key-value output from each mapper.
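The equivalence of the two paths can be checked with a small simulation. This is only a plain-Java sketch of the arithmetic, assuming two map tasks with three and six occurrences of one key; the class CombinerSketch and its sum() method are hypothetical and do not use the Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java simulation; no Hadoop classes are involved.
class CombinerSketch {

    // The same summing logic plays the role of both combiner and reducer.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> mapTaskA = Arrays.asList(1, 1, 1);
        List<Integer> mapTaskB = Arrays.asList(1, 1, 1, 1, 1, 1);

        // No combiner: the reducer receives all nine 1s after the shuffle.
        List<Integer> shuffled = new ArrayList<>(mapTaskA);
        shuffled.addAll(mapTaskB);
        int withoutCombiner = sum(shuffled);

        // Combiner: each map task pre-sums locally, so the reducer receives [3, 6].
        List<Integer> combined = Arrays.asList(sum(mapTaskA), sum(mapTaskB));
        int withCombiner = sum(combined);

        System.out.println(withoutCombiner + " " + withCombiner); // prints "9 9"
    }
}
```

The reducer input shrinks from nine values to two while the total stays the same, which is the saving the combiner provides during the shuffle.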
The combiner can be confusing to newcomers. Here are some useful tips:
The previous recipe and the default WordCount example show the Combiner class being set to the same implementation as the Reducer class. This is not enforced by the API, but it is common for many types of distributed aggregate operations such as sum(), min(), and max(). The two can still differ. For example, a min() Reducer class might format its output in a certain way for readability, whereas the corresponding min() Combiner class does not care about output formatting and only emits the intermediate minimum.
Whether or not the framework invokes your combiner during execution depends on the intermediate spill file size from each map output, and the combiner is not guaranteed to run for every intermediate key. Your job should not depend on the combiner for correct results; use it only as an optimization.
You can control the minimum number of spill files required before MapReduce tries to combine intermediate values with the configuration property min.num.spills.for.combine.
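The difference between a combiner and a formatting reducer for min() might look like the following plain-Java sketch. The names MinSketch, combineMin(), and reduceMin(), the key sensor-1, and the output format are assumptions made for illustration; a real job would express the two steps as separate Reducer subclasses.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a combiner and reducer that share logic but not output form.
class MinSketch {

    // Combiner-style step: emits the raw partial minimum with no formatting.
    static int combineMin(List<Integer> values) {
        int min = Integer.MAX_VALUE;
        for (int value : values) {
            min = Math.min(min, value);
        }
        return min;
    }

    // Reducer-style step: computes the same minimum, then formats it for readability.
    static String reduceMin(String key, List<Integer> partialMins) {
        return key + " min=" + combineMin(partialMins);
    }

    public static void main(String[] args) {
        int partialA = combineMin(Arrays.asList(7, 3, 9)); // 3
        int partialB = combineMin(Arrays.asList(5, 4));    // 4
        System.out.println(reduceMin("sensor-1", Arrays.asList(partialA, partialB)));
        // prints "sensor-1 min=3"
    }
}
```

Because the combiner's output must have the same types as the map output, the formatting can only live in the reducer.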
This recipe will illustrate the efficient use of the Hive date UDFs to list the 20 most recent events and the number of days between the event date and the current system date.
Make sure you have access to a pseudo-distributed or fully-distributed Hadoop cluster with Apache Hive 0.7.1 installed on your client machine and on the environment path for the active user account.
This recipe depends on having the Nigera_ACLED_cleaned.tsv dataset loaded into a Hive table named acled_nigeria_cleaned with the fields mapped to the respective datatypes.
Issue the following command to the Hive client to see the mentioned fields:
describe acled_nigeria_cleaned
You should see the following response:
OK
loc string
event_date string
event_type string
actor string
latitude double
longitude double
source string
fatalities int
Perform the following steps to utilize Hive UDFs for sorting and transformation:
SELECT event_type,event_date,days_since FROM (
SELECT event_type,event_date,
datediff(to_date(from_unixtime(unix_timestamp())),
to_date(from_unixtime(
unix_timestamp(event_date,
'yyyy-MM-dd')))) AS days_since
FROM acled_nigeria_cleaned) date_differences
ORDER BY event_date DESC LIMIT 20;
OK
Battle-No change of territory 2011-12-31 190
Violence against civilians 2011-12-27 194
Violence against civilians 2011-12-25 196
Violence against civilians 2011-12-25 196
Violence against civilians 2011-12-25 196
Let's start with the nested SELECT subquery. We select three fields from our Hive table acled_nigeria_cleaned: event_type, event_date, and the result of calling the UDF datediff(), which takes as arguments an end date and a start date. Both are expected in the form yyyy-MM-dd. The first argument to datediff() is the end date, with which we want to represent the current system date. Calling unix_timestamp() with no arguments returns the current system time in seconds since the Unix epoch. We send that return value to from_unixtime() to get a formatted timestamp representing the current system date in the default Java 1.6 format (yyyy-MM-dd HH:mm:ss). We only care about the date portion, so calling to_date() with the output of this function strips the HH:mm:ss. The result is the current date in the yyyy-MM-dd form.
The second argument to datediff() is the start date, which for our query is the event_date. The series of function calls operates in almost exactly the same manner as for the previous argument, except that when we call unix_timestamp(), we must tell the function that our argument is in the SimpleDateFormat pattern yyyy-MM-dd. Now we have both the start date and end date arguments in the yyyy-MM-dd format and can perform the datediff() operation for the given row. We alias the output column of datediff() as days_since for each row.
The outer SELECT statement takes these three columns per row and sorts the entire output by event_date in descending order to get reverse chronological ordering. We arbitrarily limit the output to only the first 20 rows.
The net result is the 20 most recent events with the number of days that have passed since that event occurred.
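The day arithmetic that datediff() performs can be reproduced in plain Java as a rough cross-check. This sketch assumes Java 8 or later for java.time, and it assumes a fixed current date of 2012-07-08, chosen only because it is consistent with the sample output above. The class DaysSinceSketch and the method daysSince() are hypothetical.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical sketch mirroring datediff(end, start) for yyyy-MM-dd strings.
class DaysSinceSketch {

    // Returns the number of whole days from eventDate (yyyy-MM-dd) to today.
    static long daysSince(String eventDate, LocalDate today) {
        LocalDate start = LocalDate.parse(eventDate); // ISO format is yyyy-MM-dd
        return ChronoUnit.DAYS.between(start, today);
    }

    public static void main(String[] args) {
        // Assumed "current" date; Hive would use the system clock instead.
        LocalDate assumedToday = LocalDate.of(2012, 7, 8);
        System.out.println(daysSince("2011-12-31", assumedToday)); // prints 190
        System.out.println(daysSince("2011-12-25", assumedToday)); // prints 196
    }
}
```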
The date UDFs can help tremendously in performing string date comparisons. Here are some additional pointers:
Check out the Javadocs for SimpleDateFormat to learn how your custom date strings can be used with the date transform UDFs.
This recipe will show a very simple analytic that uses Hive to count fatalities for every month appearing in the dataset and print the results to the console.
Make sure you have access to a pseudo-distributed or fully-distributed Hadoop cluster with Apache Hive 0.7.1 installed on your client machine and on the environment path for the active user account.
This recipe depends on having the Nigera_ACLED_cleaned.tsv dataset loaded into a Hive table named acled_nigeria_cleaned with the following fields mapped to the respective datatypes.
Issue the following command to the Hive client:
describe acled_nigeria_cleaned
You should see the following response:
OK
loc string
event_date string
event_type string
actor string
latitude double
longitude double
source string
fatalities int
Follow the steps to use Hive for report generation:
SELECT from_unixtime(unix_timestamp(event_date, 'yyyy-MM-dd'), 'yyyy-MMM'),
COALESCE(CAST(sum(fatalities) AS STRING), 'Unknown')
FROM acled_nigeria_cleaned
GROUP BY from_unixtime(unix_timestamp(event_date, 'yyyy-MM-dd'), 'yyyy-MMM');
OK
1997-Apr 115
1997-Aug 4
1997-Dec 26
The SELECT statement uses unix_timestamp() and from_unixtime() to reformat the event_date for each row as just a year-month concatenated field. The same expression appears in the GROUP BY clause, so sum() totals the fatalities for each year-month.
The coalesce() method returns the first non-null argument passed to it. As the first argument, we pass the value of fatalities summed for the given year-month, cast as a string. If that value is NULL for any reason, the query returns the constant Unknown. Otherwise, it returns the string representing the total fatalities counted for that year-month combination. The results are printed to the console over stdout.
The following are some additional helpful tips related to the code in this recipe:
As mentioned in the Hive documentation, coalesce() supports one or more arguments. The first non-null argument will be returned. This can be useful for evaluating several different expressions for a given column before deciding the right one to choose.
coalesce() returns NULL if every argument is NULL. It's not uncommon to provide a literal as the last argument, to be returned if all the other arguments are NULL.
Having to reformat dates stored in your raw data is very common. Proper use of from_unixtime() and unix_timestamp() can make your life much easier.
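The first-non-null behaviour can be pictured with a small plain-Java analogue. This is only an illustration of the semantics described above, not Hive's implementation; the class CoalesceSketch and its coalesce() method are hypothetical.

```java
// Hypothetical plain-Java analogue of the first-non-null semantics.
class CoalesceSketch {

    // Returns the first non-null argument, or null if every argument is null.
    @SafeVarargs
    static <T> T coalesce(T... values) {
        for (T value : values) {
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String total = null; // stands in for a NULL sum of fatalities
        System.out.println(coalesce(total, "Unknown")); // prints "Unknown"
        System.out.println(coalesce("115", "Unknown")); // prints "115"
    }
}
```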
Remember this general code template for concise date format transformation in Hive:
from_unixtime(unix_timestamp(<col>,<in-format>),<out-format>);
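The same parse-then-format idea can be sketched in plain Java to experiment with pattern strings before using them in a query. This assumes Java 8 or later and uses java.time rather than SimpleDateFormat; the pattern letters used here (yyyy, MM, dd, MMM) mean the same in both, but other letters may not. The class DateReformatSketch and the method reformat() are hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical sketch of the parse-with-in-format, print-with-out-format template.
class DateReformatSketch {

    // Parses value using inFormat and renders it using outFormat.
    static String reformat(String value, String inFormat, String outFormat) {
        DateTimeFormatter in = DateTimeFormatter.ofPattern(inFormat, Locale.ENGLISH);
        DateTimeFormatter out = DateTimeFormatter.ofPattern(outFormat, Locale.ENGLISH);
        return LocalDate.parse(value, in).format(out);
    }

    public static void main(String[] args) {
        // The date is a made-up example value.
        System.out.println(reformat("1997-04-15", "yyyy-MM-dd", "yyyy-MMM")); // prints "1997-Apr"
    }
}
```

The locale is fixed to English here so that month abbreviations do not depend on the machine's default locale.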