内容简介:Apache Pig简介与实践
Apache Pig是一个用来分析大数据集的平台,它由两部分组成:一部分是用于表达数据分析程序的高级脚本语言,另一部分是用于评估分析程序的基本工具。目前来看,Pig主要用于离线数据的批量处理应用场景,但是随着Pig的发展处理数据的速度会不断地提升,这可能依赖于Pig底层的执行引擎。比如,Pig通过指定执行模式,可以使用Hadoop的MapReduce计算引擎来实现数据处理,也可以使用基于Tez的计算引擎来实现(Tez是为了绕开MapReduce多阶段Job写磁盘而设计的DAG计算引擎,性能应该比MapReduce要快),看到Pig未来的发展路线图,以后可能会基于Storm或Spark计算平台实现底层计算引擎,那样速度会有极大地提升。我们基于最新的0.15.0版本的Pig(Hadoop使用的是2.2.0版本),通过编写一些例子脚本来实践Pig的语言特性。
Pig安装与执行
Pig安装非常简单,只需要下载Pig包,然后解压缩即可:
wget http://mirror.bit.edu.cn/apache/pig/pig-0.15.0/pig-0.15.0.tar.gz tar xvzf pig-0.15.0.tar.gz sudo ln -s /usr/local/pig-0.15.0 /usr/local/pig cd /usr/local/pig bin/pig -x mapreduce
如果希望直接使用pig命令,可以修改环境变量文件~/.bashrc,增加如下配置:
export PIG_HOME=/usr/local/pig export PATH=$PATH:$PIG_HOME/bin
使变量配置生效:
. ~/.bashrc
Pig支持如下4种执行模式:
- 本地模式
本地模式主要是基于本地文件系统,比较适合调试脚本使用。进入本地模式执行如下命令:
pig -x local
- Tez本地模式
Tez本地模式类似于前面的本地模式,它使用Tez运行时引擎,进入Tez本地模式执行如下命令:
pig -x tez_local
不过该模式还处于试验阶段,不过多累述。
- MapReduce模式
MapReduce模式基于Hadoop,数据存储在HDFS上,它基于运行于YARN之上的MapReduce进行处理。进入MapReduce运行模式执行如下命令:
pig -x mapreduce
一般,我们的数据都是存储在HDFS上的,使用该模式能够充分利用Hadoop集群的计算能力。
- Tez模式
基于Tez模式执行,需要在安装Hadoop集群的时候,修改Hadoop配置文件mapred-site.xml,将属性mapreduce.framework.name的值设置为yarn-tez。进入Tez模式执行如下命令:
pig -x tez
有关Tez相关内容,可以查看Apache Tez官网介绍。
数据类型
Pig的数据类型可以分为2类,分别为简单类型和复杂类型。简单类型包括:
int、long、float、double、chararray、bytearray、boolean、datetime、biginteger、bigdecimal。复杂类型包括:tuple、bag、map。
这里对特别的数据类型,解释说明一下:
chararray相当于字符串String;bytearray相当于字节数组;tuple是一个有序的字段的集合,可以理解为元组,例如(3090018, ‘Android’, 76);bag是tuple的集合,例如{(3090018, ‘Android’, 76), (3090019, ‘iOS’, 172)};map是键值对的集合,例如[name#Jeff Stone, age#28, healthy index#195.58]。
基本操作符
- 算数操作符(Arithmetic Operators)包括:+、-、*、/、%、?:、CASE WHEN THEN ELSE END。
- 布尔操作符(Boolean Operators)包括:AND、OR、IN、NOT。
- 类型转换操作符(Cast Operators):使用圆括号包含类型名,作用于一个字段,例如(int)age、(map[])、(chararray)COUNT($2)、(tuple(chararray,int,map[]))name_age_scores等等。
- 比较操作符(Comparison Operators)包括:==、!=、、=、matches。其中,matches比较操作符使用 Java 的Pattern进行匹配来比较,例如user_name matches ‘[a-n]{3,12}’。
- 类型构造操作符(Type Construction Operators):可以创建复杂类型的数据,tuple使用(),map使用[],bag使用{},例如FOREACH users GENERATE (name, age, address)。
- 解引用操作符(Dereference Operators):解引用主要是针对集合类型tuple、bag、map,从集合中拿到对应字段的值。比如对于tuple,定义类型t=tuple(t1:int,t2:int,t3:int),则我要获取字段t1和t3的值,一种方式可以通过t.t1和t.t3得到,也可以通过t.$0和t.$2获取到。
关系操作符
操作符 | 语法 | 说明 |
ASSERT | ASSERT alias BY expression [, message]; | 断言:对某个数据的条件为true |
COGROUP | alias = COGROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; | 数据分组,与GROUP相同,但是至多支持127个关系 |
CROSS | alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n]; | 笛卡儿积 |
CUBE | alias = CUBE alias BY { CUBE expression | ROLLUP expression }, [ CUBE expression | ROLLUP expression ] [PARALLEL n]; | 计算CUBE,支持ROLLUP操作 |
DEFINE | DEFINE macro_name (param [, param ...]) RETURNS {void | alias [, alias ...]} { pig_latin_fragment };DEFINE alias {function | [`command` [input] [output] [ship] [cache] [stderr] ] }; | 定义宏(类似函数),能够重用脚本代码为UDF或streaming设置别名 |
DISTINCT | alias = DISTINCT alias [PARTITION BY partitioner] [PARALLEL n]; | 去重操作,可以指定并行度(即Reducer个数) |
FILTER | alias = FILTER alias BY expression; | 条件过滤 |
FOREACH |
alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema]….]; alias = FOREACH nested_alias { alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …] GENERATE expression [AS schema] [expression [AS schema]….] }; |
基于列对数据进行转换 |
GROUP | alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; |
数据分组操作: 只支持一个关系 PARALLEL子句可以指定并行度(Reducer个数) |
IMPORT | IMPORT ‘file-with-macro’; | 导入外部Pig脚本 |
JOIN |
alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; |
内连接外连接 |
LIMIT | alias = LIMIT alias n; | 输出结果集的n个记录 |
LOAD | LOAD ‘data’ [USING function] [AS schema]; | 从数据源加载数据 |
MAPREDUCE | alias1 = MAPREDUCE ‘mr.jar’ STORE alias2 INTO ‘inputLocation’ USING storeFunc LOAD ‘outputLocation’ USING loadFunc AS schema [`params, ... `]; | 在Pig中执行MapReduce程序,需要指定使用的MapReduce程序JAR文件 |
ORDER BY | alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n]; | 排序 |
RANK | alias = RANK alias [ BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [DENSE] ]; | 排名操作:可能有排名相同的,即排名序号相同 |
SAMPLE | SAMPLE alias size; | 用于采样,size范围[0, 1] |
SPLIT | SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE]; | 将一个大表,拆分成多个小表 |
STORE | STORE alias INTO ‘directory’ [USING function]; | 存储结果到文件系统:如果为指定USING子句,则使用默认的PigStorage(),更多可以查看“Load/Store函数”。 |
STREAM | alias = STREAM alias [, alias …] THROUGH {`command` | cmd_alias } [AS schema] ; | 将数据发送到外部程序或者脚本 |
UNION | alias = UNION [ONSCHEMA] alias, alias [, alias …]; | 计算并集 |
关系操作符示例
- ASSERT
live_user_ids = LOAD '/test/live_user_ids' USING PigStorage() AS (udid: chararray); ASSERT live_user_ids BY udid != null, 'udid MUST NOT be NULL!';
上面断言表live_user_ids中的udid字段一定存在值。
- GROUP
根据某个或某些字段进行分组,只根据一个字段进行分组,比较简单。如果想要根据两个字段分组,则可以将两个字段构造成一个tuple,然后进行分组。Pig脚本如下所示:
events = LOAD '/data/etl/hive_input/20150613/basis_event_2015061306-r-00002' USING PigStorage('\t'); projected_events = FOREACH events GENERATE $1 AS (event_code: long), $2 AS (udid: chararray), $10 AS (network: chararray), $34 AS (area_code: int); -- $1是表events的第2个字段 uniq_events = DISTINCT projected_events; uniq_events = FILTER uniq_events BY (event_code IS NOT NULL) AND (udid IS NOT NULL) AND (network IS NOT NULL) AND (area_code IS NOT NULL); groupped = GROUP uniq_events BY (udid, event_code) PARALLEL 2; -- 指定使用2个Reducer selected10 = LIMIT groupped 10; DUMP selected10;
从HDFS加载的文件中执行投影操作,生成包含event_code、udid、network、area_code这4个字段的一个表projected_events,接着执行去重、条件过滤操作,计算分组的时候基于event_code、udid两个进行分组。
- FOREACH
FOREACH操作可以针对一个数据集进行迭代处理操作,生成一个新的数据集。它有2种使用方法,一种是执行投影操作,选择部分字段的数据,例如脚本:
provinces = LOAD '/test/provinces' USING PigStorage(',') AS (country_id: int, province_id: int, name: chararray); compositekeyed_provinces = FOREACH provinces GENERATE (CONCAT(CONCAT((chararray)country_id, '_'), (chararray)province_id) AS pid, name);
这里,将原数据集的两个主键字段的值进行拼接合并,作为新表的一个字段。另一种是,支持在FOREACH操作中使用代码段,可以增加更复杂的处理逻辑,摘自官网的例子,例如脚本:
a = LOAD '/test/data' AS (url:chararray, outlink:chararray); DUMP a; (www.ccc.com,www.hjk.com) (www.ddd.com,www.xyz.org) (www.aaa.com,www.cvn.org) (www.www.com,www.kpt.net) (www.www.com,www.xyz.org) (www.ddd.com,www.xyz.org) b = GROUP a BY url; DUMP b; (www.aaa.com,{(www.aaa.com,www.cvn.org)}) (www.ccc.com,{(www.ccc.com,www.hjk.com)}) (www.ddd.com,{(www.ddd.com,www.xyz.org),(www.ddd.com,www.xyz.org)}) (www.www.com,{(www.www.com,www.kpt.net),(www.www.com,www.xyz.org)}) result = FOREACH b { filterda = FILTER a BY outlink == 'www.xyz.org'; -- 过滤掉outlink字段值为'www.xyz.org'的记录 filtered_outlinks = filterda.outlink; filtered_outlinks = DISTINCT filtered_outlinks; -- 对outlink集合进行去重 GENERATE group, COUNT(filtered_outlinks); -- 根据对表a进行分组得到group,计算每个分组中outlink的数量 }; DUMP result; (www.aaa.com,0) (www.ccc.com,0) (www.ddd.com,1) (www.www.com,1)
上面,表b的第一个字段为chararray类型的字段(存放域名字符串),第二个字段是一个bag类型的字段(存在当前域名的出链接,即的集合),最后统计的结果是给定的url的出链接的个数。
- FILTER
根据条件进行过滤,相当于 SQL 中WHERE子句。示例脚本如下所示:
live_info = LOAD '/test/live_info' USING PigStorage() AS (id: long, name: chararray); newly_added_lives = FILTER live_info BY (id % 140000 >= 0) AND (name matches '[a-zA-Z0-9]{8, 32}' OR name == 'test');
上面内容很好理解,不再累述。
- JOIN
JOIN操作支持支持配置并行度,指定Reducer的数量。表连接操作,支持内连接和外连接,内连接脚本示例如下:
live = LOAD '/test/shiyj/pig/pig_live' USING PigStorage('\t') AS (id: long, name: chararray); program = LOAD '/test/shiyj/pig/pig_live_program' USING PigStorage('\t') AS (id: long,name: chararray,live_id: long,live_start: chararray,live_end: chararray); program_info = JOIN live BY id, program BY live_id; DUMP program_info;
根据表live的id字段,表program的live_id字段进行内连接。外连接的操作,官网给出了4个例子,可以分别看一下。左外连接例子如下:
A = LOAD 'a.txt' AS (n:chararray, a:int); B = LOAD 'b.txt' AS (n:chararray, m:chararray); C = JOIN A by $0 LEFT OUTER, B BY $0; -- 表A和B的字段n进行左外连接
全外连接的例子如下所示:
A = LOAD 'a.txt' AS (n:chararray, a:int); B = LOAD 'b.txt' AS (n:chararray, m:chararray); C = JOIN A BY $0 FULL, B BY $0; -- 使用FULL关键字
支持复制的左外连接(Replicated Join),示例如下:
A = LOAD 'large'; B = LOAD 'tiny'; C= JOIN A BY $0 LEFT, B BY $0 USING 'replicated';
只有左外连接支持这种方式,实际上Replicated Join会在MapReduce的Map阶段把做左表进行复制,也就是说做表应该是小表,能够在内存中放得下,然后与右表进行连接操作。还有一种使用Skewed Join,示例如下所示:
A = LOAD 'studenttab' as (name, age, gpa); B = LOAD 'votertab' as (name, age, registration, contribution); C = JOIN A BY name FULL, B BY name USING 'skewed';
只有在进行外连接的两表的数据,明显不对称,称为数据倾斜,一个表很大,另一个表相对小,但是内存中放不下,这种情况可以使用Skewed Join操作。目前,Pig支持基于两表的Skewed Join操作。
- DISTINCT
去重操作使用DISTINCT,比较简单,示例如下所示:
live_user_ids = LOAD '/test/live_user_ids' USING PigStorage() AS (udid: chararray); uniq_user_ids = DISTINCT live_user_ids PARALLEL 8;
DISTINCT操作支持配置并行度,指定Reducer的数量。
- UNION
计算并集操作,使用UNION操作符,示例如下所示:
a = LOAD 'data' AS (a1:int,a2:int,a3:int); b = LOAD 'data' AS (b1:int,b2:int); u = UNION a, b;
计算并集,不要求两表的字段数一定相同。
- LIMIT
LIMIT选择计算结果的一部分,示例如下所示:
top100_user_ids = LIMIT live_user_ids 100;
- STORE
STORE操作用来保存计算结果,示例如下所示:
STORE play_users INTO '/test/shiyj/tmp.play_users' USING PigStorage ('\t');
如果没有指定USING子句,则默认使用PigStorage()函数,另外Pig还支持如下的Store/Load函数:
BinStorage() JsonLoader(['schema']) JsonStorage() PigDump() PigStorage([field_delimiter] , ['options']) TextLoader() HBaseStorage('columns', ['options']) AvroStorage(['schema|record name'], ['options']) TrevniStorage(['schema|record name'], ['options']) AccumuloStorage(['columns'[, 'options']]) OrcStorage(['options'])
具体使用方法,可以参考文档介绍。
- CROSS
比较容易理解,摘自官网上的例子,如下所示:
-- 加载表a数据 a = LOAD 'data1' AS (a1:int,a2:int,a3:int); DUMP a; (1,2,3) (4,2,1) -- 加载表b数据 b = LOAD 'data2' AS (b1:int,b2:int); DUMP b; (2,4) (8,9) (1,3) -- 计算笛卡尔积,并输出结果 result = CROSS a, b; DUMP result; (1,2,3,2,4) (1,2,3,8,9) (1,2,3,1,3) (4,2,1,2,4) (4,2,1,8,9) (4,2,1,1,3)
- CUBE
这个操作符功能比较强大,如下所示:
users = LOAD '/test/shiyj/pig/pig_live_users' AS (create_date,room_id,audio_id,udid); groupped = COGROUP users BY (create_date,room_id,audio_id); groupped_count = FOREACH groupped { uniq = DISTINCT users.udid; GENERATE group, COUNT(uniq); }; STORE groupped_count INTO '/test/shiyj/pig/groupped_count' USING PigStorage('\t'); -- 将分组统计的结果保存到HDFS groupped_count = LOAD '/test/shiyj/pig/groupped_count/part-r-*' AS (k: tuple(chararray, long, long), cnt: int); -- 加载前面保存的结果,进行CUBE计算 groupped_count = FOREACH groupped_count GENERATE k.$0, k.$1, k.$2, cnt; cubed_users = CUBE groupped_count BY CUBE($0, $1, $2); DUMP cubed_users;
我们可以看下官网文档的例子,简单比较容易理解:
(1)CUBE操作
Pig脚本内容,如下所示:
salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS (product:chararray, year:int, region:chararray, state:chararray, city:chararray, sales:long); cubedinp = CUBE salesinp BY CUBE(product,year); result = FOREACH cubedinp GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales; DUMP result;
如果输入数据为(car, 2012, midwest, ohio, columbus, 4000),则上面脚本执行CUBE操作,结果输出内容如下所示:
(car,2012,4000) (car,,4000) (,2012,4000) (,,4000)
上面针对产品(product)和年度(year)两个维度进行查询。
(2)ROLLUP操作
CUBE操作支持ROLLUP(上卷操作),例如脚本内容:
salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS (product:chararray, year:int, region:chararray, state:chararray, city:chararray, sales:long); rolledup = CUBE salesinp BY ROLLUP(region,state,city); result = FOREACH rolledup GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales; DUMP result;
同样如果输入tuple值为(car, 2012, midwest, ohio, columbus, 4000),则ROLLUP操作结果如下所示:
(midwest,ohio,columbus,4000) (midwest,ohio,,4000) (midwest,,,4000) (,,,4000)
上面只是根据ROLLUP表达式指定的维度执行CUBE操作。
(3)合并CUBE和ROLLUP操作
还可以将CUBE操作和ROLLUP操作合并起来,例如执行脚本内容:
salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS (product:chararray, year:int, region:chararray, state:chararray, city:chararray, sales:long); cubed_and_rolled = CUBE salesinp BY CUBE(product,year), ROLLUP(region, state, city); result = FOREACH cubed_and_rolled GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales;
上面的CUBE操作等价于下面两中操作:
cubed_and_rolled = CUBE salesinp BY CUBE(product,year,region, state, city); -- 或 cubed_and_rolled = CUBE salesinp BY ROLLUP(product,year,region, state, city);
执行结果,如下所示:
(car,2012,midwest,ohio,columbus,4000) (car,2012,midwest,ohio,,4000) (car,2012,midwest,,,4000) (car,2012,,,,4000) (car,,midwest,ohio,columbus,4000) (car,,midwest,ohio,,4000) (car,,midwest,,,4000) (car,,,,,4000) (,2012,midwest,ohio,columbus,4000) (,2012,midwest,ohio,,4000) (,2012,midwest,,,4000) (,2012,,,,4000) (,,midwest,ohio,columbus,4000) (,,midwest,ohio,,4000) (,,midwest,,,4000) (,,,,,4000)
- SAMPLE
对数据进行取样操作,脚本如下所示:
-- 加载原始数据集,并计算记录数 users = LOAD '/test/shiyj/pig/pig_live_users' AS (create_date,room_id,audio_id,udid); g_users = GROUP users ALL; total_user_cnt = FOREACH g_users GENERATE COUNT(users); DUMP total_user_cnt; -- 15%取样,计算取样记录数 sampled_users = SAMPLE users 0.15; g_sampled_users = GROUP sampled_users ALL; sampled_user_cnt = FOREACH g_sampled_users GENERATE COUNT(sampled_users); DUMP sampled_user_cnt;
- MAPREDUCE
MAPREDUCE操作允许在Pig脚本内部执行MapReduce程序,示例脚本来自官网,如下所示:
A = LOAD 'WordcountInput.txt'; B = MAPREDUCE 'wordcount.jar' STORE A INTO 'inputDir' LOAD 'outputDir' AS (word:chararray, count: int) `org.myorg.WordCount inputDir outputDir`;
如果直接写原生的MapReduce程序各个能解决实际问题,可以将写好的程序打包,在Pig脚本中指定相关参数即可运行。
- SPLIT
将一个表拆分成多个表,可以按照“水平拆分”的思想进行操作,根据某些条件来生成新表。示例如下所示:
A = LOAD 'data' AS (f1:int,f2:int,f3:int); -- 加载数据到表A DUMP A; (1,2,3) (4,5,6) (7,8,9) SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6); -- A表中满足条件f1<7的记录插入到表X中,满足条件f2==5的记录插入到T表中,满足条件(f3<6 OR f3>6)的记录插入到Z表中 DUMP X; (1,2,3) (4,5,6) DUMP Y; (4,5,6) DUMP Z; (1,2,3) (7,8,9)
求值函数(Eval Functions)
函数 | 语法 | 说明 |
AVG | AVG(expression) | 计算某一个数字类型的列的均值,数字类型支持:int,long,float,double,bigdecimal,biginteger,bytearray |
BagToString | BagToString(vals:bag [, delimiter:chararray]) | 将bag转换成字符串,可以指定分隔符,适合拼接bag中字符串 |
CONCAT | CONCAT(expression, expression, [...expression]) | 字符串拼接 |
COUNT | COUNT(expression) | 计算一个bag中元素的总数,不含NULL值 |
COUNT_STAR | COUNT_STAR(expression) | 计算一个bag中元素的总数,包含NULL值 |
DIFF | DIFF (expression, expression) | 比较一个tuple中的两个字段,这两个字段都是bag类型,结果返回在两个bag中不同的元素,结果仍然是一个bag |
IsEmpty | IsEmpty(expression) | 检查一个map或bag是否为空 |
MAX | MAX(expression) | 计算最大值,支持数组类型:int,long,float,double,bigdecimal,biginteger,bytearray |
MIN | MIN(expression) | 计算最小值,支持数组类型:int,long,float,double,bigdecimal,biginteger,bytearray |
PluckTuple |
DEFINE pluck PluckTuple(expression1) DEFINE pluck PluckTuple(expression1,expression3) pluck(expression2) |
允许定义一个字符串前缀,然后过滤指定的列,满足:一概字符串前缀开始,或者匹配该正则表达式,下面是官网的例子: a = LOAD 'a' as (x, y); b = LOAD 'b' as (x, y); c = JOIN a by x, b by x; -- 表a和b连接,因为表a和b有相同的列名,所以连接后添加前缀“表名::”来区分 DEFINE pluck PluckTuple('a::'); <code>-- 定义前缀"a::",等价于DEFINE pluck PluckTuple('a::', true); d = FOREACH c GENERATE FLATTEN(pluck(*)); -- 包含前缀"a::"的都保留 DESCRIBE c; c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray} DESCRIBE d; d: {plucked::a::x: bytearray,plucked::a::y: bytearray} DEFINE pluckNegative PluckTuple('a::','false'); -- 定义前缀"a::",包含该前缀的过滤掉 d = FOREACH c GENERATE FLATTEN(pluckNegative(*)); DESCRIBE d; -- 结果中包含前缀"a::"的被排除掉 d: {plucked::b::x: bytearray,plucked::b::y: bytearray} |
SIZE | SIZE(expression) | 计算Pig指定数据类型的元素的数量,支持类型:int,long,float,double,,chararray,bytearray、tuple、bag、map |
SUBTRACT | SUBTRACT(expression, expression) | bag操作符,用来对两个bag做差集操作,结果为:包含在第一个bag中但不包含在第二个bag中的元素 |
SUM | SUM(expression) | 求和操作,支持类型:int,long,float,double,bigdecimal,biginteger,将bytearray转换为double类型 |
TOKENIZE | TOKENIZE(expression [, 'field_delimiter']) | 拆分一个字符串,得到一个bag结果集 |
数学函数
数学函数比较简单,不再详细描述,主要包括如下20个:
ABS ACOS ASIN ATAN CBRT CEIL COS COSH EXP FLOOR LOG LOG10 RANDOM ROUND ROUND_TO SIN SINH SQRT TAN TANH
具体使用可以查看官方文档。
字符串函数
字符串函数非常常用,主要包括如下20个:
ENDSWITH EqualsIgnoreCase INDEXOF LAST_INDEX_OF LCFIRST LOWER LTRIM REGEX_EXTRACT REGEX_EXTRACT_ALL REPLACE RTRIM SPRINTF STARTSWITH STRSPLIT STRSPLITTOBAG SUBSTRING TRIM UCFIRST UPPER UniqueID
使用方法可以查看文档。
日期时间函数
日期函数有下面24个,如下所示:
AddDuration CurrentTime DaysBetween GetDay GetHour GetMilliSecond GetMinute GetMonth GetSecond GetWeek GetWeekYear GetYear HoursBetween MilliSecondsBetween MinutesBetween MonthsBetween SecondsBetween SubtractDuration ToDate ToMilliSeconds ToString ToUnixTime WeeksBetween YearsBetween
集合函数
集合函数主要是,将其他类型的数据转换为集合类型tuple、bag、map,如下所示:
TOTUPLE
TOBAG
TOMAP
TOP
前面3个都是生成集合的函数,最后一个用来计算一个集合中的topN个元素,可以指定是按照升序/降序得到的结果,语法为TOP(topN,column,relation)。
Hive UDF函数
在Pig中可以直接调用Hive的UDF,HiveUDAF和HiveUDTF,语法如下表所示:
函数 | 语法 | 说明 |
HiveUDF | HiveUDF(name[, constant parameters]) |
DEFINE sin HiveUDF('sin'); -- 定义HiveUDF,后面可以直接使用函数sin a = LOAD 'student' AS (name:chararray, age:int, gpa:double); b = FOREACH a GENERATE sin(gpa); -- 使用函数sin |
HiveUDAF | HiveUDAF(name[, constant parameters]) |
DEFINE explode HiveUDTF('explode'); a = LOAD 'mydata' AS (a0:{(b0:chararray)}); b = FOREACH a GENERATE FLATTEN(explode(a0)); |
HiveUDTF | HiveUDTF(name[, constant parameters]) |
DEFINE avg HiveUDAF('avg'); a = LOAD 'student' AS (name:chararray, age:int, gpa:double); b = GROUP a BY name; c = FOREACH b GENERATE group, AVG(a.age); |
Pig UDF
Pig也支持用户自定义函数UDF,而且支持使用多种编程语言来实现UDF,目前支持的变成语言包括:Java、JavaScript、Jython、 Ruby 、Groovy、Python。以Java为例,可以通过继承自类org.apache.pig.EvalFunc来实现一个UDF,将实现的UDF泪打包后,Pig安装目录下的CLASSPATH下面,然后可以在Pig脚本中使用。例如,我们实现的UDF类为org.shirdrn.pig.udf.IPAddressConverterUDF,用来根据ip代码(long类型),转换为对应的点分十进制的IP地址字符串,打包后JAR文件名称为iptool.jar,则可以在Pig脚本中这样使用:
REGISTER 'iptool.jar'; a = LOAD '/data/etl/$date_string/$event_file' AS (event_code: long, udid: chararray, ip_code: long, network: chararray); b = FOREACH a GENERATE event_code, udid, org.shirdrn.etl.pig.udf.IPAddressConverterUDF(ip_code); DUMP b;
也可以实现一个自定义的累加器(Accumulator)或者过滤器,或者其他一些功能,可以实现相关的接口:Algebraic,Accumulator,FilterFunc,LoadFunc,StoreFunc,具体可以参考相关文档或资料。
另外,也可以通过在PiggyBank在来查找其它用户分享的UDF,可以在 http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank 中找到。
相关问题总结
- 运行Pig脚本,从外部向脚本传递参数
在实际使用中,我们经常需要从外部传递参数到Pig脚本中,例如,文件路径,或者日期时间,等等,可以直接使用pig命令的-p参数从外部传参,例如,有下面的Pig脚本compute_event_user_count.pig:
a = LOAD '/data/etl/hive_input/20150613/basis_event_2015061305-r-00001' USING PigStorage('\t'); DESCRIBE a; b = FOREACH a GENERATE $1 AS event_code, $2 AS udid, $4 AS install_id; c = GROUP b BY (event_code, udid); d = FOREACH c GENERATE group, COUNT($1); DUMP d;
上面我们是直接将文件路径写死在脚本中,如果需要从外部传递输入文件、输出目录,则可以改写为:
a = LOAD '$input_file' USING PigStorage('\t'); DESCRIBE a; b = FOREACH a GENERATE $1 AS event_code, $2 AS udid, $4 AS install_id; c = GROUP b BY (event_code, udid); d = FOREACH c GENERATE group, COUNT($1); STORE d INTO '$output_dir' USING PigStorage ('\t');
则可以执行Pig脚本,并传递参数:
bin/pig -p input_file=/data/etl/hive_input/20150613/basis_event_2015061305-r-00001 -p output_dir=/test/shiyj/pig/example_output -x mapreduce compute_event_user_count.pig
这样就可以实现从外部向Pig脚本传递参数,可以到HDFS上查看结果输出文件/test/shiyj/pig/example_output/part-r-00000。pig命令更多选项,可以查看pig帮助命令:
bin/pig -h
- 运行Pig脚本出现异常“Retrying connect to server: 0.0.0.0/0.0.0.0:10020”
实际应用中,我们几乎不可能将Pig安装到Hadoop集群的NameNode所在的节点,如果可以安装到NameNode节点上,基本不会报这个错误的。这个错误主要有是由于Pig没有安装在NameNode节点上,而是以外的其它节点上,它在执行计算过程中,需要与MapReduce的JobHistoryServer的IPC服务进行通信,所以在安装Hadoop时需要允许JobHistoryServer的IPC主机和端口被外部其它节点访问,只需要修改etc/hadoop/mapreduce-site.xml配置文件,增加如下配置即可:
<property> <name>mapreduce.jobhistory.address</name> <value>10.10.4.130:10020</value> <description>MapReduce JobHistory Server IPC host:port</description> </property>
如果第一次安装Hadoop没有配置该属性mapreduce.jobhistory.address,那么Hadoop集群的所有节点上会使用默认的配置值为0.0.0.0:10020,所以如果不在NameNode上安装Pig程序,导致Pig所在的节点上安装的Hadoop的配置属性mapreduce.jobhistory.address使用默认值,也就是Pig所在节点0.0.0.0:10020,所以Pig脚本就会执行过程中与本机的10020端口通信,显然会失败的。其实,如果已经在NameNode上启动了JobHistoryServer进程,只需要修改mapreduce.jobhistory.address的属性值,然后同步到所有安装Hadoop文件的节点,包括Pig所在节点即可,不需要重启NameNode节点上的JobHistoryServer进程。如果没有在NameNode上启动JobHistoryServer进程,执行如下命令启动即可:
mr-jobhistory-daemon.sh start historyserver
参考链接
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- WebSocket简介与最佳实践
- Apache Pig简介与实践
- Apache Pig简介与实践
- Spring Boot Admin 简介及实践
- Spring Boot Admin简介及实践
- [译]WebRTC基础实践 - 1. WebRTC简介
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
精通Windows应用开发
[美] Jesse Liberty Philip Japikse Jon Galloway / 苏宝龙 / 人民邮电出版社 / 59.00元
Windows 8.1的出现不仅提供了跨设备的用户体验,也提供了跨设备的开发体验。本书着眼于实际项目中所需要的特性,以及现有C#编程知识的运用,对如何最大限度地利用Metro、WinRT和Windows 8进行了讲解,内容详尽,注重理论学习与实践开发的配合。 Windows 8.1和WinRT的作用及其特殊性 如何使用先进特性创建具有沉浸感和吸引力的Windows 8.1应用 如......一起来看看 《精通Windows应用开发》 这本书的介绍吧!