Item-Based Collaborative Filtering with Hadoop

(2009)

For a while I've been interested in the item-based collaborative filtering algorithm used by companies like Amazon (and probably a bunch of others by now, too).

So in 2009 I tried to use it to build a recommendation system for Android apps, and I decided to implement it on Hadoop. This is kind of a mess, and I hope to re-implement it soon with decent comments and better organization. As is, the code is split into several classes, each making one pass over the data. Unfortunately, I've lost the file that orchestrates the flow of data between these little programs, so this is from memory - here goes:
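
Since that orchestration file is gone, here's a minimal, hypothetical driver sketch showing one way the jobs below could be chained together. The classes it calls are the ones in this post, but every path is invented, and how LogCrunch's app-ID output feeds the rating steps isn't recorded anywhere, so the ratings file here is just a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical reconstruction of the lost flow driver; all paths are made up.
public class Flow {
    public static void main(String[] args) throws Exception {
        // Translate per-user package lists to numeric app IDs (the lookup table
        // path is assumed to be the third argument; see LogCrunch below).
        ToolRunner.run(new Configuration(), new LogCrunch(),
                new String[]{"/data/installs.txt", "/data/step0", "/data/app_lookup.csv"});
        // Count ratings per key, then group each user's ratings.
        Step1.main(new String[]{"/data/ratings.dat", "/data/step1"});
        Step2.main(new String[]{"/data/ratings.dat", "/data/step2"});
        // Pairwise Pearson similarities from Step2's grouped output,
        // then the 15 most similar items per item.
        Step3Pearson.main(new String[]{"/data/step2", "/data/step3"});
        Step4.main(new String[]{"/data/step3", "/data/step4"});
    }
}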

Step one takes an input file with one user ID per line, followed by a tab and a comma-separated list of package names:

1\tcom.whatever,net.app
2\tcom.whatever

Package names are converted to numeric IDs using an external lookup table loaded from the DistributedCache, so the output might look like this:

1\t5,6
2\t5
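
The lookup table itself is just plain ID,package lines read from the DistributedCache; a file consistent with the example above (contents invented here) would be:

5,com.whatever
6,net.app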

And here's the code:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogCrunch extends Configured implements Tool {

    public static class Map extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        private final Text outk = new Text();
        private final Text outv = new Text();
        // package name -> numeric ID, loaded from the DistributedCache in configure()
        private java.util.Map<String, String> lookup = new HashMap<String,String>();

        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // Input line: userId\tpackage,package,...
            String[] temp = value.toString().split("\t");
            outk.set(temp[0]);
            for(String s : temp[1].split(",")) {
                // Translate the package name to its numeric ID here, where the lookup
                // table is available; fall back to the raw name if it's missing.
                String id = lookup.get(s);
                outv.set(id == null ? s : id);
                output.collect(outk, outv);
            }
        }

        @Override
        public void configure(JobConf job) {
            // Load the ID,package lookup table that the driver placed in the DistributedCache.
            try {
                Path[] patternsFiles = DistributedCache.getLocalCacheFiles(job);
                for(Path file : patternsFiles) {
                    BufferedReader fis = new BufferedReader(new FileReader(file.toString()));
                    String line;
                    while ((line = fis.readLine()) != null) {
                        String[] parts = line.split(",");
                        String pk = parts[0];   // numeric ID
                        String pkg = parts[1];  // package name
                        lookup.put(pkg, pk);
                    }
                    fis.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        private final Text outv = new Text();
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // Re-join the per-user item IDs (already translated in the mapper)
            // into a single comma-separated list.
            String data = "";
            while(values.hasNext()) {
                data += values.next().toString() + ",";
            }
            data = data.substring(0, data.length()-1);
            outv.set(data);
            output.collect(key, outv);
        }
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new LogCrunch(), args); 
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(LogCrunch.class);
        conf.setJobName("LogCrunch");

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // The mapper loads the ID,package lookup table from the DistributedCache;
        // here it's assumed to arrive as a third command-line argument.
        DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);

        JobClient.runJob(conf);
        return 0;
    }
}

The next step counts the number of ratings for each ID in the first ::-delimited field of the ratings file, writing the totals to a separate output file. The combiner and reducer are identical, since the partial counts can safely be summed map-side before the final reduce.
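
Assuming a MovieLens-style id::item::rating input (the exact field layout isn't preserved anywhere in what survives, so treat these values as made up), a tiny sample input would be:

1::101::4.0
1::102::3.5
2::101::5.0

and this job would reduce it to:

1\t2
2\t1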

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Step1 {

    public static class Map extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        private final Text item = new Text();
        private final IntWritable one = new IntWritable(1);
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Key on the first ::-delimited field and emit a one for each rating line.
            item.set(value.toString().split("::")[0]);
            output.collect(item, one);
        }
    }

    public static class Combine extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable iw = new IntWritable();
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while(values.hasNext()) {
                sum += values.next().get();
            }
            iw.set(sum);
            output.collect(key, iw);
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable iw = new IntWritable();
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while(values.hasNext()) {
                sum += values.next().get();
            }
            iw.set(sum);
            output.collect(key, iw);
        }
    }
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(Step1.class);
        conf.setJobName("Step1");

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Combine.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

The next step groups the ratings by user: for each user it records the number of ratings, their sum, and the full list of item,rating pairs.
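
The reducer packs everything into one line per user, using @ to separate the rating count, the rating sum, and the rating list, and * to separate the individual item,rating pairs. With the made-up ratings from the earlier sample, user 1's line would come out as:

1\t2@7.5@101,4.0*102,3.5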

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Step2 {

    public static class Map extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        private final Text outk = new Text();
        private final Text outv = new Text();

        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // Input: user::item::rating - key on the user and emit "item,rating".
            String[] temp = value.toString().split("::");
            outk.set(temp[0]);
            outv.set(temp[1]+","+temp[2]);
            output.collect(outk, outv);
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        private final Text outv = new Text();
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            float sum = 0;
            int count = 0;
            String data = "";
            while(values.hasNext()) {
                String temp = values.next().toString();   // "item,rating"
                sum += Float.parseFloat(temp.split(",")[1]);
                count++;
                data += temp + "*";
            }
            data = data.substring(0, data.length()-1);    // drop the trailing *
            // Output format: user \t count@sum@item,rating*item,rating*...
            outv.set(count+"@"+sum+"@"+data);
            output.collect(key, outv);
        }
    }
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(Step2.class);
        conf.setJobName("Step2");

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

The next step calculates item-item similarity using the Pearson correlation coefficient: for every pair of items rated by the same user, the mapper emits the two ratings, and the reducer accumulates the running sums needed to compute the correlation in a single pass.
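
For reference, SimPearson below computes the correlation from those running sums as

sim = (sum_xy - sum_x*sum_y/n) / sqrt((sum_xx - sum_x^2/n) * (sum_yy - sum_y^2/n))

which is the usual covariance-over-standard-deviations form of Pearson's r, rearranged so it needs only the sums the reducer accumulates in one pass over the values.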

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Step3Pearson {

    // Pearson correlation computed from running sums: the covariance of the two
    // rating vectors divided by the product of their standard deviations.
    private static double SimPearson(double sumx, double sumy, double sumxx, double sumyy, double sumxy, double n) {
        double r = sumxy - (sumx * sumy / n);
        double d = Math.sqrt((sumxx - sumx*sumx/n) * (sumyy - sumy*sumy/n));
        if(d == 0d)
            return 0d;
        else
            return r / d;
    }



    public static class Map extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        private final Text outk = new Text();
        private final Text outv = new Text();

        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // Input line from Step2: user\tcount@sum@item,rating*item,rating*...
            String data = value.toString().split("\t")[1];
            String[] itemRatings = data.split("@")[2].split("\\*");
            // Emit every pair of items rated by this user, keyed so the lower
            // item ID always comes first; the value carries the two ratings in
            // the same order as the key.
            for(int i=0; i<itemRatings.length; i++) {
                String item1 = itemRatings[i].split(",")[0];
                String rating1 = itemRatings[i].split(",")[1];
                for(int j=0; j<i; j++) {
                    String item2 = itemRatings[j].split(",")[0];
                    String rating2 = itemRatings[j].split(",")[1];
                    if(Integer.parseInt(item1) < Integer.parseInt(item2)) {
                        outk.set(item1 + "," + item2);
                        outv.set(rating1 + "," + rating2);
                    } else {
                        outk.set(item2 + "," + item1);
                        outv.set(rating2 + "," + rating1);
                    }
                    output.collect(outk, outv);
                }
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        private final Text outk = new Text();
        private final Text outv = new Text();
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String[] temp = key.toString().split(",");
            String item1 = temp[0];
            String item2 = temp[1];
            float sumxx = 0;
            float sumxy = 0;
            float sumyy = 0;
            float sumx = 0;
            float sumy = 0;
            float n = 0;
            double sim = 0d;
            // Accumulate running sums over every user who rated both items.
            while(values.hasNext()) {
                String[] temp2 = values.next().toString().split(",");
                float rating1 = Float.parseFloat(temp2[0]);
                float rating2 = Float.parseFloat(temp2[1]);
                sumxx += rating1*rating1;
                sumxy += rating1*rating2;
                sumyy += rating2*rating2;
                sumx += rating1;
                sumy += rating2;
                n++;
            }
            sim = SimPearson(sumx, sumy, sumxx, sumyy, sumxy, n);
            // Emit the pair in both directions so each item gets a full
            // neighbour list: item \t similarity,otherItem,coRatingCount
            outk.set(item1);
            outv.set(sim+","+item2+","+n);
            output.collect(outk, outv);
            outk.set(item2);
            outv.set(sim+","+item1+","+n);
            output.collect(outk, outv);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(Step3Pearson.class);
        conf.setJobName("Step3");

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
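
Each line of this job's output pairs an item with one neighbour in the form item\tsimilarity,otherItem,coRatingCount, for example (values made up):

5\t0.83,12,7.0

(the count prints as a float because n is declared as one).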

The last step keeps the 15 most-similar items for each item. The mapper keys each record on item,1-similarity, so that once the keys are sorted an item's most similar neighbours come first (ascending 1-similarity means descending similarity), and the reducer emits only the first 15 it sees for each item.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Step4 {

    public static class Map extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        private final Text outk = new Text();
        private final Text outv = new Text();

        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String[] temp = value.toString().split("\t");
            String[] temp2 = temp[1].split(",");

            // Key on "item,1-similarity" so that ascending key order puts an
            // item's most similar neighbours first after the shuffle.
            double sim = 1d-Double.parseDouble(temp2[0]);
            outk.set(temp[0]+","+sim);
            outv.set(temp2[1]+","+temp2[2]);
            output.collect(outk, outv);
        }
    }


    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        private final Text outk = new Text();
        private final Text outv = new Text();
        private final int K = 15;
        private String which = "";
        private int count = 0;

        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String[] temp = key.toString().split(",");
            String item1 = temp[0];
            double sim = 1d-Double.parseDouble(temp[1]);  // undo the 1-similarity key trick
            // Keys arrive sorted, so reset the counter whenever a new item starts.
            if(!which.equals(item1)) {
                which = item1;
                count = 0;
            }

            while(values.hasNext()) {
                String[] temp2 = values.next().toString().split(",");
                String item2 = temp2[0];
                String n = temp2[1];
                outk.set(item1);
                outv.set(sim+","+item2+","+n);
                // Only keep the K most similar neighbours per item.
                if(count < K)
                    output.collect(outk, outv);
                count++;
            }
        }
    }
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(Step4.class);
        conf.setJobName("Step4");

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
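
One caveat with this last step as written: the per-item counter in the reducer only works if every key for a given item lands on the same reducer, and the default hash partitioner doesn't guarantee that once the similarity is part of the key. It's fine with a single reducer; with more than one, the job would also need a partitioner that hashes only the item ID. A minimal sketch of such a partitioner (not part of the original code) might look like this:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Hypothetical helper: route every "item,1-similarity" key for the same item
// to the same reducer by hashing only the item ID portion of the key.
public class ItemPartitioner implements Partitioner<Text, Text> {
    public void configure(JobConf job) {}
    public int getPartition(Text key, Text value, int numPartitions) {
        String item = key.toString().split(",")[0];
        return (item.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be wired in with conf.setPartitionerClass(ItemPartitioner.class) in Step4's main().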