Tuesday, 9 July 2013

HL7 Analysis with Hadoop!


My first encounter with Hadoop, Big Data, Ubuntu, Java & Python! Learning new things is real fun.

I have extended the very popular WordCount example and implemented a Hadoop MapReduce job that counts the different trigger events in a file containing HL7 messages. The MapReduce code is provided in both Java and Python with this article.
Here is a brief background of my research, to encourage people to start exploring Hadoop & Big Data.

 I had some free time recently while I was recovering from a viral fever, and I thought of reading up on something new. I had heard a lot about Hadoop & Big Data but never had a chance to look into them.
 I searched a bit on Google for ways to get some hands-on practice. I am a .NET developer, so I tried to find out whether I could use Hadoop from .NET & found that Microsoft is on the way with HDInsight on Windows Azure. But after some analysis I decided to look at alternatives & found that Hortonworks provides ready-to-use appliances for VirtualBox & VMware.
 I downloaded the VirtualBox appliance. After installing & configuring it, I figured out that my old Sony Vaio laptop with a T6400 processor doesn't support Intel Virtualization.
 I then looked at some other alternatives & found some posts about installing Hadoop on Ubuntu. I remembered Ubuntu from my college days, so I downloaded & installed Ubuntu 13.04 on my laptop. I didn't know what LTS (long term support) meant at that time and simply downloaded the most recent release, 13.04 :) but it's working fine. As I understand it now, the LTS release is the one supported for the long term rather than simply the newest one; for Ubuntu that is currently 12.04.
 Wow! The Ubuntu installation was as simple as Windows 7. Even the Hadoop 1.1.2 (current stable release) configuration was quite simple. Luckily I didn't face many problems during configuration.
 http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/  
 Finally, after some serious exercise, I was ready with my single-node Hadoop cluster on Ubuntu.
 So let's see what I did with Big Data on Hadoop (although for this example, on a single-node Hadoop cluster, I used rather small data :) i.e. an HL7 file with 60386 messages).
 I first ran the WordCount example that ships with the Hadoop setup & then implemented it myself by looking at the source code. I was able to install the Eclipse IDE on Ubuntu & run small Java programs with a few minor problems. Some of the things I had to search for on Google were how to add to the CLASSPATH in Eclipse, how to create JAR files in Eclipse, etc.
 Then I decided to extend this word count example to analyze a file of HL7 messages & count the different trigger events from the MSH segment (MSH_9).
 I was able to run this successfully & then implemented the same thing for counting segments in a file of HL7 messages.
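
To make the MSH_9 idea concrete before the real job, here is a small Python sketch of where the trigger event sits in an MSH segment. It is only an illustration: the sample segment and the function name trigger_event are made up for this example. Because MSH-1 is the field separator itself, splitting on "|" puts MSH-9 at index 8.

```python
# Minimal sketch: pull the message type / trigger event (MSH-9) out of an MSH segment.
# The sample segment and the helper name are invented for illustration.
FIELD_SEPARATOR = "|"

def trigger_event(segment):
    """Return MSH-9 (e.g. 'ADT^A01'), or None if the line is not a usable MSH segment."""
    if not segment.startswith("MSH|^~\\&"):
        return None
    fields = segment.split(FIELD_SEPARATOR)
    # fields[0] is "MSH" and fields[1] is MSH-2 (encoding characters),
    # so MSH-9 ends up at fields[8]
    if len(fields) < 12:
        return None
    return fields[8]

sample = "MSH|^~\\&|SENDAPP|SENDFAC|RECVAPP|RECVFAC|20130709120000||ADT^A01|MSG00001|P|2.3"
print(trigger_event(sample))          # ADT^A01
print(trigger_event("PID|1||12345"))  # None
```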


So here is the Java code. I haven't included any validation of the HL7 messages.
EventCount.java
https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/java/EventCount.java

 package jayant.hadoop.mapreduce.HL7Test;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.io.*;  
 import org.apache.hadoop.mapred.*;  
 import java.io.IOException;  
 import java.util.*;  
 public class EventCount {  
   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();
     final String MSH_SEG_START = "MSH|^~\\&";
     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
       String line = value.toString();
       if(line.startsWith(MSH_SEG_START))
       {
         final String FIELD_SEPARATOR = "|";
         // The original uses a custom splitTokens() helper that is not shown here; String.split()
         // with a quoted separator and a -1 limit (keeps empty fields) stands in for it.
         final String[] splitStr = line.split(java.util.regex.Pattern.quote(FIELD_SEPARATOR), -1);
         if(splitStr.length >= 12) { //check that the 12 required MSH fields are present
           word.set(splitStr[8]); //MSH-9 is at index 8 because MSH-1 is the separator itself
           output.collect(word, one);
         }
       }
     }
   }  
   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
       int sum = 0;  
       while (values.hasNext()) {  
         sum += values.next().get();  
       }  
       output.collect(key, new IntWritable(sum));  
     }  
   }  
   public static void main(String[] args) throws Exception {  
     JobConf conf = new JobConf(EventCount.class);  
     conf.setJobName("EventCount");  
     conf.setOutputKeyClass(Text.class);  
     conf.setOutputValueClass(IntWritable.class);  
     conf.setMapperClass(Map.class);  
     conf.setCombinerClass(Reduce.class);  
     conf.setReducerClass(Reduce.class);  
     conf.setInputFormat(TextInputFormat.class);  
     conf.setOutputFormat(TextOutputFormat.class);  
     FileInputFormat.setInputPaths(conf, new Path(args[0]));  
     FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
     JobClient.runJob(conf);  
   }  
 }  

Let's run this example in Hadoop.
Compile the program & create a JAR file.
To compile it you will have to add the "hadoop-core-1.1.2.jar" file to your classpath.

Create a new Input directory in HDFS to put input HL7 files
 hadoop fs -mkdir /home/hduser/hadoop/hl7eventcount/input  

Copy HL7 file from your local machine to HDFS
 hadoop fs -copyFromLocal /home/hduser/hl7_01.txt /home/hduser/hadoop/hl7eventcount/input/hl7_01.txt  

Run this command to start MapReduce job
https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/java/readme.txt
 hadoop jar /home/hduser/eventcount.jar jayant.hadoop.mapreduce.HL7Test.EventCount /home/hduser/hadoop/hl7eventcount/input /home/hduser/hadoop/hl7eventcount/output
The output of this command will look something like this. Please note that I parsed a file with 60386 HL7 messages.
 13/07/08 15:57:32 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
 13/07/08 15:57:33 INFO util.NativeCodeLoader: Loaded the native-hadoop library  
 13/07/08 15:57:33 WARN snappy.LoadSnappy: Snappy native library not loaded  
 13/07/08 15:57:33 INFO mapred.FileInputFormat: Total input paths to process : 1  
 13/07/08 15:57:34 INFO mapred.JobClient: Running job: job_201307081500_0006  
 13/07/08 15:57:35 INFO mapred.JobClient: map 0% reduce 0%  
 13/07/08 15:57:48 INFO mapred.JobClient: map 100% reduce 0%  
 13/07/08 15:57:57 INFO mapred.JobClient: map 100% reduce 33%  
 13/07/08 15:57:58 INFO mapred.JobClient: map 100% reduce 100%  
 13/07/08 15:58:00 INFO mapred.JobClient: Job complete: job_201307081500_0006  
 13/07/08 15:58:00 INFO mapred.JobClient: Counters: 30  
 13/07/08 15:58:00 INFO mapred.JobClient:  Job Counters   
 13/07/08 15:58:00 INFO mapred.JobClient:   Launched reduce tasks=1  
 13/07/08 15:58:00 INFO mapred.JobClient:   SLOTS_MILLIS_MAPS=22864  
 13/07/08 15:58:00 INFO mapred.JobClient:   Total time spent by all reduces waiting after reserving slots (ms)=0  
 13/07/08 15:58:00 INFO mapred.JobClient:   Total time spent by all maps waiting after reserving slots (ms)=0  
 13/07/08 15:58:00 INFO mapred.JobClient:   Launched map tasks=2  
 13/07/08 15:58:00 INFO mapred.JobClient:   Data-local map tasks=2  
 13/07/08 15:58:00 INFO mapred.JobClient:   SLOTS_MILLIS_REDUCES=10388  
 13/07/08 15:58:00 INFO mapred.JobClient:  File Input Format Counters   
 13/07/08 15:58:00 INFO mapred.JobClient:   Bytes Read=87014854  
 13/07/08 15:58:00 INFO mapred.JobClient:  File Output Format Counters   
 13/07/08 15:58:00 INFO mapred.JobClient:   Bytes Written=130  
 13/07/08 15:58:00 INFO mapred.JobClient:  FileSystemCounters  
 13/07/08 15:58:00 INFO mapred.JobClient:   FILE_BYTES_READ=286  
 13/07/08 15:58:00 INFO mapred.JobClient:   HDFS_BYTES_READ=87015104  
 13/07/08 15:58:00 INFO mapred.JobClient:   FILE_BYTES_WRITTEN=154658  
 13/07/08 15:58:00 INFO mapred.JobClient:   HDFS_BYTES_WRITTEN=130  
 13/07/08 15:58:00 INFO mapred.JobClient:  Map-Reduce Framework  
 13/07/08 15:58:00 INFO mapred.JobClient:   Map output materialized bytes=292  
 13/07/08 15:58:00 INFO mapred.JobClient:   Map input records=1075055  
 13/07/08 15:58:00 INFO mapred.JobClient:   Reduce shuffle bytes=292  
 13/07/08 15:58:00 INFO mapred.JobClient:   Spilled Records=40  
 13/07/08 15:58:00 INFO mapred.JobClient:   Map output bytes=724632  
 13/07/08 15:58:00 INFO mapred.JobClient:   Total committed heap usage (bytes)=459800576  
 13/07/08 15:58:00 INFO mapred.JobClient:   CPU time spent (ms)=10040  
 13/07/08 15:58:00 INFO mapred.JobClient:   Map input bytes=87014283  
 13/07/08 15:58:00 INFO mapred.JobClient:   SPLIT_RAW_BYTES=250  
 13/07/08 15:58:00 INFO mapred.JobClient:   Combine input records=60386  
 13/07/08 15:58:00 INFO mapred.JobClient:   Reduce input records=20  
 13/07/08 15:58:00 INFO mapred.JobClient:   Reduce input groups=11  
 13/07/08 15:58:00 INFO mapred.JobClient:   Combine output records=20  
 13/07/08 15:58:00 INFO mapred.JobClient:   Physical memory (bytes) snapshot=557441024  
 13/07/08 15:58:00 INFO mapred.JobClient:   Reduce output records=11  
 13/07/08 15:58:00 INFO mapred.JobClient:   Virtual memory (bytes) snapshot=2116161536  
 13/07/08 15:58:00 INFO mapred.JobClient:   Map output records=60386  
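
The counters above are worth a second look: Map output records=60386 but Reduce input records=20. That drop comes from the combiner (the same Reduce class, set with setCombinerClass), which pre-sums counts on the map side before anything is sent to the reducer. The Python sketch below imitates the effect with toy data. It is a simplified model under my own assumptions, not what Hadoop does internally; the real framework may run the combiner more than once per map task, or not at all.

```python
from collections import Counter

def combine(pairs):
    """Sum counts per key, as a combiner (or the reducer) would."""
    totals = Counter()
    for key, count in pairs:
        totals[key] += count
    return sorted(totals.items())

# Toy map output for two map tasks (invented data: one (event, 1) pair per MSH segment)
map_task_1 = [("ADT^A01", 1), ("ADT^A08", 1), ("ADT^A08", 1)]
map_task_2 = [("ADT^A08", 1), ("ADT^A01", 1), ("ADT^A04", 1)]

# Each map task is combined separately; the reducer receives the partial sums
combined = combine(map_task_1) + combine(map_task_2)
# The reducer applies the same summing logic across all partial sums
final = combine(combined)

print(len(map_task_1) + len(map_task_2))  # map output records: 6
print(len(combined))                      # reduce input records: 5
print(final)
```

Summing is associative and commutative, which is why the same class can safely serve as both combiner and reducer here.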

Run this command to see the output file
 hadoop fs -cat /home/hduser/hadoop/hl7eventcount/output/part-00000  
Output file will look like this
 ADT^A01 6251  
 ADT^A02 2181  
 ADT^A03 2346  
 ADT^A04 5271  
 ADT^A05 248  
 ADT^A06 95  
 ADT^A07 1  
 ADT^A08 43607  
 ADT^A13 4  
 ADT^A17 366  
 ADT^A18 16  
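
A quick sanity check on this output: the counts should add up to the number of MSH segments, i.e. the 60386 shown as "Map output records" in the log. The small Python sketch below just sums the second column of the output listed above (it is a check I added, not part of the job).

```python
# Output of the job as listed above: event, then count
output = """ADT^A01 6251
ADT^A02 2181
ADT^A03 2346
ADT^A04 5271
ADT^A05 248
ADT^A06 95
ADT^A07 1
ADT^A08 43607
ADT^A13 4
ADT^A17 366
ADT^A18 16"""

# split() handles both spaces and the tab that Hadoop writes between key and value
total = sum(int(line.split()[1]) for line in output.splitlines())
print(total)  # 60386
```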

Now let's see the Python scripts
mapper.py
https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/python/mapper.py
 #!/usr/bin/python
 import sys
 strDelim = "|"
 # input comes from STDIN (standard input)
 for line in sys.stdin:
   # each line will be one segment of an HL7 message
   # remove leading and trailing whitespace
   line = line.strip()
   # check if this one is an MSH segment (raw string, so the backslash is taken literally)
   if line.startswith(r'MSH|^~\&'):
     # split fields of segment
     words = line.split(strDelim)
     # check that all 12 required MSH fields are present
     if len(words) >= 12:
       event = words[8]
       print('%s\t%s' % (event, 1))

reducer.py
https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/python/reducer.py
 #!/usr/bin/python
 import sys
 # maps words to their counts
 word2count = {}
 # input comes from STDIN
 for line in sys.stdin:
   # remove leading and trailing whitespace
   line = line.strip()
   # parse the input we got from mapper.py; skip lines that are not "key<TAB>count"
   try:
     word, count = line.split('\t', 1)
     # convert count (currently a string) to int
     count = int(count)
   except ValueError:
     continue
   # add to the running total for this word (starting from 0 for a new word)
   word2count[word] = word2count.get(word, 0) + count
 # write the tuples to stdout
 # Note: they are unsorted
 for word in word2count:
   print('%s\t%s' % (word, word2count[word]))
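
The reducer above keeps every key in a dictionary, which is fine for a handful of trigger events. Hadoop Streaming hands the reducer its input sorted by key, so an alternative is to sum each run of identical keys and emit it as soon as the key changes. The sketch below shows that variant; it is my own rewrite rather than the script from the repository, and the names parse and reduce_sorted are made up for illustration.

```python
#!/usr/bin/python
import sys
from itertools import groupby

def parse(lines):
    """Yield (key, count) pairs from 'key<TAB>count' lines, skipping malformed ones."""
    for line in lines:
        key, sep, count = line.rstrip("\n").partition("\t")
        if sep and count.strip().isdigit():
            yield key, int(count)

def reduce_sorted(lines):
    """Sum counts per key; relies on the input already being sorted by key."""
    for key, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield key, sum(count for _, count in group)

if __name__ == "__main__":
    for key, total in reduce_sorted(sys.stdin):
        print("%s\t%d" % (key, total))
```

Only one key's running total is held in memory at a time, and the output comes out in key order.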

Run this command to start MapReduce job (change the file/directory locations). This one uses Hadoop Streaming.
https://github.com/j4jayant/Hadoop-Examples/blob/master/HL7_EventCount/python/readme.txt
 hadoop jar /home/hduser/hadoop/contrib/streaming/hadoop-streaming-1.1.2.jar -file /home/hduser/python/eventcount/mapper.py -mapper ./mapper.py -file /home/hduser/python/eventcount/reducer.py -reducer ./reducer.py -input /home/hduser/hadoop/hl7eventcount/input/* -output /home/hduser/hadoop/hl7eventcount/output  

The output of this command will look something like this. Please note that I parsed a file with 60386 HL7 messages.
 packageJobJar: [/home/hduser/python/eventcount/mapper.py, /home/hduser/python/eventcount/reducer.py, /home/hduser/tmp/hadoop-unjar1397608400024094301/] [] /tmp/streamjob3276271105870771026.jar tmpDir=null  
 13/07/09 17:54:08 INFO util.NativeCodeLoader: Loaded the native-hadoop library  
 13/07/09 17:54:08 WARN snappy.LoadSnappy: Snappy native library not loaded  
 13/07/09 17:54:08 INFO mapred.FileInputFormat: Total input paths to process : 1  
 13/07/09 17:54:08 INFO streaming.StreamJob: getLocalDirs(): [/home/hduser/tmp/mapred/local]  
 13/07/09 17:54:08 INFO streaming.StreamJob: Running job: job_201307091539_0013  
 13/07/09 17:54:08 INFO streaming.StreamJob: To kill this job, run:  
 13/07/09 17:54:08 INFO streaming.StreamJob: /home/hduser/hadoop/libexec/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201307091539_0013  
 13/07/09 17:54:08 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201307091539_0013  
 13/07/09 17:54:09 INFO streaming.StreamJob: map 0% reduce 0%  
 13/07/09 17:54:21 INFO streaming.StreamJob: map 100% reduce 0%  
 13/07/09 17:54:29 INFO streaming.StreamJob: map 100% reduce 33%  
 13/07/09 17:54:32 INFO streaming.StreamJob: map 100% reduce 100%  
 13/07/09 17:54:35 INFO streaming.StreamJob: Job complete: job_201307091539_0013  
 13/07/09 17:54:35 INFO streaming.StreamJob: Output: /home/hduser/hadoop/hl7eventcount/output  

Another similar example would be to count the different segments in an HL7 file. Here is the Mapper in Java for this (without HL7 validation):
   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();
     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
       String line = value.toString();
       if(line.length() >= 4)
       {
         // the segment name is the first three characters of the line (MSH, PID, PV1, ...)
         String segment = line.substring(0, 3);
         word.set(segment);
         output.collect(word, one);
       }
     }
   }
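
The same segment-count logic can be tried out locally in Python before it goes anywhere near Hadoop. The sketch below is only an illustration under my own assumptions: the sample lines are invented, and count_segments is a made-up helper that does the map and reduce steps in one pass with a Counter.

```python
from collections import Counter

def count_segments(lines):
    """Count segments by their three-letter name, mirroring the Java mapper's check."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        # same rule as the mapper: need at least 4 characters, name is the first 3
        if len(line) >= 4:
            counts[line[:3]] += 1
    return counts

# Invented sample: two messages, one segment per line
sample = [
    "MSH|^~\\&|APP|FAC|APP2|FAC2|20130709||ADT^A01|1|P|2.3",
    "EVN|A01|20130709",
    "PID|1||12345",
    "PV1|1|I",
    "MSH|^~\\&|APP|FAC|APP2|FAC2|20130709||ADT^A08|2|P|2.3",
    "EVN|A08|20130709",
    "PID|1||67890",
]

for segment, count in sorted(count_segments(sample).items()):
    print("%s\t%d" % (segment, count))
```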
