05 May 2015

Playing with hadoop/mapreduce and htsjdk/VCF: my notebook.

The aim of this test is to count each type of variant and each type of genotype in a VCF file using Apache Hadoop and htsjdk, the Java library for NGS data. My source code is available at: https://github.com/lindenb/hadoop-sandbox/blob/master/src/main/java/com/github/lindenb/hadoop/Test.java.
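
For comparison (this is not part of the original program), the same counts could be computed without Hadoop, with plain htsjdk, along the lines of the sketch below; the class name CountVariantsNoHadoop is arbitrary and the VCF path is taken from the first argument:

import java.io.File;
import java.util.Map;
import java.util.TreeMap;
import htsjdk.samtools.util.CloseableIterator;
import htsjdk.variant.variantcontext.Genotype;
import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.variant.vcf.VCFFileReader;
import htsjdk.variant.vcf.VCFHeader;

public class CountVariantsNoHadoop
 {
 public static void main(String[] args)
  {
  final Map<String,Integer> counts = new TreeMap<String,Integer>();
  // 'false': no index required, the file is read sequentially
  final VCFFileReader reader = new VCFFileReader(new File(args[0]), false);
  final VCFHeader header = reader.getFileHeader();
  final CloseableIterator<VariantContext> iter = reader.iterator();
  while(iter.hasNext())
   {
   final VariantContext ctx = iter.next();
   if(ctx.isIndel()) increment(counts,"ctx_indel");
   if(ctx.isBiallelic()) increment(counts,"ctx_biallelic");
   if(ctx.isSNP()) increment(counts,"ctx_snp");
   if(ctx.hasID()) increment(counts,"ctx_id");
   increment(counts,"ctx_total");
   for(final String sample: header.getSampleNamesInOrder())
    {
    final Genotype g = ctx.getGenotype(sample);
    increment(counts, sample+" "+ctx.getType()+" "+g.getType().name());
    }
   }
  iter.close();
  reader.close();
  for(final Map.Entry<String,Integer> e: counts.entrySet())
   {
   System.out.println(e.getKey()+"\t"+e.getValue());
   }
  }
 private static void increment(final Map<String,Integer> m, final String key)
  {
  final Integer n = m.get(key);
  m.put(key, n==null ? 1 : n+1);
  }
 }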

First, and this is my main problem, I needed to create a class 'VcfRow' that contains the whole data about a variant. As I need to keep all the semantics declared in the VCF header, each record carries the whole VCF header (!). I asked on StackOverflow whether there was an elegant way to keep the header around in the hadoop workflow, but it currently seems that there is no such solution (http://stackoverflow.com/questions/30052859/hadoop-mapreduce-handling-a-text-file-with-a-header). This class VcfRow must implement WritableComparable to be serialized by the hadoop pipeline. It's awfully sloooooow since a new htsjdk.variant.vcf.VCFCodec must be created and the htsjdk.variant.vcf.VCFHeader re-parsed for each new variant.

public static class VcfRow
implements WritableComparable<VcfRow>
 {
 private List<String> headerLines;
 private String line;
 private VariantContext ctx=null;
 private VCFHeader header =null;
 private VCFCodec codec=new VCFCodec();
 public VcfRow()
   {
 this.headerLines = Collections.emptyList();
 this.line="";
   }
 public VcfRow(List<String> headerLines,String line)
  {
 this.headerLines=headerLines; 
 this.line=line;
  }
 
// hadoop serialization: write the header lines, then the variant line itself
@Override
public void write(DataOutput out) throws IOException
 {
 out.writeInt(this.headerLines.size());
 for(int i=0;i< this.headerLines.size();++i)
  {
  out.writeUTF(this.headerLines.get(i));
  }
 byte array[]=line.getBytes();
 out.writeInt(array.length);
 out.write(array);
 }

// hadoop deserialization: restore the header lines and the variant line; header and variant context are re-decoded lazily
@Override
public void readFields(DataInput in) throws IOException
 {
 int n= in.readInt();
 this.headerLines=new ArrayList<String>(n);
 for(int i=0;i<n;++i) this.headerLines.add(in.readUTF());
 n = in.readInt();
 byte array[]=new byte[n];
 in.readFully(array);
 this.line=new String(array);
 this.codec=new VCFCodec();
 this.ctx=null;
 this.header=null;
 }

// lazily parse the VCF header from the stored header lines
public VCFHeader getHeader()
 {
 if(this.header==null)
  {
  this.header = (VCFHeader)this.codec.readActualHeader(new MyLineIterator());
  }
 return this.header;
 }

// lazily decode the variant line (this requires the header to be parsed first)
public VariantContext getVariantContext()
 {
 if(this.ctx==null)
  {
  if(this.header==null) getHeader();//force decode header
  this.ctx=this.codec.decode(this.line);
  }
 return this.ctx;
 }

// order variants by contig, start, reference allele and finally by the whole line
@Override
public int compareTo(VcfRow o)
 {
 int i = this.getVariantContext().getContig().compareTo(o.getVariantContext().getContig());
 if(i!=0) return i;
 i = this.getVariantContext().getStart() - o.getVariantContext().getStart();
 if(i!=0) return i;
 i =  this.getVariantContext().getReference().compareTo( o.getVariantContext().getReference());
 if(i!=0) return i;
 return this.line.compareTo(o.line);
 }

   // a LineIterator feeding the stored header lines to the VCFCodec
   private class MyLineIterator
 extends AbstractIterator<String>
 implements LineIterator
 { 
 int index=0;
 @Override
 protected String advance()
  {
  if(index>= headerLines.size()) return null;
  return headerLines.get(index++);
  }
 }
}
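
As written above, the painful part is that the VCF header is re-parsed for each new VcfRow. A possible mitigation, which is NOT part of the original code and is only a sketch, would be to keep one primed VCFCodec and its parsed VCFHeader per distinct header text in a static cache, so the header is parsed once per JVM instead of once per variant. The names CodecAndHeader, CODEC_CACHE and primed() are hypothetical, and the sketch assumes the rows of a task are decoded from a single thread:

/* hypothetical members added to VcfRow */
private static class CodecAndHeader
 {
 final VCFCodec codec = new VCFCodec();
 VCFHeader header = null;
 }
private static final java.util.Map<String,CodecAndHeader> CODEC_CACHE =
  new java.util.HashMap<String,CodecAndHeader>();

private CodecAndHeader primed()
 {
 final StringBuilder sb = new StringBuilder();
 for(final String h: this.headerLines) sb.append(h).append('\n');
 final String key = sb.toString();
 synchronized(CODEC_CACHE)
  {
  CodecAndHeader ch = CODEC_CACHE.get(key);
  if(ch==null)
   {
   ch = new CodecAndHeader();
   // readActualHeader returns the VCFHeader and primes the codec so that decode() can be called later
   ch.header = (VCFHeader)ch.codec.readActualHeader(new MyLineIterator());
   CODEC_CACHE.put(key, ch);
   }
  return ch;
  }
 }

/* getHeader() and getVariantContext() would then become: */
public VCFHeader getHeader()
 {
 if(this.header==null) this.header = primed().header;
 return this.header;
 }

public VariantContext getVariantContext()
 {
 if(this.ctx==null) this.ctx = primed().codec.decode(this.line);
 return this.ctx;
 }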

Then a special InputFormat is created for the VCF format. As we need to keep track of the header, this input format declares `isSplitable==false`, so a whole VCF file is always handled by a single mapper. The class VcfInputFormat creates an instance of RecordReader that reads the whole VCF header the first time its method `initialize` is invoked. This 'VcfRecordReader' then creates a new VcfRow for each data line.

public static class VcfInputFormat extends FileInputFormat<LongWritable, VcfRow>
   {
 private List<String> headerLines=new ArrayList<String>();
 
 @Override
 public RecordReader<LongWritable, VcfRow> createRecordReader(InputSplit split,
   TaskAttemptContext context) throws IOException,
   InterruptedException {
  return new VcfRecordReader();
  }  
 // never split a VCF file: the header must stay with its records, so one mapper reads the whole file
 @Override
 protected boolean isSplitable(JobContext context, Path filename) {
  return false;
  }
  
 //LineRecordReader
  private class VcfRecordReader extends RecordReader<LongWritable, VcfRow>
    {
  private LineRecordReader delegate=new LineRecordReader();
  public VcfRecordReader() throws IOException
    {
    }
  
   // consume the VCF header lines (up to and including the #CHROM line) before any record is returned
   @Override
  public void initialize(InputSplit genericSplit,
    TaskAttemptContext context) throws IOException {
    delegate.initialize(genericSplit, context);
   while( delegate.nextKeyValue())
    {
    String row = delegate.getCurrentValue().toString();
    if(!row.startsWith("#")) throw new IOException("Bad VCF header");
    headerLines.add(row);
    if(row.startsWith("#CHROM")) break;
    }
    }
   @Override
  public LongWritable getCurrentKey() throws IOException,
    InterruptedException {
   return delegate.getCurrentKey();
    }
   
   @Override
  public VcfRow getCurrentValue() throws IOException,
    InterruptedException {
   Text row = this.delegate.getCurrentValue();
   return new VcfRow(headerLines,row.toString());
    }
   
   @Override
  public float getProgress() throws IOException, InterruptedException {
   return this.delegate.getProgress();
    }
   
   @Override
  public boolean nextKeyValue() throws IOException,
    InterruptedException {
   return this.delegate.nextKeyValue();
    }
   
   @Override
  public void close() throws IOException {
    delegate.close();
   }
     }
   }

The hadoop mapper uses the information of each VcfRow and emits a count of 1 for each matching category:
public static class VariantMapper
   extends Mapper<LongWritable, VcfRow, Text, IntWritable>{

 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();

 public void map(LongWritable key, VcfRow vcfRow, Context context ) throws IOException, InterruptedException {
  VariantContext ctx = vcfRow.getVariantContext();
  if( ctx.isIndel())
   { 
   word.set("ctx_indel");
      context.write(word, one);
   }
  if( ctx.isBiallelic())
   { 
   word.set("ctx_biallelic");
      context.write(word, one);
   }
  if( ctx.isSNP())
   { 
   word.set("ctx_snp");
   context.write(word, one);
   } 
  if( ctx.hasID())
   { 
   word.set("ctx_id");
   context.write(word, one);
   } 
  word.set("ctx_total");
  context.write(word, one);
 
  for(String sample: vcfRow.getHeader().getSampleNamesInOrder())
   {
   Genotype g =vcfRow.getVariantContext().getGenotype(sample);
   word.set(sample+" "+ctx.getType()+" "+g.getType().name());
   context.write(word, one);
   }

  }
 }
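
For example, for a hypothetical biallelic SNP carrying an rs ID, in a VCF with two samples S1 (heterozygous) and S2 (homozygous reference), this mapper emits the following (key, 1) pairs:

ctx_biallelic 1
ctx_snp 1
ctx_id 1
ctx_total 1
S1 SNP HET 1
S2 SNP HOM_REF 1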

The Reducer computes the sum of each category. Since this sum is associative and commutative, the very same class is also used as the combiner (see `setCombinerClass` in the main program below):
public static class IntSumReducer
   extends Reducer<Text,IntWritable,Text,IntWritable> {
 private IntWritable result = new IntWritable();

 public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {
  sum += val.get();
    }
   result.set(sum);
   context.write(key, result);
 }
}

and here is the main program:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "snp count");
    job.setJarByClass(Test.class);
    job.setMapperClass(VariantMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    Path inputPath=new Path(args[0]);
    job.setInputFormatClass(VcfInputFormat.class);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

Download, compile, run (`make -Bn` only prints the commands of the Makefile without executing them):
lindenb@hardyweinberg:~/src/hadoop-sandbox$ make -Bn
rm -rf hadoop-2.7.0
curl -L -o hadoop-2.7.0.tar.gz "http://apache.spinellicreations.com/hadoop/common/hadoop-2.7.0/hadoop-2.7.0.tar.gz"
tar xvfz hadoop-2.7.0.tar.gz
rm hadoop-2.7.0.tar.gz
touch -c hadoop-2.7.0/bin/hadoop
rm -rf htsjdk-1.130
curl -L -o 1.130.tar.gz "https://github.com/samtools/htsjdk/archive/1.130.tar.gz"
tar xvfz 1.130.tar.gz
rm 1.130.tar.gz
(cd htsjdk-1.130 && ant )
mkdir -p tmp dist
javac -d tmp -cp hadoop-2.7.0/share/hadoop/common/hadoop-common-2.7.0.jar:hadoop-2.7.0/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/hadoop-annotations-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/log4j-1.2.17.jar:htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar -sourcepath src/main/java src/main/java/com/github/lindenb/hadoop/Test.java 
jar cvf dist/test01.jar -C tmp .
rm -rf tmp
mkdir -p input
curl -o input/CEU.exon.2010_09.genotypes.vcf.gz "ftp://ftp-trace.ncbi.nih.gov/1000genomes/ftp/pilot_data/paper_data_sets/a_map_of_human_variation/exon/snps/CEU.exon.2010_09.genotypes.vcf.gz"
gunzip -f input/CEU.exon.2010_09.genotypes.vcf.gz
rm -rf output
HADOOP_CLASSPATH=htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar hadoop-2.7.0/bin/hadoop jar dist/test01.jar com.github.lindenb.hadoop.Test \
   input/CEU.exon.2010_09.genotypes.vcf output
cat output/*

Here is the log of the hadoop job, followed by the output of the last command (`cat output/*`):

15/05/05 17:18:34 INFO input.FileInputFormat: Total input paths to process : 1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: number of splits:1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/05/05 17:18:34 INFO mapreduce.Job: Running job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Waiting for map tasks
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Starting task: attempt_local1186897577_0001_m_000000_0
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
15/05/05 17:18:34 INFO mapred.MapTask: Processing split: file:/home/lindenb/src/hadoop-sandbox/input/CEU.exon.2010_09.genotypes.vcf:0+2530564
15/05/05 17:18:34 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/05/05 17:18:34 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/05/05 17:18:34 INFO mapred.MapTask: soft limit at 83886080
15/05/05 17:18:34 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/05/05 17:18:34 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/05/05 17:18:34 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/05/05 17:18:35 INFO mapreduce.Job: Job job_local1186897577_0001 running in uber mode : false
15/05/05 17:18:35 INFO mapreduce.Job:  map 0% reduce 0%
15/05/05 17:18:36 INFO mapred.LocalJobRunner: 
15/05/05 17:18:36 INFO mapred.MapTask: Starting flush of map output
15/05/05 17:18:36 INFO mapred.MapTask: Spilling map output
15/05/05 17:18:36 INFO mapred.MapTask: bufstart = 0; bufend = 7563699; bufvoid = 104857600
15/05/05 17:18:36 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 24902536(99610144); length = 1311861/6553600
15/05/05 17:18:38 INFO mapred.MapTask: Finished spill 0
15/05/05 17:18:38 INFO mapred.Task: Task:attempt_local1186897577_0001_m_000000_0 is done. And is in the process of committing
(...)
NA12843 SNP HOM_REF 2515
NA12843 SNP HOM_VAR 242
NA12843 SNP NO_CALL 293
NA12872 SNP HET 394
NA12872 SNP HOM_REF 2282
NA12872 SNP HOM_VAR 188
NA12872 SNP NO_CALL 625
NA12873 SNP HET 336
NA12873 SNP HOM_REF 2253
NA12873 SNP HOM_VAR 184
NA12873 SNP NO_CALL 716
NA12874 SNP HET 357
NA12874 SNP HOM_REF 2395
NA12874 SNP HOM_VAR 229
NA12874 SNP NO_CALL 508
NA12878 SNP HET 557
NA12878 SNP HOM_REF 2631
NA12878 SNP HOM_VAR 285
NA12878 SNP NO_CALL 16
NA12889 SNP HET 287
NA12889 SNP HOM_REF 2110
NA12889 SNP HOM_VAR 112
NA12889 SNP NO_CALL 980
NA12890 SNP HET 596
NA12890 SNP HOM_REF 2587
NA12890 SNP HOM_VAR 251
NA12890 SNP NO_CALL 55
NA12891 SNP HET 609
NA12891 SNP HOM_REF 2591
NA12891 SNP HOM_VAR 251
NA12891 SNP NO_CALL 38
NA12892 SNP HET 585
NA12892 SNP HOM_REF 2609
NA12892 SNP HOM_VAR 236
NA12892 SNP NO_CALL 59
ctx_biallelic 3489
ctx_id 3489
ctx_snp 3489
ctx_total 3489


that's it,
Pierre
