Parquet
The Parquet project, published by Twitter and Cloudera, is an open-source columnar storage format library for Apache Hadoop. Columnar storage has many advantages: for example, columns a query does not need can be skipped entirely, and because values within a column share a uniform structure, compression algorithms work much better on them. You can find more information on the Parquet website and the Cloudera blog. This post will not go into further detail about what Parquet is or how efficiently it stores data.
Currently, Parquet provides a state-of-the-art columnar storage layer that existing Hadoop frameworks can take advantage of, and it enables a new generation of Hadoop data processing architectures such as Impala, Drill, and parts of the Hive 'Stinger' initiative.
Reading and writing Parquet files from a MapReduce job
Writing a Parquet file
We use the simplest possible MapReduce job: a map-only job with no reducer.
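The snippets below configure a Job object named writeToParquetJob; a minimal way to create it, assuming the Hadoop 2.x API (the job name and the WriteToParquet driver class are placeholders, not from the original post), is:

Configuration conf = new Configuration();
Job writeToParquetJob = Job.getInstance(conf, "write-to-parquet");
writeToParquetJob.setJarByClass(WriteToParquet.class);   // WriteToParquet is the assumed driver class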
1. Set the input path and input format
TextInputFormat.addInputPath(writeToParquetJob, inputPath);
writeToParquetJob.setInputFormatClass(TextInputFormat.class);
writeToParquetJob.setMapperClass(ParquetMapper.class);
2. Define the ParquetMapper
public static class ParquetMapper extends Mapper<LongWritable, Text, Void, Person> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is a tab-separated record: id \t name.
        String[] array = value.toString().split("\t");
        if (array.length != 2) return;   // skip malformed lines
        Person person = new Person();
        person.setId(Long.parseLong(array[0]));
        person.setName(array[1]);
        // The output key type is Void, so write a null key; Parquet stores only the value.
        context.write(null, person);
    }
}
3. Set the output format with the Thrift class and the output path
writeToParquetJob.setNumReduceTasks(0);
writeToParquetJob.setOutputFormatClass(ParquetThriftOutputFormat.class);
ParquetThriftOutputFormat.setCompression(writeToParquetJob, CompressionCodecName.GZIP);
ParquetThriftOutputFormat.setOutputPath(writeToParquetJob, parquetPath);
ParquetThriftOutputFormat.setThriftClass(writeToParquetJob, Person.class);
4. Run the MapReduce job
boolean ret = writeToParquetJob.waitForCompletion(true);
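Putting the four steps together, a complete write-side driver might look like the sketch below. The class name WriteToParquet, the job name, and the command-line argument layout are assumptions; Person is the Thrift-generated class, and the imports assume the pre-Apache parquet.* packages shipped by the Twitter/Cloudera releases this post describes (newer Apache releases use org.apache.parquet.* instead).

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.thrift.ParquetThriftOutputFormat;

public class WriteToParquet {

    // The mapper from step 2, nested so the class is self-contained.
    public static class ParquetMapper extends Mapper<LongWritable, Text, Void, Person> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] array = value.toString().split("\t");
            if (array.length != 2) return;            // skip malformed lines
            Person person = new Person();
            person.setId(Long.parseLong(array[0]));
            person.setName(array[1]);
            context.write(null, person);              // key type is Void
        }
    }

    public static void main(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);           // plain text input: id \t name
        Path parquetPath = new Path(args[1]);         // Parquet output directory

        Job writeToParquetJob = Job.getInstance(new Configuration(), "write-to-parquet");
        writeToParquetJob.setJarByClass(WriteToParquet.class);

        // step 1: input path and input format
        TextInputFormat.addInputPath(writeToParquetJob, inputPath);
        writeToParquetJob.setInputFormatClass(TextInputFormat.class);
        writeToParquetJob.setMapperClass(ParquetMapper.class);

        // step 3: output format, Thrift class, compression codec and output path
        writeToParquetJob.setNumReduceTasks(0);
        writeToParquetJob.setOutputFormatClass(ParquetThriftOutputFormat.class);
        ParquetThriftOutputFormat.setCompression(writeToParquetJob, CompressionCodecName.GZIP);
        ParquetThriftOutputFormat.setOutputPath(writeToParquetJob, parquetPath);
        ParquetThriftOutputFormat.setThriftClass(writeToParquetJob, Person.class);

        // step 4: run the job and propagate its status as the exit code
        System.exit(writeToParquetJob.waitForCompletion(true) ? 0 : 1);
    }
}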
Reading a Parquet file
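As with the write job, the readParquetJob used in the following steps must be created first; a minimal sketch (again, the job name and the ReadFromParquet driver class are placeholders):

Configuration conf = new Configuration();
Job readParquetJob = Job.getInstance(conf, "read-from-parquet");
readParquetJob.setJarByClass(ReadFromParquet.class);   // ReadFromParquet is the assumed driver class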
1. Set the input path and input format
MultipleInputs.addInputPath(readParquetJob, parquetPath, ParquetThriftInputFormat.class, ParquetReader.class);
2. Define the ParquetReader
public static class ParquetReader extends Mapper<Void, Person, LongWritable, Text> {
    @Override
    public void map(Void key, Person value, Context context) throws IOException, InterruptedException {
        // Each Parquet record arrives as a reconstructed Person object; emit it back as id/name text.
        context.write(new LongWritable(value.getId()), new Text(value.getName()));
    }
}
3. Set the output format and output path
readParquetJob.setNumReduceTasks(0);
readParquetJob.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(readParquetJob, outputPath);
4. Run the MapReduce job
boolean ret = readParquetJob.waitForCompletion(true);
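A complete read-side driver can be assembled exactly like the WriteToParquet sketch above: create readParquetJob, add the MultipleInputs call from step 1, the TextOutputFormat settings from step 3, and finish with waitForCompletion.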
In addition, the Thrift struct Person is defined as follows:
struct Person {
    1: optional i64 id;
    2: optional string name;
}
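The Person class used by both mappers is the Java class generated from this IDL by the Thrift compiler, e.g. thrift --gen java person.thrift (person.thrift being whatever file holds the struct); the generated setId/setName setters and getId/getName getters are exactly the methods called in ParquetMapper and ParquetReader above.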