NAME

MapReduce.pm - Perl version of Google's distributed data processing

ABSTRACT

Simple, generalized distributed data processing.

SYNOPSIS

On the command line:

    # mr example.mr inputcorpus outputcorpus

File example.mr:

    ------------------
    package ExamplePackage;
    use MapReduce;
    @ISA = qw( MapReduce );
    ExamplePackage->run;

    sub Reader { ... }
    sub Map    { ... }
    sub Reduce { ... }
    ------------------

DESCRIPTION

MapReduce is a generalized distributed data processing system (see
http://labs.google.com/papers/mapreduce.html). You are strongly
encouraged to read the paper before proceeding further, as it will
make everything much clearer. There is at least one other open
implementation of mapreduce. The purpose of MapReduce.pm is to be

    1. Perl-based, using Perl as the default language for mapreduce
       programs, and

    2. as simple as possible while still being useful.

Some people, like me, have a particular thing for simplicity. This is
a simplified, easy-to-use implementation. Based on shared ssh keys
and remote calls, it is currently suitable for small clusters (up to
roughly 100 machines).

To use it, you write three functions into a .mr file: Reader(), Map(),
and Reduce(). Reader() defines an input record, returning one per
call. Map() takes an input record and produces a list of key/value
pairs from it. Reduce() takes a key and a list of values, and returns
a (possibly different) key and another list (possibly a single value).
Strange as it may seem, an astonishingly large class of data
processing tasks can be formulated as definitions of these three
functions.

For example, here is a .mr file which counts the occurrences of every
unique word in a corpus of documents:

    #### file MapReduce/bin/wc.mr

    package WordCount;
    use lib "MapReduce/lib";
    use MapReduce;
    our @ISA = qw( MapReduce );
    WordCount->run(@ARGV);

    # Map() returns a flat list of key/value pairs, eg
    # (word => 1, word2 => 1, ...)
    sub Map {
        my $ref = shift;
        return map { $_ => 1 } split /\W+/, $$ref;
    }

    # Reduce() returns a single key/value pair, reduced from an
    # iterator of values.
    sub Reduce {
        my ($key, $iter) = @_;
        my $val = 0;
        while ( defined( my $next = $iter->() ) ) {
            $val += $next->[0];
        }
        return $key, $val;
    }

    #####################################

When this is called using the mr program, like so

    # mr wc.mr odyssey odyssey_wc

...there will appear a new corpus called odyssey_wc, containing
records of the form "word sum\n", where sum is the number of
occurrences of that word.
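Just to make the dataflow concrete, here is a small single-process
simulation of the word-count job, outside the framework entirely. It
is only a sketch of the model, not how MapReduce.pm itself runs
things: the grouping of pairs by key and the wrapping of each value in
an array ref are my assumptions, inferred from the $next->[0] access
in wc.mr above. In the real system these phases run distributed
across the pool.

    #!/usr/bin/perl
    # Single-process sketch of the Map / group-by-key / Reduce
    # dataflow. The array-ref wrapping of values is an assumption
    # inferred from wc.mr's $next->[0]; the framework may differ.
    use strict;
    use warnings;

    sub Map {
        my $ref = shift;
        return map { $_ => 1 } split /\W+/, $$ref;
    }

    sub Reduce {
        my ($key, $iter) = @_;
        my $val = 0;
        while ( defined( my $next = $iter->() ) ) {
            $val += $next->[0];
        }
        return $key, $val;
    }

    my @records = ( "the quick brown fox", "the fox sleeps" );

    # Map phase: emit key/value pairs from each record, collecting
    # the values of each key into one group.
    my %groups;
    for my $rec (@records) {
        my @pairs = Map( \$rec );
        while (@pairs) {
            my ( $k, $v ) = splice @pairs, 0, 2;
            push @{ $groups{$k} }, [$v];
        }
    }

    # Reduce phase: hand each key an iterator over its values and
    # print records of the form "word sum\n".
    for my $key ( sort keys %groups ) {
        my @vals = @{ $groups{$key} };
        my ( $k, $sum ) = Reduce( $key, sub { shift @vals } );
        print "$k $sum\n";
    }

Run standalone, this prints "brown 1", "fox 2", "quick 1", "sleeps 1"
and "the 2", one per line, matching the record format described above.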
INSTALL

Installation differs from the normal procedure. First you must get
the ssh transport layer ready, at least for a single host.

Generate an ssh key pair

Call "ssh-keygen" as below, hitting return for all interactive
questions.

    ssh-keygen -t rsa
    cat .ssh/id_rsa.pub >> .ssh/authorized_keys2
    ssh localhost
    exit

These steps prepare the ssh transport layer for a single machine.

"make test" for a single-machine cluster

You must rename the MapReduce-[version]/ dir to plain MapReduce:

    tar zxvf MapReduce-0.03.tar.gz
    mv MapReduce-0.03 MapReduce
    cd MapReduce
    perl Makefile.PL
    make test

There is no make install step. You just copy bin/mr to your ~/bin/
dir, and copy test.conf to /etc/mapreduce.conf and edit to taste. The
make test will exercise your master machine as a single-machine
cluster. To add machines to your cluster, continue with the steps
below.

ADDING MACHINES

How to gather machines into a cluster:

1. Edit /etc/mapreduce.conf to provide "hosts" entries, one for each
   host in the pool.

2. Distribute shared keys to the other hosts:

    # mr -cmd="mkdir .ssh; chmod 0700 .ssh"
    # mr -cmd="scp masterhost:.ssh/* .ssh/."

   Distribute the host list:

    # mr -cmd="scp masterhost:.ssh/known_hosts .ssh/."

   NOTE: Please see Hack #66 in the excellent Linux Server Hacks by
   Rob Flickenger for details beyond these instructions.

3. Place a MapReduce.pm install dir in the home directory of each
   mapreduce user, renamed so there is no version number. This can be
   done from the master account (after the above steps) like this:

    # mr -cmd="scp masterhost:MapReduce-0.03.tar.gz ."
    # mr -cmd="tar zxvf MapReduce*"
    # mr -cmd="mv MapReduce-0.03 MapReduce"

4. Test the cluster integrity: cd back into the MapReduce/ install
   dir, copy /etc/mapreduce.conf to test.conf, and rerun make test to
   check the cluster.

When adding more machines, repeat the above steps. Just remember that
all hosts must share the same ssh keypair, and every host needs
entries for all the others in its .ssh/known_hosts file.

USING MAPREDUCE

The main tool is the mr program. You can use it to launch mapreduce
operations, or to execute an arbitrary command on every host in the
pool.

Once setup is done, the config file and command line together contain
all the information mapreduce needs:

    - the list of hosts in the pool
    - the location of the repository (default /tmp/mr)
    - the name of the task file where your program resides
    - the name of the input corpus
    - the name of the result corpus

Normally only the last three are specified on the command line, so
the typical call looks like this:

    # mr wc.mr odyssey odyssey_wc

If something goes wrong and you need to clean up, this will get rid
of the distributed output corpus:

    # mr -cmd="rm -r /tmp/mr/odyssey_wc"

(This assumes your repository is /tmp/mr and the corpus to delete is
odyssey_wc.) The -cmd option is quite useful in practice, but be
careful!

REQUIRES

ssh with shared keys.

Digest::MD5

DIFFERENCES FROM OTHER IMPLEMENTATIONS

There are two other implementations of this model (that I know of):
Google's original, and a Lucene-related project called Hadoop. Google
wrote theirs in C++, while the Lucene authors used Java.

Compared to those much more involved versions, this is more or less a
toy, suitable only for smaller clusters, for various reasons. It is
based not on listening services per host, but on remote command
invocation from a master program, which forks processes to babysit
the tasks. This keeps the code rather simple and requires no true
installation on any host in the pool. On the other hand, it limits
the number of hosts to what the master machine's process memory will
hold, and any network trouble is going to mess everything up.

A quite significant difference is the absence of a distributed file
system. Google uses "GFS" to make input files available to map tasks,
while this implementation simply stores distributed files across the
cluster, so that each host is responsible for the files on its local
storage. This requires a preliminary "distribution step" which
partitions a set of records across all hosts. That step is quite
simple, since it can itself be implemented as a MapReduce operation!
See 'partition_trec_docs.mr' and 'partition_lines.mr' in the bin
directory for examples, and the sketch below for the idea. This turns
out to be more convenient than it looks, because often you will want
to perform another mapreduce operation on the results of the last,
and in this way the files are already distributed.
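I have not reproduced partition_lines.mr here, but the following
sketch shows how a distribution step can be phrased in the same
three-function form. Everything specific in it is my own
illustration, not necessarily what the bundled scripts do: the
PartitionSketch package, the NUM_HOSTS constant, and the choice of
bucketing records by an MD5 hash (Digest::MD5 is already a
requirement, see REQUIRES). Map() keys each record by its destination
bucket, and Reduce() simply passes the bucketed records through.

    #### hypothetical sketch, in the style of a .mr task file

    package PartitionSketch;
    use lib "MapReduce/lib";
    use MapReduce;
    use Digest::MD5 qw( md5_hex );
    our @ISA = qw( MapReduce );

    # Illustrative only: in practice the pool size would come from
    # the "hosts" entries in /etc/mapreduce.conf.
    use constant NUM_HOSTS => 4;

    PartitionSketch->run(@ARGV);

    # Key each record by a stable hash of its content, so records
    # spread evenly across NUM_HOSTS buckets.
    sub Map {
        my $ref = shift;
        my $bucket = hex( substr( md5_hex($$ref), 0, 8 ) ) % NUM_HOSTS;
        return $bucket => $$ref;
    }

    # Pass the records of each bucket through unchanged; only the
    # bucket key matters here.
    sub Reduce {
        my ($key, $iter) = @_;
        my @lines;
        while ( defined( my $next = $iter->() ) ) {
            push @lines, $next->[0];
        }
        return $key, @lines;
    }

The idea is that records grouped under the same bucket key end up
stored together, so the next mapreduce operation finds its input
already distributed.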
I haven't looked at Hadoop very much, only enough to note that it
appears much more "enterprise", which is to say more complicated. It
has four xml configuration files (MapReduce.pm has only one, on one
machine), and ... a lot of source code. It's no doubt much more
stable, and also harder to get started with. Not to mention that
there are no Perl bindings.

Just to natter on a bit about relative complexity: the current Hadoop
build has 313 java files with 61,810 lines of code, and that doesn't
include non-java files, of which there are 14 jsp files, 13 sh files,
and 586 html files. You could not check out the source code and
understand everything in half a day. I'm not disparaging the Hadoop
effort at all; it's probably very high quality. I'm just saying the
concept is very simple, and it might be possible to have a simple yet
stable implementation.

Even though this version is probably not as stable as the others, it
comprises just over 500 lines of code as of the current release. It
has only one configuration file, can be used almost immediately with
very minimal effort, and is rather noninvasive, requiring nothing but
untarring a directory on pool machines. In short, it is perfect for
exploring, understanding, and profitably using the MapReduce concepts
with a near-trivial time investment. My goal is a very sound system
with performance and stability comparable to Google's or Hadoop's,
using so little code that a normal programmer can read and understand
it in less than a day.

LOAD BALANCING AND FAULT TOLERANCE

Google and Hadoop deal with these issues separately, using a
distributed file system for load balancing and centralized tracking
of tasks for fault tolerance. I don't particularly like those
approaches, because they're complicated and would require a lot more
code than I feel is really necessary. Instead I'm trying the idea of
"replicated responsibility" to handle both load balancing and fault
tolerance, and so far it works rather well and takes little code.

In MapReduce.pm, the input data are distributed evenly across hosts
with a tunable replication factor F, which guarantees that F-1 hosts
can go down at any time and the operation will still complete. The
number of hosts that can actually go down is larger, and depends on
the total amount of input data, but I have yet to complete an
analysis of exactly what it is.

The concept of shared responsibility provides load balancing in that
each machine is responsible for making sure all files residing
locally are processed, and will step up to do the processing itself
if given the opportunity (i.e. if it finishes previous files faster
than its brothers). This is simpler than centralized tracking.

TO DO

It would be nice to reduce the complexity of installation and
deployment. I'm working on that.

I would like to improve MapReduce.pm as much as possible while still
conforming to the goals of simplicity, stability and performance.
Please, if you have suggestions for elegant improvements, get in
touch with me.

SEE ALSO

http://labs.google.com/papers/mapreduce.html

Linux Server Hacks, Rob Flickenger

AUTHOR

Ira Woodhead,

COPYRIGHT AND LICENSE

Copyright (C) 2006 by Ira Woodhead

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.6 or,
at your option, any later version of Perl 5 you may have available.