Jayant Singh Website - Healthcare IT Blog

HL7 Analysis with Hadoop!

My first encounter with Hadoop, Big Data, Ubuntu, Java & Python! Learning new things is real fun.

I have extended the very popular WordCount example & implemented a Hadoop MapReduce job to count the different trigger events in a file of HL7 messages. I have also provided the MapReduce code in both Java & Python with this article.

Here is a brief background of my research, to encourage people to start exploring Hadoop & Big Data.

I had some free time recently while recovering from a viral fever and thought of reading some new stuff. I had heard a lot about Hadoop & Big Data but never had a chance to look into them.

I searched a bit on Google for ways to get hands-on. I am a .NET developer, so I first tried to find out whether I could use Hadoop from .NET & found that Microsoft is on the way with HDInsight on Windows Azure. After some analysis I decided to look for alternatives & found that Hortonworks provides ready-to-use appliances for VirtualBox & VMware.

I downloaded the VirtualBox appliance. After installing & configuring it, I figured out that my old Sony VAIO laptop with a T6400 processor doesn't support Intel Virtualization, so the appliance wouldn't run.

I then looked for other alternatives & found some posts about installing Hadoop directly on Ubuntu. I remembered Ubuntu from my college days, so I downloaded & installed it on my laptop. I didn't know what LTS (Long Term Support) meant at that time and grabbed the most recent release, 13.04 :) but it's working fine. (LTS releases are the ones with extended support; at the time of writing that was 12.04.)

Wow! The Ubuntu installation was as simple as Windows 7's. Even configuring Hadoop 1.1.2 (the stable release at the time) was quite simple, and luckily I didn't face many problems.


Finally, after some serious exercise I was ready with my single node Hadoop cluster on Ubuntu.

So let's see what I did with Big Data on Hadoop (although for this example, on a single-node Hadoop cluster, I have used fairly small data :) i.e. an HL7 file with 60,386 messages).

I first ran the WordCount example that ships with the Hadoop setup & then reimplemented it after studying the source code. I was able to install the Eclipse IDE on Ubuntu & run small Java programs with only a few minor problems. Some of the things I had to search on Google were how to set the CLASSPATH in Eclipse, how to create JAR files in Eclipse, etc.

Then I decided to extend this word-count example to analyze a file of HL7 messages & count the different trigger events from the MSH segment (MSH-9).
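To make the field indexing concrete, here is a small Python sketch (the sample message and its application/facility names are made up) showing why the message type/trigger event lands at index 8 when an MSH segment is split on "|":

```python
# A fabricated HL7 v2 MSH segment. The field separator is '|' and MSH-1
# is the separator itself, so after splitting on '|' the message type
# (MSH-9, e.g. "ADT^A01") ends up at zero-based index 8.
msh = r"MSH|^~\&|SENDAPP|SENDFAC|RECVAPP|RECVFAC|20130708||ADT^A01|MSG00001|P|2.3"

fields = msh.split("|")
print(fields[8])  # ADT^A01
```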

I was able to run this successfully & then implemented the same approach to count segments in a file of HL7 messages.

So here is the Java code. I haven't included any validation of the HL7 messages:

EventCount.java

https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/java/EventCount.java

package jayant.hadoop.mapreduce.HL7Test;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

import java.io.IOException;
import java.util.*;

public class EventCount {

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        final String MSH_SEG_START = "MSH|^~\\&";

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            if (line.startsWith(MSH_SEG_START)) {
                // split on the HL7 field separator '|' (escaped for the regex)
                final String[] splitStr = line.split("\\|", -1);
                if (splitStr.length >= 12) { // check the 12 required fields in MSH
                    word.set(splitStr[8]);   // MSH-9: message type / trigger event
                    output.collect(word, one);
                }
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(EventCount.class);
        conf.setJobName("EventCount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

 

Let's run this example in Hadoop

Compile the program & create a jar file

To compile it you will have to add "hadoop-core-1.1.2.jar" to your classpath
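Assuming the Hadoop core jar sits at /home/hduser/hadoop (an assumption — adjust both paths and the output jar name to your own setup), the compile-and-package step might look like this:

```shell
# Compile against the Hadoop core jar (path is an assumption for this setup)
javac -classpath /home/hduser/hadoop/hadoop-core-1.1.2.jar -d classes EventCount.java

# Package the compiled classes into the jar used in the run command below
jar -cvf /home/hduser/eventcount.jar -C classes/ .
```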


Create a new input directory in HDFS to hold the input HL7 files

hadoop fs -mkdir /home/hduser/hadoop/hl7eventcount/input

Copy the HL7 file from your local machine to HDFS

hadoop fs -copyFromLocal /home/hduser/hl7_01.txt /home/hduser/hadoop/hl7eventcount/input/hl7_01.txt

Run this command to start the MapReduce job

https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/java/readme.txt

hadoop jar /home/hduser/eventcount.jar jayant.hadoop.mapreduce.HL7Test.EventCount /home/hduser/hadoop/hl7eventcount/input /home/hduser/hadoop/hl7eventcount/output

The output of this command will look something like the following. Note that I parsed a file containing 60,386 HL7 messages:

13/07/08 15:57:32 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/07/08 15:57:33 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/07/08 15:57:33 WARN snappy.LoadSnappy: Snappy native library not loaded
13/07/08 15:57:33 INFO mapred.FileInputFormat: Total input paths to process : 1
13/07/08 15:57:34 INFO mapred.JobClient: Running job: job_201307081500_0006
13/07/08 15:57:35 INFO mapred.JobClient:  map 0% reduce 0%
13/07/08 15:57:48 INFO mapred.JobClient:  map 100% reduce 0%
13/07/08 15:57:57 INFO mapred.JobClient:  map 100% reduce 33%
13/07/08 15:57:58 INFO mapred.JobClient:  map 100% reduce 100%
13/07/08 15:58:00 INFO mapred.JobClient: Job complete: job_201307081500_0006
13/07/08 15:58:00 INFO mapred.JobClient: Counters: 30
13/07/08 15:58:00 INFO mapred.JobClient:   Job Counters 
13/07/08 15:58:00 INFO mapred.JobClient:     Launched reduce tasks=1
13/07/08 15:58:00 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=22864
13/07/08 15:58:00 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/07/08 15:58:00 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/07/08 15:58:00 INFO mapred.JobClient:     Launched map tasks=2
13/07/08 15:58:00 INFO mapred.JobClient:     Data-local map tasks=2
13/07/08 15:58:00 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=10388
13/07/08 15:58:00 INFO mapred.JobClient:   File Input Format Counters 
13/07/08 15:58:00 INFO mapred.JobClient:     Bytes Read=87014854
13/07/08 15:58:00 INFO mapred.JobClient:   File Output Format Counters 
13/07/08 15:58:00 INFO mapred.JobClient:     Bytes Written=130
13/07/08 15:58:00 INFO mapred.JobClient:   FileSystemCounters
13/07/08 15:58:00 INFO mapred.JobClient:     FILE_BYTES_READ=286
13/07/08 15:58:00 INFO mapred.JobClient:     HDFS_BYTES_READ=87015104
13/07/08 15:58:00 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=154658
13/07/08 15:58:00 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=130
13/07/08 15:58:00 INFO mapred.JobClient:   Map-Reduce Framework
13/07/08 15:58:00 INFO mapred.JobClient:     Map output materialized bytes=292
13/07/08 15:58:00 INFO mapred.JobClient:     Map input records=1075055
13/07/08 15:58:00 INFO mapred.JobClient:     Reduce shuffle bytes=292
13/07/08 15:58:00 INFO mapred.JobClient:     Spilled Records=40
13/07/08 15:58:00 INFO mapred.JobClient:     Map output bytes=724632
13/07/08 15:58:00 INFO mapred.JobClient:     Total committed heap usage (bytes)=459800576
13/07/08 15:58:00 INFO mapred.JobClient:     CPU time spent (ms)=10040
13/07/08 15:58:00 INFO mapred.JobClient:     Map input bytes=87014283
13/07/08 15:58:00 INFO mapred.JobClient:     SPLIT_RAW_BYTES=250
13/07/08 15:58:00 INFO mapred.JobClient:     Combine input records=60386
13/07/08 15:58:00 INFO mapred.JobClient:     Reduce input records=20
13/07/08 15:58:00 INFO mapred.JobClient:     Reduce input groups=11
13/07/08 15:58:00 INFO mapred.JobClient:     Combine output records=20
13/07/08 15:58:00 INFO mapred.JobClient:     Physical memory (bytes) snapshot=557441024
13/07/08 15:58:00 INFO mapred.JobClient:     Reduce output records=11
13/07/08 15:58:00 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=2116161536
13/07/08 15:58:00 INFO mapred.JobClient:     Map output records=60386

Run this command to see the output file

hadoop fs -cat /home/hduser/hadoop/hl7eventcount/output/part-00000

The output file will look like this

ADT^A01 6251
ADT^A02 2181
ADT^A03 2346
ADT^A04 5271
ADT^A05 248
ADT^A06 95
ADT^A07 1
ADT^A08 43607
ADT^A13 4
ADT^A17 366
ADT^A18 16
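As a quick sanity check, the per-event counts in the output add up exactly to the number of messages in the input file:

```python
# Event counts copied from the job output above
counts = {
    "ADT^A01": 6251, "ADT^A02": 2181, "ADT^A03": 2346, "ADT^A04": 5271,
    "ADT^A05": 248, "ADT^A06": 95, "ADT^A07": 1, "ADT^A08": 43607,
    "ADT^A13": 4, "ADT^A17": 366, "ADT^A18": 16,
}

total = sum(counts.values())
print(total)  # 60386 - matches "Map output records=60386" in the job log
```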

Now let's see the Python scripts

mapper.py

https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/python/mapper.py

#!/usr/bin/python

import sys

strDelim = "|"
# input comes from STDIN (standard input)
for line in sys.stdin:
    # each line will be one segment of an HL7 message;
    # remove leading and trailing whitespace
    line = line.strip()

    # check whether this is the MSH segment
    if line.startswith('MSH|^~\\&'):
        # split the segment into fields
        words = line.split(strDelim)

        # check that the 12 required fields in MSH are present
        if len(words) >= 12:
            event = words[8]  # MSH-9: message type / trigger event
            print('%s\t%s' % (event, 1))

reducer.py

https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/python/reducer.py

#!/usr/bin/python

import sys

# maps events to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # ignore lines where the count is not a number
        continue

    # accumulate the count for this event
    word2count[word] = word2count.get(word, 0) + count

# write the (event, count) pairs to STDOUT
# Note: they are unsorted
for word in word2count:
    print('%s\t%s' % (word, word2count[word]))
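Before submitting to Hadoop, the streaming scripts can be tested locally with a plain shell pipe (cat hl7_01.txt | ./mapper.py | sort | ./reducer.py). The same round trip can be sketched in pure Python — the sample segments below are fabricated:

```python
# Simulate the streaming pipeline on a few fabricated HL7 segment lines:
# the "mapper" emits (event, 1) for each MSH line, the "reducer" sums per event.
lines = [
    r"MSH|^~\&|APP|FAC|APP2|FAC2|20130708||ADT^A01|M1|P|2.3",
    "PID|1||12345||DOE^JOHN",
    r"MSH|^~\&|APP|FAC|APP2|FAC2|20130708||ADT^A08|M2|P|2.3",
    r"MSH|^~\&|APP|FAC|APP2|FAC2|20130708||ADT^A01|M3|P|2.3",
]

# mapper step: emit (event, 1) for MSH segments with enough fields
pairs = []
for line in lines:
    if line.startswith("MSH|^~\\&"):
        fields = line.split("|")
        if len(fields) >= 12:
            pairs.append((fields[8], 1))

# reducer step: sum the counts per event
event_counts = {}
for event, one in pairs:
    event_counts[event] = event_counts.get(event, 0) + one

print(event_counts)  # {'ADT^A01': 2, 'ADT^A08': 1}
```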

Run this command to start the MapReduce job (change the file/directory locations to match your setup). This one uses Hadoop Streaming.

https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/python/readme.txt

hadoop jar /home/hduser/hadoop/contrib/streaming/hadoop-streaming-1.1.2.jar -file /home/hduser/python/eventcount/mapper.py -mapper ./mapper.py -file /home/hduser/python/eventcount/reducer.py -reducer ./reducer.py -input /home/hduser/hadoop/hl7eventcount/input/* -output /home/hduser/hadoop/hl7eventcount/output

The output of this command will look something like the following. Note that I parsed a file containing 60,386 HL7 messages:

packageJobJar: [/home/hduser/python/eventcount/mapper.py, /home/hduser/python/eventcount/reducer.py, /home/hduser/tmp/hadoop-unjar1397608400024094301/] [] /tmp/streamjob3276271105870771026.jar tmpDir=null
13/07/09 17:54:08 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/07/09 17:54:08 WARN snappy.LoadSnappy: Snappy native library not loaded
13/07/09 17:54:08 INFO mapred.FileInputFormat: Total input paths to process : 1
13/07/09 17:54:08 INFO streaming.StreamJob: getLocalDirs(): [/home/hduser/tmp/mapred/local]
13/07/09 17:54:08 INFO streaming.StreamJob: Running job: job_201307091539_0013
13/07/09 17:54:08 INFO streaming.StreamJob: To kill this job, run:
13/07/09 17:54:08 INFO streaming.StreamJob: /home/hduser/hadoop/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:54311 -kill job_201307091539_0013
13/07/09 17:54:08 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201307091539_0013
13/07/09 17:54:09 INFO streaming.StreamJob:  map 0%  reduce 0%
13/07/09 17:54:21 INFO streaming.StreamJob:  map 100%  reduce 0%
13/07/09 17:54:29 INFO streaming.StreamJob:  map 100%  reduce 33%
13/07/09 17:54:32 INFO streaming.StreamJob:  map 100%  reduce 100%
13/07/09 17:54:35 INFO streaming.StreamJob: Job complete: job_201307091539_0013
13/07/09 17:54:35 INFO streaming.StreamJob: Output: /home/hduser/hadoop/hl7eventcount/output

Another similar example is counting the different segments in an HL7 file. Here is the Java Mapper for it (again without HL7 validations):

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            if (line.length() >= 4) {
                // HL7 segment IDs are the first three characters (MSH, PID, OBX, ...)
                String segment = line.substring(0, 3);
                word.set(segment);
                output.collect(word, one);
            }
        }
    }
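For the streaming version, the corresponding mapper would emit the three-character segment ID of each line. Here is a sketch following the same pattern as the event-count mapper.py above (in the real script the lines would come from sys.stdin; the helper name map_segment is my own):

```python
def map_segment(line):
    """Return 'segment_id\t1' for an HL7 segment line, or None if it is too short."""
    # HL7 segment IDs are the first three characters (MSH, PID, OBX, ...)
    line = line.strip()
    if len(line) >= 4:
        return '%s\t1' % line[:3]
    return None

# In the real streaming mapper these lines would come from sys.stdin
for line in [r"MSH|^~\&|APP|FAC", "PID|1||12345", "OBX|1|TX|"]:
    out = map_segment(line)
    if out is not None:
        print(out)
```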