。Pig Latin是一种数据流语言,变量的命名规则同java中变量的命名规则,变量名可以复用(不建议这样做,这种情况下相当与新建一个变量,同时删除原来的变量)
A = load 'NYSE_dividends' (exchange, symbol, date, dividends);A = filter A by dividends > 0;A = foreach A generate UPPER(symbol);
。注释:--单行注释;/*……*/多行注释;
。Pig Latin关键词不区分大小写,比如load,foreach,但是变量名和udf区分大小写,COUNT是udf,所以不同于count。
。Load 加载数据
默认加载当前用户的home目录(/users/yourlogin),可以在grunt下输入cd 命令更改当前所在目录。
divs = load '/data/examples/NYSE_dividends'
也可以输入完整的文件名
divs = load ‘hdfs://nn.acme.com/data/examples/NYSE_dividends’
默认使用TAB(\t)作为分割符,也可以使用using定义其它的分割符
divs = load 'NYSE_dividends' using PigStorage(',');
注意:只能用一个字符作为分割符
还可以使用using定义其它的加载函数
divs = load 'NYSE_dividends' using HBaseStorage();
as用于定义模式
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
也可以使用通配符加载一个目录下的所有文件,该目录下的所有子目录的文件也会被加载。通配符由hadoop文件系统决定,下面是hadoop 0.20所支持的通配符
glob | comment |
---|---|
? | Matches any single character. |
* | Matches zero or more characters. |
[abc] | Matches a single character from character set (a,b,c). |
[a-z] | Matches a single character from the character range (a..z), inclusive. The first character must be lexicographically less than or equal to the second character. |
[^abc] | Matches a single character that is not in the character set (a, b, c). The ^ character must occur immediately to the right of the opening bracket. |
[^a-z] | Matches a single character that is not from the character range (a..z) inclusive. The ^ character must occur immediately to the right of the opening bracket. |
\c | Removes (escapes) any special meaning of character c. |
{ab,cd} | Matches a string from the string set {ab, cd} |
。as 定义模式,可用于load ** [as (ColumnName[:type])],foreach…generate ColumnName [as newColumnName]
。store存储数据,默认用using PigStorage 使用tab作为分割符。
store processed into '/data/examples/processed';
也可以输入完整路径比如hdfs://nn.acme.com/data/examples/processed.
可以使用using调用其它存储函数或其它分割符
store processed into 'processed' using HBaseStorage();
store processed into 'processed' using PigStorage(',');
注意:数据存储并不是存储为一个文件,而是由reduce进程数决定的多个part文件。
。foreach…generate[*][begin .. end]
*匹配所有,同样适用与udf;
..匹配begin和end之间的部分,包括begin和end
prices = load 'NYSE_daily' as (exchange, symbol, date, open,high, low, close, volume, adj_close);beginning = foreach prices generate ..open; -- produces exchange, symbol, date, openmiddle = foreach prices generate open..close; -- produces open, high, low, closeend = foreach prices generate volume..; -- produces volume, adj_close
一般情况下foreach…generate…重新生成的模式中的数据名和数据类型保持原来的名字和数据类型,但是如果有表达式则不会,可以在generate 变量后使用as关键词定义别名;
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);sym = foreach divs generate symbol;describe sym;sym: {symbol: chararray}
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);in_cents = foreach divs generate dividends * 100.0 as dividend, dividends * 100.0; describe in_cents;in_cents: {dividend: double,double}
#用于map查找;.用于tuple(元组)投影;
bball = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);avg = foreach bball generate bat#'batting_average';
A = load 'input' as (t:tuple(x:int, y:int));B = foreach A generate t.x, t.$1;
3.获取bag(包)中的数据
A = load 'input' as (b:bag{t:(x:int, y:int)});B = foreach A generate b.x;
A = load 'input' as (b:bag{t:(x:int, y:int)});B = foreach A generate b.(x, y);
下面的语句将执行不了
A = load 'foo' as (x:chararray, y:int, z:int);B = group A by x; -- produces bag A containing all the records for a given value of xC = foreach B generate SUM(A.y + A.z);
因为A.y 和 A.z都是bag,符号+对于bag不适用。
正确的做法如下
A = load 'foo' as (x:chararray, y:int, z:int);A1 = foreach A generate x, y + z as yz;B = group A1 by x;C = foreach B generate SUM(A1.yz);
。foreach中嵌套其它语句
--distinct_symbols.pigdaily = load 'NYSE_daily' as (exchange, symbol); -- not interested in other fieldsgrpd = group daily by exchange;uniqcnt = foreach grpd { sym = daily.symbol; uniq_sym = distinct sym; generate group, COUNT(uniq_sym);};
注意:foreach内部只支持distinct, filter, limit, order关键词;最后一句必须是generate;
--double_distinct.pigdivs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray);grpd = group divs all;uniq = foreach grpd { exchanges = divs.exchange; uniq_exchanges = distinct exchanges; symbols = divs.symbol; uniq_symbols = distinct symbols; generate COUNT(uniq_exchanges), COUNT(uniq_symbols);};
。flatten消除包嵌套关系
--flatten.pigplayers = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);pos = foreach players generate name, flatten(position) as position;bypos = group pos by position;
--flatten_noempty.pigplayers = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);noempty = foreach players generate name, ((position is null or IsEmpty(position)) ? {('unknown')} : position) as position;pos = foreach noempty generate name, flatten(position) as position;bypos = group pos by position;
。filter (注:pig中的逻辑语句同样遵循短路原则)
注意:null == 任何数据
。filter结合matches使用正则表达式(matches前加not表示不匹配)
pig中的正则表达式格式和java中的正则表达所一样,参考
各种转义字符,转义字符使用方式:\\后面跟上转义码
点的转义:. ==> u002E 美元符号的转义:$ ==> u0024 乘方符号的转义:^ ==> u005E 左大括号的转义:{ ==> u007B 左方括号的转义:[ ==> u005B 左圆括号的转义:( ==> u0028 竖线的转义:| ==> u007C 右圆括号的转义:) ==> u0029 星号的转义:* ==> u002A 加号的转义:+ ==> u002B 问号的转义:? ==> u003F 反斜杠的转义: ==> u005C
下面的例子查找包括CM.的记录
-- filter_not_matches.pig divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);notstartswithcm = filter divs by not symbol matches '.*CM\\2u002E1.*';
。group之后的数据是一个map,其中key是group所用的键值,value是group针对的变量;
可用()同时对多个变量作group,group…all用于所有变量(注意:使用all时没有by),group之后的变量分为两个部分,第一部分变量名是group(不能更改),第二部是和原始bag模式一样的bag。
--twokey.pigdaily = load 'NYSE_daily' as (exchange, stock, date, dividends);grpd = group daily by (exchange, stock);avg = foreach grpd generate group, AVG(daily.dividends);describe grpd;grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray, stock: bytearray,date: bytearray,dividends: bytearray}}
--countall.pigdaily = load 'NYSE_daily' as (exchange, stock);grpd = group daily all;cnt = foreach grpd generate COUNT(daily);
。cogroup对多个变量进行group
注意:所有key值为null的数据都被归为同一类,这一点和group相同,和join不同。
A = load 'input1' as (id:int, val:float);B = load 'input2' as (id:int, val2:int);C = cogroup A by id, B by id;describe C;C: {group: int,A: {id: int,val: float},B: {id: int,val2: int}}
。order by
对单列进行排序
--order.pigdaily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);bydate = order daily by date;
对多列进行排序
--order2key.pigdaily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);bydatensymbol = order daily by date, symbol;
desc关键词按降序进行排序,null小于所有词
--orderdesc.pigdaily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);byclose = order daily by close desc, open;dump byclose; -- open still sorted in ascending order
。distinct只能去掉整个元组的重复行,不能去掉某几个特定列的重复行
--distinct.pig -- find a distinct list of ticker symbols for each exchange -- This load will truncate the records, picking up just the first two fields.daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);uniq = distinct daily;
。join/left join / right join
null不匹配任何数据
-- join2key.pigdaily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);jnd = join daily by (symbol, date), divs by (symbol, date);
--leftjoin.pigdaily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);jnd = join daily by (symbol, date) left outer, divs by (symbol, date);
也可以同时多个变量,但只用于inner join
A = load 'input1' as (x, y);B = load 'input2' as (u, v);C = load 'input3' as (e, f);alpha = join A by x, B by u, C by e;
也可以自身和自身join,但数据要加载两次
--selfjoin.pig -- For each stock, find all dividends that increased between two datesdivs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);divs2 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);jnd = join divs1 by symbol, divs2 by symbol;increased = filter jnd by divs1::date < divs2::date and divs1::dividends < divs2::dividends;
下面这样不行
--selfjoin.pig -- For each stock, find all dividends that increased between two datesdivs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);jnd = join divs1 by symbol, divs1 by symbol;increased = filter jnd by divs1::date < divs2::date and divs1::dividends < divs2::dividends;
。union 相当与sql中的union,但与sql不通的是pig中的union可以针对两个不同模式的变量:如果两个变量模式相同,那么union后的变量模式与变量的模式一样;如果一个变量的模式可以由另一各变量的模式强制类型转换,那么union后的变量模式与转换后的变量模式相同;否则,union后的变量没有模式。
A = load 'input1' as (x:int, y:float);B = load 'input2' as (x:int, y:float);C = union A, B;describe C;C: {x: int,y: float}A = load 'input1' as (x:double, y:float);B = load 'input2' as (x:int, y:double);C = union A, B;describe C;C: {x: double,y: double}A = load 'input1' as (x:int, y:float);B = load 'input2' as (x:int, y:chararray);C = union A, B;describe C; Schema for C unknown. 注意:在pig 1.0中 执行不了最后一种union。
注意:union不会剔除重复的行
如果需要对两个具有不通列名的变量union的话,可以使用onschema关键字
A = load 'input1' as (w: chararray, x:int, y:float);B = load 'input2' as (x:int, y:double, z:chararray);C = union onschema A, B;describe C;C: {w: chararray,x: int,y: double,z: chararray}
。cross 相当于离散数学中的叉乘,输入行数分别为m行,n行,输出行数则为m*n行。
--thetajoin.pig --I recommand running this one on a cluster too daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);crossed = cross daily, divs;tjnd = filter crossed by daily::date < divs::date;
。limit
--limit.pigdivs = load 'NYSE_dividends';first10 = limit divs 10;
在pig中除了order by 之外生成的数据都没有固定的顺序。上面的程序每次生成的数据也是不一样的。
。sample 用于生成测试数据,按指定参数选取部分数据。下面的程序选取10%的数据。
--sample.pigdivs = load 'NYSE_dividends';some = sample divs 0.1;
。Parallel 设置pig的reduce进程个数
--parallel.pigdaily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);bysymbl = group daily by symbol parallel 10;
parallel只针对一条语句,如果希望脚本中的所有语句都有10个reduce进程,可以使用 set default_parallel 10命令
--defaultparallel.pig set default_parallel 10;daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);bysymbl = group daily by symbol;average = foreach bysymbl generate group, AVG(daily.close) as avg;sorted = order average by avg desc;
如果同时使用parallel和set default_parallel,那么parallel中的参数将覆盖set default_parallel
。UDF
注册udf
--register.pigregister 'your_path_to_piggybank/piggybank.jar';divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);backwards = foreach divs generate org.apache.pig.piggybank.evaluation.string.Reverse(symbol);
定义udf别名
--define.pigregister 'your_path_to_piggybank/piggybank.jar';define reverse org.apache.pig.piggybank.evaluation.string.Reverse();divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);backwards = foreach divs generate reverse(symbol);
构造函数带参数的udf
--define_constructor_args.pigregister 'acme.jar';define convert com.acme.financial.CurrencyConverter('dollar', 'euro');divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);backwards = foreach divs generate convert(dividends);
。托管java中的静态函数(效率较低)
--invoker.pigdefine hex InvokeForString('java.lang.Integer.toHexString', 'int');divs = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);nonnull = filter divs by volume is not null;inhex = foreach nonnull generate symbol, hex((int)volume);
如果函数的参数是一个数组,那么传递过去的是一个bag
define stdev InvokeForDouble('com.acme.Stats.stdev', 'double[]');A = load 'input' as (id: int, dp:double);B = group A by id;C = foreach B generate group, stdev(A.dp);
。multiquery
--multiquery.pig players = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);pwithba = foreach players generate name, team, position, bat#'batting_average' as batavg;byteam = group pwithba by team;avgbyteam = foreach byteam generate group, AVG(pwithba.batavg);store avgbyteam into 'by_team';flattenpos = foreach pwithba generate name, team, flatten(position) as position, batavg;bypos = group flattenpos by position;avgbypos = foreach bypos generate group, AVG(flattenpos.batavg);store avgbypos into 'by_position';
。split
wlogs = load 'weblogs' as (pageid, url, timestamp);split wlogs into apr03 if timestamp < '20110404', apr02 if timestamp < '20110403' and timestamp > '20110401', apr01 if timestamp < '20110402' and timestamp > '20110331';store apr03 into '20110403';store apr02 into '20110402';store apr01 into '20110401';
。设置pig环境
Parameter | Value Type | Description |
---|---|---|
debug | string | Sets the logging level to DEBUG. Equivalent to passing -debug DEBUG on the command line. |
default_parallel | integer | Sets a default parallel level for all reduce operations in the script. See for details. |
job.name | string | Assigns a name to the Hadoop job. By default the name is the filename of the script being run, or a randomly generated name for interactive sessions. |
job.priority | string Type | If your Hadoop cluster is using the Capacity Scheduler with priorities enabled for queues, this allows you to set the priority of your Pig job. Allowed values are very_low, low, normal, high, very_high. |
。parameter 向pig脚本传递参数
--daily.pig daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);yesterday = filter daily by date == '$DATE';grpd = group yesterday all;minmax = foreach grpd generate MAX(yesterday.high), MIN(yesterday.low);
用-p 传递参数,每个变量前都要加一个-p
pig -p DATE=2009-12-17 daily.pig
参数也可以放在一个文件里,每行一个参数,注释部分以#开头,使用-m 或者 -param_file.调用参数文件
pig脚本
wlogs = load 'clicks/$YEAR$MONTH01' as (url, pageid, timestamp);
参数文件
#Param file YEAR=2009- MONTH=12- DAY=17 DATE=$YEAR$MONTH$DAY
执行
pig -param_file daily.params daily.pig
也可以在pig内定义参数%declare 或者 %default,%default定义默认的参数,在特殊情况下可以被覆盖
注意:%declare和%default不能用于以下位置:
- pig脚本,此脚本非Macro宏,并且脚本被另外一个脚本调用(如果不被调用可以使用)
%default parallel_factor 10;wlogs = load 'clicks' as (url, pageid, timestamp);grp = group wlogs by pageid parallel $parallel_factor;cntd = foreach grp generate group, COUNT(wlogs);
。定义Macro宏,相当于子函数
--macro.pig -- Given daily input and a particular year, analyze how -- stock prices changed on days dividends were paid out. define dividend_analysis (daily, year, daily_symbol, daily_open, daily_close) returns analyzed { divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float); divsthisyear = filter divs by date matches '$year-.*'; dailythisyear = filter $daily by date matches '$year-.*'; jnd = join divsthisyear by symbol, dailythisyear by $daily_symbol; $analyzed = foreach jnd generate dailythisyear::$daily_symbol, $daily_close - $daily_open;};daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');
。引用pig文件,被引用的文件被执行一遍,相当于拼接在一起,被引用的文件中不能存在自定义变量
--main.pig import '../examples/ch6/dividend_analysis.pig';daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');
默认搜索文件夹为当前文件夹,可以使用set pig.import.search.path设置搜索的路径
set pig.import.search.path '/usr/local/pig,/grid/pig';import 'acme/macros.pig';