如何给Apache Pig自定义UDF函数?

栏目: Apache · 发布时间: 7年前

内容简介:如何给Apache Pig自定义UDF函数?

近日由于工作所需,需要使用到Pig来分析线上的搜索日志数据,散仙本打算使用hive来分析的,但由于种种原因,没有用成,而Pig(pig0.12-cdh)散仙一直没有接触过,所以只能临阵磨枪了,花了两天时间,大致看完了pig官网的 文档 ,在看文档期间,也是边实战边学习,这样以来,对pig的学习,会更加容易,当然本篇不是介绍如何快速学好一门框架或语言的文章,正如标题所示,散仙打算介绍下如何在Pig中,使用用户自定义的UDF函数,关于学习经验,散仙会在后面的文章里介绍。

一旦你学会了UDF的使用,就意味着,你可以以更加灵活的方式来使用Pig,使它扩展一些为我们的业务场景定制的特殊功能,而这些功能,在通用的pig里是没有的,举个例子:

你从HDFS上读取的数据格式,如果使用默认的PigStorage()来加载,存储可能只支持有限的数据编码和类型,如果我们定义了一种特殊的编码存储或序列化方式,那么当我们使用默认的Pig来加载的时候,就会发现加载不了,这时候我们的UDF就派上用场了,我们只需要自定义一个LoadFunction和一个StoreFunction就可以解决,这种问题。

本篇散仙根据官方文档的例子,来实战一下,并在hadoop集群上使用Pig测试通过:

我们先来看下定义一个UDF扩展类,需要几个步骤:

序号 步骤 说明
1 在eclipse里新建一个 java 工程,并导入pig的核心包 java项目
2 新建一个包,继承特定的接口或类,重写自定义部分 核心业务
3 编写完成后,使用ant打包成jar 编译时需要pig依赖,但不用把pig的jar包打入UDF中
4 把打包完成后的jar上传到HDFS上 pig运行时候需要加载使用
5 在pig脚本里,注册我们自定义的udf的jar包 注入运行时环境
6 编写我们的核心业务pig脚本运行 测试是否运行成功

项目工程截图如下:

如何给Apache Pig自定义UDF函数?

核心代码如下:

package com.pigudf;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;
/**
 * 自定义UDF类,对字符串转换大写
 * @author qindongliang
 * */
public class MyUDF extends EvalFunc<String> {

	@Override
	public String exec(Tuple input) throws IOException {
		
		 //判断是否为null或空,就跳过
		if(input==null||input.size()==0){
			return null;
		}
		try{
			//获取第一个元素
			String str=(String) input.get(0);
			//转成大写返回
			return str.toUpperCase();
			
		}catch(Exception e){
			throw WrappedIOException.wrap("Caught exception processing input row ",e);
		}
	}
	

}

关于打包的ant脚本,散仙会在文末上传附件,下面看下造的一些测试数据(注意,文件一定要上传到HDFS上,除非你是local模式):

grunt> cat s.txt
zhang san,12
Song,34
long,34
abC,12
grunt>

我们在看下,操作文件和jar包是放在一起的:

grunt> ls
hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3>        1295
hdfs://dnode1:8020/tmp/udf/s.txt<r 3>   36
grunt>

最后,我们看下pig脚本的定义:

--注册自定义的jar包
REGISTER pudf.jar; 
--加载测试文件的数据,逗号作为分隔符
a = load 's.txt' using PigStorage(',');   
--遍历数据,对name列转成大写
b =  foreach a generate com.pigudf.MyUDF((chararray)$0); 
--启动MapReduce的Job进行数据分析
dump b

最后,我们看下结果,只要过程不出现异常和任务失败,就证明我们的udf使用成功:

Counters:
Total records written : 4
Total bytes written : 64
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1419419533357_0147


2014-12-30 18:10:24,394 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2014-12-30 18:10:24,395 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-12-30 18:10:24,396 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2014-12-30 18:10:24,405 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2014-12-30 18:10:24,405 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(ZHANG SAN,12)
(SONG,34)
(LONG,34)
(ABC,12)

结果没问题,我们的UDF加载执行成功,如果我们还想将我们的输出结果直接写入到HDFS上,可以在pig脚本的末尾,去掉dump命令,加入

store e into '/tmp/dongliang/result/'; 将结果存储到HDFS上,当然我们可以自定义存储函数,将结果写入数据库,Lucene,Hbase等关系型或一些NOSQL数据库里。

欢迎大家关注微信公众号:我是工程师(woshigcs),更多与攻城师息息相关的内容,尽在此处。

二维码扫描:

如何给Apache Pig自定义UDF函数?

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Trading and Exchanges

Trading and Exchanges

Larry Harris / Oxford University Press, USA / 2002-10-24 / USD 95.00

This book is about trading, the people who trade securities and contracts, the marketplaces where they trade, and the rules that govern it. Readers will learn about investors, brokers, dealers, arbit......一起来看看 《Trading and Exchanges》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码