MapReduce Example

When you persist the data for a GemFire XD table to HDFS, the HDFS log files enable you to analyze the table data outside of the GemFire XD distributed system, using Hadoop tools such as MapReduce or Pivotal HAWQ. GemFire XD includes a MapReduce example that helps you generate HDFS table data and then access the data using a Hadoop MapReduce job.

Note: The MapReduce example is a standalone application with support scripts to create a GemFire XD system and populate it with data. It does not use or build upon the distributed system created in the QuickStart Tutorials.

Prerequisites

The MapReduce example requires that you use GemFire XD with Pivotal HD Enterprise for Hadoop services. The simplest way to complete this tutorial is to download and run the Pivotal HD Single Node VM, available at http://www.pivotal.io/big-data/pivotal-hd. After starting the virtual machine, double-click the start_all.sh script on the desktop to start all Pivotal HD services.

You should be familiar with the Hadoop MapReduce API in order to fully understand the GemFire XD MapReduce example. See the Hadoop MapReduce Tutorial for general information about MapReduce.

The MapReduce example application is installed with GemFire XD in the examples/mapreduce subdirectory of the product installation directory. The sample application uses a set of scripts to start GemFire XD locators and servers, create the required schema, and execute the compiled MapReduce jobs. In addition to helping you modify and run the necessary scripts, this tutorial calls out excerpts of the MapReduce code to show how GemFire XD extends the MapReduce API for working with GemFire XD table data. For more information about compiling the example code from source, see the /examples/mapreduce/readme.txt file installed in the GemFire XD directory.

Procedure

  1. Shut down any GemFire XD distributed system that may still be running. The MapReduce example automatically starts a new distributed system for generating HDFS data, and existing members running on the same machine could cause conflicts. To shut down the system that was used in the previous tutorials:
    $ cd ~
    $ gfxd shut-down-all -locators=localhost[10101]
    $ gfxd locator stop -dir=locator
  2. If you have not already started the Pivotal HD services in the virtual machine, do so now:
    $ ~/Desktop/start_all.sh
    Starting services
    Starting cluster
    [...]
    Note: It can take several minutes or more to start all services on the virtual machine, depending on the configuration of your host computer.
  3. Move to the MapReduce example /scripts directory:
    $ cd /usr/lib/gphd/gfxd/examples/mapreduce/scripts
  4. Open the mr_driver.sh file with a text editor. Look for the following lines in the file:
    GFXD=/usr/bin/gfxd
    export BIND_ADDRESS=HOSTNAME
    
    GFXD_HOME=/usr/lib/gphd/gfxd
    

    If you did not install GemFire XD using the RPM installation, edit the GFXD and GFXD_HOME values to point to the location of the gfxd utility and the GemFire XD installation directory, respectively. If you installed using the RPM installer, then you do not need to modify these lines.

    In all cases, change the HOSTNAME entry to the host name of the GemFire XD machine. For example, if you installed GemFire XD into the Pivotal HD Single Node VM, the lines should look like:
    GFXD=/usr/bin/gfxd
    export BIND_ADDRESS=pivhdsne
    
    GFXD_HOME=/usr/lib/gphd/gfxd
    

    Save the file after you have made the required changes.

  5. Open the create_hdfs_schema.sql file in a text editor, and locate the LOCATOR and HOSTNAME placeholders shown below:
    AUTOCOMMIT OFF;
    
    CONNECT CLIENT 'LOCATOR:1527';
    
    DROP TABLE IF EXISTS BUSY_AIRPORT;
    DROP TABLE IF EXISTS FLIGHTS_HISTORY;
    DROP TABLE IF EXISTS MAPS;
    DROP TABLE IF EXISTS FLIGHTAVAILABILITY;
    DROP TABLE IF EXISTS FLIGHTS;
    DROP TABLE IF EXISTS CITIES;
    DROP TABLE IF EXISTS COUNTRIES;
    DROP TABLE IF EXISTS AIRLINES;
    
    -- DROP TRIGGER IF EXISTS TRIG1;
    -- DROP TRIGGER IF EXISTS TRIG2;
    
    DROP HDFSSTORE IF EXISTS airlines;
    
    CREATE HDFSSTORE airlines
        NAMENODE 'hdfs://HOSTNAME:8020'
        HOMEDIR '/tmp/gfxd'
        BatchTimeInterval 5000 milliseconds;
    
    Change LOCATOR to the host name of the machine that will run the GemFire XD locator member, and change HOSTNAME to the host name of the machine that runs the Pivotal HD name node. If you installed GemFire XD in the Pivotal HD Single Node VM, use "pivhdsne" for both of these values, as in:
    AUTOCOMMIT OFF;
    
    CONNECT CLIENT 'pivhdsne:1527';
    
    DROP TABLE IF EXISTS BUSY_AIRPORT;
    DROP TABLE IF EXISTS FLIGHTS_HISTORY;
    DROP TABLE IF EXISTS MAPS;
    DROP TABLE IF EXISTS FLIGHTAVAILABILITY;
    DROP TABLE IF EXISTS FLIGHTS;
    DROP TABLE IF EXISTS CITIES;
    DROP TABLE IF EXISTS COUNTRIES;
    DROP TABLE IF EXISTS AIRLINES;
    
    -- DROP TRIGGER IF EXISTS TRIG1;
    -- DROP TRIGGER IF EXISTS TRIG2;
    
    DROP HDFSSTORE IF EXISTS airlines;
    
    CREATE HDFSSTORE airlines
        NAMENODE 'hdfs://pivhdsne:8020'
        HOMEDIR '/tmp/gfxd'
        BatchTimeInterval 5000 milliseconds;
    

    Save the file after you have made the required changes.

  6. Execute the mr_driver.sh script with the clear argument to remove any temporary files that may have been created by previously running the MapReduce examples:
    $ ./mr_driver.sh clear
    The specified working directory (server1) contains no status file
    The specified working directory (locator1) contains no status file
    rm: `/output': No such file or directory
    rm: `/output_int': No such file or directory

    If you receive messages similar to the above, it simply means that there were no temporary files or directories from a previous run to remove.

  7. Start the GemFire XD distributed system:
    $ ./mr_driver.sh start
    Starting GemFireXD Locator using peer discovery on: 0.0.0.0[19992]
    Starting network server for GemFireXD Locator at address pivhdsne/127.0.0.1[1527]
    Logs generated in /usr/lib/gphd/Pivotal_GemFireXD_10/examples/mapreduce/scripts/locator1/gfxdlocator.log
    GemFireXD Locator pid: 35717 status: running
    Starting GemFireXD Server using locators for peer discovery: localhost[19992]
    Starting network server for GemFireXD Server at address pivhdsne/127.0.0.1[1528]
    Logs generated in /usr/lib/gphd/Pivotal_GemFireXD_10/examples/mapreduce/scripts/server1/gfxdserver.log
    GemFireXD Server pid: 35824 status: running
      Distributed system now has 2 members.
      Other members: 10.0.1.22(35717:locator)<v0>:31512

    The script creates local working directories for a locator and a data store member (./locator1 and ./server1), and starts both members. The distributed system is used for loading data and, in the final MapReduce job, for writing data back into a GemFire XD table.

  8. The next script that you execute will create the database schema and load tables with data. Before you execute the script, examine the key tables and data involved. In the create_hdfs_schema.sql file, you can see that the following HDFS store and table are created:
    CREATE HDFSSTORE airlines
        NAMENODE 'hdfs://pivhdsne:8020'
        HOMEDIR '/tmp/gfxd'
        BatchTimeInterval 5000 milliseconds;
    ...
    CREATE TABLE FLIGHTS_HISTORY
       (
          FLIGHT_ID CHAR(6),
          SEGMENT_NUMBER INTEGER,
          ORIG_AIRPORT CHAR(3),
          DEPART_TIME TIME,
          DEST_AIRPORT CHAR(3),
          ARRIVE_TIME TIME,
          MEAL CHAR(1),
          FLYING_TIME DOUBLE PRECISION,
          MILES INTEGER,
          AIRCRAFT VARCHAR(6), 
          STATUS VARCHAR (20)
       )
       PARTITION BY COLUMN (FLIGHT_ID, SEGMENT_NUMBER)
       BUCKETS 5
       HDFSSTORE (airlines);
    

    The MapReduce job that you run later accesses the FLIGHTS_HISTORY HDFS log files, which are stored in HDFS under the /tmp/gfxd directory. Notice that the BUCKETS 5 clause limits the total number of buckets in the table for running the example. Each bucket receives a dedicated HDFS subdirectory and collection of log files, so the number of HDFS bucket directories is limited to 5.

    After creating the full schema, the script executes the fh-10000.sql file to load 10,000 rows into FLIGHTS_HISTORY.
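
    After the load completes, GemFire XD batches the table operations to HDFS under the store's HOMEDIR (the BatchTimeInterval setting above controls the flush frequency). If you want to inspect the resulting bucket layout, the following is a minimal sketch, not part of the shipped example, that lists everything under /tmp/gfxd using the standard Hadoop FileSystem API; the class name ListHdfsStore and the hard-coded namenode URL are illustrative assumptions taken from the CREATE HDFSSTORE statement above:
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListHdfsStore {
      public static void main(String[] args) throws Exception {
        // Connect to the same namenode that the airlines HDFS store was created against
        FileSystem fs = FileSystem.get(URI.create("hdfs://pivhdsne:8020"), new Configuration());
        listRecursively(fs, new Path("/tmp/gfxd")); // the store's HOMEDIR
        fs.close();
      }

      // Recursively print every bucket directory and log file under the given path
      private static void listRecursively(FileSystem fs, Path path) throws Exception {
        for (FileStatus status : fs.listStatus(path)) {
          System.out.println(status.getPath());
          if (status.isDirectory()) {
            listRecursively(fs, status.getPath());
          }
        }
      }
    }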

  9. Execute the following command to create the schema and load data:
    $ ./mr_driver.sh load

    The data loading step may take several minutes, depending on the speed of your system.

  10. The example class that you will run executes two separate MapReduce jobs. The first job uses the GemFire XD RowInputFormat implementation to identify the table name and HDFS log files to use as input to the Mapper implementation. Look at the following configuration code from the run method in the /usr/lib/gphd/gfxd/examples/mapreduce/src/main/java/demo/gfxd/mr2/TopBusyAirportGemfirexd.java file:
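        // Point RowInputFormat at the HDFS store's home directory and the table whose operation logs it should read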
        conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
        conf.set(RowInputFormat.INPUT_TABLE, tableName);
        conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
    
        Job job = Job.getInstance(conf, "Busy Airport Count");
        job.setJarByClass(TopBusyAirportGemfirexd.class);
    
        job.setInputFormatClass(RowInputFormat.class);
    
        // configure mapper and reducer
        job.setMapperClass(SampleMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
    
        // configure output
        TextOutputFormat.setOutputPath(job, intermediateOutputPath);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

    For the first MapReduce job, the example sets the GemFire XD RowInputFormat class as the InputFormat implementation, and configures it with the HDFS store home directory and table name shown above. Setting RowInputFormat.CHECKPOINT_MODE to false indicates that the compacted log files, which represent a snapshot of the table's current values, are not used as input. Instead, the job uses the full set of raw HDFS log files, which include all operations performed against the table. The output of the first MapReduce job is configured to write text to an intermediate file.
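
    As a point of comparison, the inverse configuration is a one-line change. This is a minimal sketch, assuming the same conf object, and is not part of the shipped example; setting the flag to true makes the job consume the compacted snapshot files instead of the full event stream:
        conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, true); // read the table snapshot, not every operation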

    The map method in the SampleMapper class performs the work for the first MapReduce job, emitting a count of 1 for each ORIG_AIRPORT and DEST_AIRPORT value that it reads:
      public static class SampleMapper extends Mapper<Object, Row, Text, IntWritable> {
    
        private final static IntWritable countOne = new IntWritable(1);
        private final Text reusableText = new Text();
    
        @Override
        public void map(Object key, Row row, Context context)
            throws IOException, InterruptedException {
    
          String origAirport;
          String destAirport;
    
          try {
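            // getRowAsResultSet exposes this table record through the standard JDBC ResultSet API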
            ResultSet rs = row.getRowAsResultSet();
            origAirport = rs.getString("ORIG_AIRPORT");
            destAirport = rs.getString("DEST_AIRPORT");
            reusableText.set(origAirport);
            context.write(reusableText, countOne);
            reusableText.set(destAirport);
            context.write(reusableText, countOne);
          } catch (SQLException e) {
            e.printStackTrace();
          }
        }
      }

    In the preceding code, you can see that the RowInputFormat implementation delivers table records using key and row objects. The key object itself contains no relevant table data; it is included only to conform to the standard Hadoop interface. The row object contains all relevant information for the table record. In the SampleMapper class, the row is read as a JDBC result set, and tallies for the ORIG_AIRPORT and DEST_AIRPORT values are written to the job's intermediate output.

  11. In the second part of the run method, a second MapReduce job is configured as follows:
          String gemfirexdUrl = topConf.get("gemfirexd.url", "jdbc:gemfirexd://localhost:1527");
          topConf.set(RowOutputFormat.OUTPUT_URL, gemfirexdUrl);
          topConf.set(RowOutputFormat.OUTPUT_TABLE, "APP.BUSY_AIRPORT");
    ...
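          // RowOutputFormat writes each reducer output value to the configured GemFire XD table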
          topJob.setOutputFormatClass(RowOutputFormat.class);
          topJob.setOutputKeyClass(Key.class);
          topJob.setOutputValueClass(BusyAirportModel.class);
    
    
    Instead of writing results to a file, the second MapReduce job uses the GemFire XD RowOutputFormat implementation to write results to a table, connecting to the running GemFire XD system with the provided URL. The BusyAirportModel class maps the reducer's output values to the columns of this table using the JDBC API. The reduce method of the second MapReduce job simply determines the busiest airport and writes the value to the output table:
      public static class TopBusyAirportReducer extends Reducer<Text, StringIntPair, Key, BusyAirportModel> {
    
        @Override
        public void reduce(Text token, Iterable<StringIntPair> values,
            Context context) throws IOException, InterruptedException {
          String topAirport = null;
          int max = 0;
    
          for (StringIntPair v : values) {
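            // Track the airport with the highest combined arrival/departure count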
            if (v.getSecond() > max) {
              max = v.getSecond();
              topAirport = v.getFirst();
            }
          }
          BusyAirportModel busy = new BusyAirportModel(topAirport, max);
          context.write(new Key(), busy);
        }
      }

    Again, the key object is provided only to conform to the Hadoop API; the key contains no relevant table data.

  12. To run both MapReduce jobs, execute the command:
    $ ./mr_driver.sh runMR
    At the completion of the process, you should see output similar to the following:
    [fine 2014/03/29 06:07:52.481 CST <Distributed system shutdown hook> tid=0xb] <Hoplog:HdfsStore:StoreForDDLFetching7770299402355> Closing file system: StoreForDDLFetching7770299402355
    Heap
     def new generation   total 76672K, used 30134K [0x00000000bc600000, 0x00000000c1930000, 0x00000000c1930000)
      eden space 68160K,  44% used [0x00000000bc600000, 0x00000000be36dab0, 0x00000000c0890000)
      from space 8512K,   0% used [0x00000000c0890000, 0x00000000c0890000, 0x00000000c10e0000)
      to   space 8512K,   0% used [0x00000000c10e0000, 0x00000000c10e0000, 0x00000000c1930000)
     concurrent mark-sweep generation total 893088K, used 536508K [0x00000000c1930000, 0x00000000f8158000, 0x00000000fae00000)
     concurrent-mark-sweep perm gen total 83968K, used 58350K [0x00000000fae00000, 0x0000000100000000, 0x0000000100000000)
    
    real	0m24.624s
    user	0m8.747s
    sys	0m2.066s
    Found 2 items
    -rw-r--r--   3 gpadmin hadoop          0 2014-03-29 06:07 /output_int/_SUCCESS
    -rw-r--r--   3 gpadmin hadoop        691 2014-03-29 06:07 /output_int/part-r-00000
    gfxd> select * from busy_airport;
    AIR&|FLIGHTS    |STAMP                     
    -------------------------------------------
    JFK |860        |2014-03-29 06:07:51.921   
    
    1 row selected
    gfxd>

    By processing the full history of events in the FLIGHTS_HISTORY table, the MapReduce example determined which airport had the most combined arrivals and departures.

  13. To view the intermediate output from the first MapReduce job:
    $ hdfs dfs -cat hdfs://pivhdsne:8020/output_int/part-r-00000
    ...
    PHL	122
    PHX	155
    PRG	208
    REY	199
    SAN	120
    SAT	230
    SCL	244
    SEA	421
    SEL	157
    SFO	490
    SHA	175
    SIN	331
    SJU	109
    SLC	193
    STL	123
    SVO	409
    SYD	402
    THR	176
    TPE	219
    WAW	170
    YUL	187
    YYZ	185
  14. To view the table output of the second MapReduce job:
    $ gfxd
    gfxd version 1.3.0
    gfxd> connect client 'localhost:1527';
    gfxd> select * from busy_airport;
    AIRPORT|FLIGHTS    |STAMP                     
    ----------------------------------------------
    JFK    |860        |2014-03-29 06:07:51.921   
    
    1 row selected
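
    As an alternative to the gfxd shell, any Java client can read the same result over JDBC. This is a minimal sketch, not part of the shipped example; the class name VerifyBusyAirport is an illustrative assumption, and the GemFire XD client JDBC driver jar must be on the classpath:
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class VerifyBusyAirport {
      public static void main(String[] args) throws Exception {
        // Same thin-client URL that the second MapReduce job uses for its output (see step 11)
        try (Connection conn = DriverManager.getConnection("jdbc:gemfirexd://localhost:1527");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT AIRPORT, FLIGHTS, STAMP FROM APP.BUSY_AIRPORT")) {
          while (rs.next()) {
            System.out.println(rs.getString("AIRPORT") + " handled "
                + rs.getInt("FLIGHTS") + " flights (as of " + rs.getTimestamp("STAMP") + ")");
          }
        }
      }
    }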
  15. To stop the GemFire XD distributed system and remove all temporary directories, execute the command:
    $ ./mr_driver.sh clear
    The GemFireXD Server has stopped.
    The GemFireXD Locator has stopped.
    rm: `/output': No such file or directory
    14/03/29 06:10:38 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 86400000 minutes, Emptier interval = 0 minutes.
    Moved: 'hdfs://pivhdsne.localdomain:8020/output_int' to trash at: hdfs://pivhdsne.localdomain:8020/user/gpadmin/.Trash/Current