内容简介:企业中,由于领导们的要求,hive中有数据存储格式很多时候是会变的,比如为了优化将tsv,csv格式改为了parquet或者orcfile。那么这个时候假如是mr作业读取hive的表数据的话,我们又要重新去写mr并且重新部署。这个时候就很蛋疼。hcatalog帮我们解决了这个问题,有了它我们不用关心hive中数据的存储格式。详细信息请仔细阅读本文。本文主要是讲mapreduce使用HCatalog读写hive表。hcatalog使得hive的元数据可以很好的被其它hadoop工具使用,比如pig,mr和h
企业中,由于领导们的要求,hive中有数据存储格式很多时候是会变的,比如为了优化将tsv,csv格式改为了parquet或者orcfile。那么这个时候假如是mr作业读取hive的表数据的话,我们又要重新去写mr并且重新部署。这个时候就很蛋疼。hcatalog帮我们解决了这个问题,有了它我们不用关心hive中数据的存储格式。详细信息请仔细阅读本文。
本文主要是讲mapreduce使用HCatalog读写hive表。
hcatalog使得hive的元数据可以很好的被其它hadoop工具使用,比如pig,mr和hive。
HCatalog的表为用户提供了(HDFS)中数据的关系视图,并确保用户不必担心他们的数据存储在何处或采用何种格式,因此用户无需知道数据是否以RCFile格式存储, 文本文件或sequence 文件。
它还提供通知服务,以便在仓库中有新数据可用时通知工作流工具(如Oozie)。
HCatalog提供HCatInputFormat / HCatOutputFormat,使MapReduce用户能够在Hive的数据仓库中读/写数据。 它允许用户只读取他们需要的表和列的分区。 返回的记录格式是方便的列表格式,用户无需解析它们。
下面我们举个简单的例子。
在mapper类中,我们获取表schema并使用此schema信息来获取所需的列及其值。
下面是map类。
public class onTimeMapper extends Mapper { @Override protected void map(WritableComparable key, HCatRecord value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { // Get table schema HCatSchema schema = HCatBaseInputFormat.getTableSchema(context); Integer year = new Integer(value.getString("year", schema)); Integer month = new Integer(value.getString("month", schema)); Integer DayofMonth = value.getInteger("dayofmonth", schema); context.write(new IntPair(year, month), new IntWritable(DayofMonth)); } }
在reduce类中,会为将要写入hive表中的数据创建一个schema。
public class onTimeReducer extends Reducer { public void reduce (IntPair key, Iterable value, Context context) throws IOException, InterruptedException{ int count = 0; // records counter for particular year-month for (IntWritable s:value) { count++; } // define output record schema List columns = new ArrayList(3); columns.add(new HCatFieldSchema("year", HCatFieldSchema.Type.INT, "")); columns.add(new HCatFieldSchema("month", HCatFieldSchema.Type.INT, "")); columns.add(new HCatFieldSchema("flightCount", HCatFieldSchema.Type.INT,"")); HCatSchema schema = new HCatSchema(columns); HCatRecord record = new DefaultHCatRecord(3); record.setInteger("year", schema, key.getFirstInt()); record.set("month", schema, key.getSecondInt()); record.set("flightCount", schema, count); context.write(null, record); } }
最后,创建driver类,并且表明输入输出schema和表信息。
public class onTimeDriver extends Configured implements Tool{ private static final Log log = LogFactory.getLog( onTimeDriver.class ); public int run( String[] args ) throws Exception{ Configuration conf = new Configuration(); Job job = new Job(conf, "OnTimeCount"); job.setJarByClass(onTimeDriver.class); job.setMapperClass(onTimeMapper.class); job.setReducerClass(onTimeReducer.class); HCatInputFormat.setInput(job, "airline", "ontimeperf"); job.setInputFormatClass(HCatInputFormat.class); job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DefaultHCatRecord.class); job.setOutputFormatClass(HCatOutputFormat.class); HCatOutputFormat.setOutput(job, OutputJobInfo.create("airline", "flight_count", null)); HCatSchema s = HCatOutputFormat.getTableSchema(job); HCatOutputFormat.setSchema(job, s); return (job.waitForCompletion(true)? 0:1); } public static void main(String[] args) throws Exception{ int exitCode = ToolRunner.run(new onTimeDriver(), args); System.exit(exitCode); } }
当然,在跑上面写的代码之前,应该先在hive中创建输出表。
create table airline.flight_count (Year INT , Month INT , flightCount INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;
可能会引起错误的地方是没有设置$HIVE_HOME.
[完]
欢迎点击 阅读原文 或 扫二维码 加入浪尖 知识星球 ,获取更多优质的大数据学习资源和指导。
推荐阅读:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 使用MyCat实现MySQL读写分离
- Mysql中间件应用之使用ProxySQL进行数据库读写分离
- .NET 使用 XPath 来读写 XML 文件(顺带解决 XML 命名空间的问题)
- 想用数据库“读写分离” 请先明白“读写分离”解决什么问题
- Java 读写锁浅析
- Golang文件读写
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Text Processing in Python
David Mertz / Addison-Wesley Professional / 2003-6-12 / USD 54.99
Text Processing in Python describes techniques for manipulation of text using the Python programming language. At the broadest level, text processing is simply taking textual information and doing som......一起来看看 《Text Processing in Python》 这本书的介绍吧!