`
student_lp
  • 浏览: 428894 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论
阅读更多

1.   整体描述

Pig包括两部分:①用于描述数据流的语言,称为Pig Latin;②用于运行Pig Latin程序的执行环境,当前有两个环境:单JVM中的本地执行环境和hadoop集群上的分布式执行环境。

一个Pig Latin程序有一系列的操作和变换组成。每个操作或变换对输入进行数据处理,然后产生输出结果。这些操作整体上描述了一个数据流。Pig执行环境把数据流翻译为可执行的内部表示,并运行它。在Pig内部,这些变换操作转换成一些列mapReduce作业,作为程序员,多数情况下并不需要知道这些转换是如何进行的。

1.1. Pig产生的原因

Pig是一种探索大数据规模数据集的脚本语言。MapReduce的一个缺点是开发周期太长。编写mapper和reducer,对代码进行编译和打包、提交作业,获取结果,这整个过程非常耗时,即便使用streaming能在这个过程中去除掉代码的编译和打包步骤,但这个过程仍然很耗时。

Pig的诱人之处在于它能够用控制台上的五六行Pig Latin代码轻松处理TB级的数据。注意:雅虎90%的MapReducejob是Pig生成的。Twitter80%以上是Pig生成的。

1.2.  Pig的优势

Pig提供了多个命令用于检查和处理程序中的数据结构,因此,他能够很好的支持程序员写查询。

Pig支持在输入数据的一个有代表性的子集上试运行,这样一来,用户可以在处理整个数据集前检查程序中是否有错误。

Pig被设计为可扩展的。处理路径中的每个部分,载入、存储、过滤、分组、连接,都是可以定制的。这些操作都可以使用用户定义函数进行修改。且UDF比用MapReduce程序开发的库更易于重用。

1.3. Pig的劣势

Pig并不适合所有的数据处理任务,和MapReduce一样,他是为数据批处理而设计的。如果想执行的查询只涉及到一个大型数据集中的一小部分数据,Pig的实现不会很好,因为它要扫描整个数据集或者其中的很大一部分。

在有些情况下,Pig的表现不如MapReduce程序。例如执行效率和某些灵活度上。

1.4.  Pig与数据库的比较

Pig Latin和sql很相似。Group by和describe这样的操作更加强了这种感觉。但是,两种语言之间,以及Pig和RDBMS之间,有几个方面是不同的。

他们之间最显著的不同是:Pig Latin是一种数据流编程语言,而sql是一种描述型编程语言。换句话说,一个Pig Latin程序是相对于输入的一步步操作。其中每一步都是对数据的一个简单的变换。相反,SQL语句是一个约束的集合,这些约束结合在一起,定义了输出。在很多方面看,用Pig Latin编程更像在RDBMS中“查询规划器”这一层对数据进行操作。查询规划器决定了如何将描述型语句转化为一些列执行的步骤。

RDBMS把数据存储在严格定义了模式的表内。Pig对它所处理的数据要求则宽松的多:可以在运行时定义格式,而且这是可选的。本质上,Pig可以在任何来源的元组上进行操作。它使用UDF从原始格式中读取元组。最常用的输入格式是用制表符隔离的字段组成的文本文件。Pig为这种输入提供了内置加载函数。和传统的数据库不同,Pig并不提供专门的数据导入过程将数据加载到RDBMS。在第一步处理中,数据是从文件系统中加载的。

Pig对复杂、嵌套数据结构的支持也使其不同于能处理平面数据类型的SQL。Pig的语言能够和UDF以及流操作紧密集成。他的这一能力及嵌套数据结构,使Pig Latin比大多数SQL的变种具有更强的定制能力。

几个支持在线和低延时查询的特性是RDBMS有但Pig没有的,例如事务和索引。Pig并不支持随机读写和几十毫秒的查询。他也不支持对一小部分数据的随机写。和MapReduce一样,所有的写都是批量的、流式的写操作。

Hive介于Pig和传统的RDBMS之间。和Pig一样,hive也被设计为用HDFS作为存储。但是他们之间有着显著的区别。Hive的查询语言HiveQL,是基于SQL的。任何熟悉SQL的人都可以轻松使用HiveQL写查询。和RDBMS相同,Hive要求所有数据必须存储在表中,表必须有模式,而模式由Hive进行管理。但是,Hive允许为预先存在于HDFS的数据关联一个模式。所以,数据的加载步骤是可选的。和Pig一样,Hive也不支持低延时查询。

2.  Pig Latin

2.1.  Pig Latin结构

一个Pig Latin程序由一组语句构成,一个语句可以理解为一个操作,或一个命令。并且为保证语句的正确性,一般以分号作为结束符。例如:

grouped_records = GROUP records BY year;

Pig latin有一个关键词列表。其中的单词在Pig Latin中有特殊含义,不能用于标识符。这些单词包括操作(load,illustarte)、命令(cat,ls)、表达式(matches,FLATTEN)以及函数(DIFF,MAX)等。详见后面的介绍。

Pig Latin的大小写敏感性采用混合的规则。操作和命令是大小写无关的,而别名和函数名是大小写敏感的。

2.2.  Pig Latin注释

Pig Latin有两种注释方法。

①双减号表示单行注释。Pig Latin解析器会忽略从第一个减号开始到行尾的所有内容,例如:

        --my program

         DUMP A;

      ②C语言风格的注释更灵活。这是因为它使用/*和*/符号表示注释开始和结束。这样,注释即可以跨多行,也可以内嵌在某一行内,例如:

 /*
  *description of my program spaning
  *multiple lines.
  */
 A = LOAD ‘input/pig/join/A’;
 Dump A;

2.3.  Pig Latin语句

在Pig Latin程序执行时,每个命令按次序进行解析。如果遇到语法错误或者其它错误,例如无定义的别名,解析器会终止运行,并显示错误消息。解析器会给每个关系操作建立一个逻辑计划。逻辑计划构成了Pig Latin程序的核心。解析器把为一个语句创建的逻辑计划加到到目前为止已经解析完的程序的逻辑计划上,然后继续处理下一条语句。

特别需要注意的是,在整个程序逻辑计划没有构造完成前,Pig并不处理数据。

多查询执行:在交互模式下,STORE和dump一样,总是会触发语句的执行。但是在批处理模式下,他不会触发执行,这是为了性能考虑而进行的设计。在批处理模式下,Pig会解析整个脚本。例如:  

         针对一个关系可以用EXPLAIN命令查看Pig创建的逻辑和物理计划。EXPLAIN也会显示MapReduce计划,即显示物理操作是如何组成MapReduce作业的。这个办法,可以查看Pig会为你的查询组成多少个MapReduce作业。

2.4.  Pig Latin关系操作

2.5.  Pig Latin诊断操作

2.6. Pig Latin自定义函数辅助操作

为了在Pig脚本中使用用户自定义函数,Pig Latin提供了register和define语句。

2.7.  Pig Latin提供的hadoop交互指令

文件系统相关的命令可以对任何hadoop文件系统的文件或目录进行操作。这些命令非常像hadoop fs命令。并且可以使用Pig的fs命令访问所有的hadoop文件系统。

2.8.  Pig Latin表达式

2.9. Pig Latin类型

①  Pig原子类型:数值、文本和二进制类型。

Pig有四种数值类型:int、long、float、double。他们和java中对应的数值类型相同。

Pig有bytearray类型,它类似于表示二进制大对象的java的byte数组。

Pig有Chararray类型,它类似与用utf-16格式表示文本类型的java.lang.String。

Pig有chararray类型,可以用来加载或存储UTF-8格式的数据。

Pig没有一种类型对应于java的Boolean、byte、short、char。Java的这些数据类型都能方便的使用Pig的int类型或者chararray类型表示。

② Pig复杂类型:元组(tuple)、包(bag)、映射(map)。

复杂类型通常从文件加载数据或者使用关系操作进行构建。

映射类型只能从文件,因为Pig的关系操作不能创建映射。如果需要,还可以用udf生成映射。

虽然关系和包在概念上是相同的,但事实上Pig对他们的处理稍有不同。关系是顶层构造结构,而包必须在某个关系中。

3.   模式

      Pig中的一个关系可以有一个关联的模式。模式为关系的字段指定名称和类型。例如load语句和as子句就是用来关联模式与关系的,如下:
  

在上面的例子中,年份已经声明为整数类型的,而不是chararray类型。如果对年份这一字段进行算术操作,那么用整数类型更为合适;相反,如果执行把它作为一个标识符,那么chararray类型就更为合适。

      我们也可以完全忽略类型声明,例如:
  

在这个例子中,我们在模式中只定义了字段名称:year、temperature、quality。默认的数据类型为bytearray。他表示一个二进制串。

      不必为每一个字段都给出类型。可以让某些字段的类型为默认的字节数组,就像如下模式声明所示例子中的year字段:
  

但是,如果要用这种方式确定模式,必须在模式中定义每一个字段。同样,不能只确定字段的类型而不给出其名称。另一方面,模式本身是可选的。可以省略AS子句。如下所示:   

只能使用位置符号引用没有对应模式的关系中的字段:$0表示关系中第一个字段,$1表示第二个,以此类推。他们的类型都是默认的bytearray。  

虽然不为字段分配类型很省事,但如果指定字段的类型,我们能使Pig Latin程序更清晰,也使程序更高效。因此,一般情况下,建议指明字段的数据类型。

3.1.  验证与空值

在Pig中,如果一个值无法被强制转换为模式中声明的类型,Pig会用空值null代替。在输出到屏幕(或使用store存储)时,空值null被显示(或存储)为一个空位。

Pig会为非法字段产生一个警告,但是他不会终止处理。大数据集普遍都有被损坏的值、无效值或意料之外的值,因而逐步修正每一条无法解析的记录一般都不太现实。作为一种替代方案,我们可以一次性的把所有的记录都找出来,然后再去处理他们。我们可以修正我们的程序,把这些记录过滤掉。

3.2.  模式合并

在Pig中,不会为数据流中每一个产生的关系声明模式。在大多数情况下,Pig能够根据关系操作之输入关系的模式来确定输出结果的模式。

那么模式是如何传播到新关系的呢?有些关系操作并不改变模式。因此,limit操作产生的模式就和他所处理的关系的模式相同。对于其他操作,情况可能更复杂一些。例如,UNION操作将两个或多个关系合并成一个,并试图同时合并传入关系的模式。如果这些模式由于数据类型或者字段个数不相同而不兼容,那么UNION产生的模式是不确定的。

针对数据流中的任何关系,可以使用describe操作来获取他们的模式。如果要重新定义一个关系的模式,可以使用带AS子句的foreach…generate操作来定义输入关系的一部分或全部字段的模式。

4.   函数

Pig中的函数有四种类型:

  • 计算函数(Eval function):计算函数获取一个或多个表达式作为输入,并返回一个表达式。例如:MAX.
  • 过滤函数(Filter function):过滤函数式一类特殊的计算函数。这类函数返回的是逻辑布尔值。例如:IsEmpty.
  • 加载函数(Load function):加载函数指明如何从外部存储加载数据到一个关系。
  • 存储函数(Store function):存储函数指明如何把一个关系中的内容存到外部存储。

4.1.  Pig的内置函数


如果表中没有你需要的函数,可以自己写。但是在此之前,你可以看一下piggybank。这是一个pig社区共享的pig函数库。网址为:http://wiki.apache.org/pig/PiggyBank.

4.2.  用户自定义函数

    在自定义函数这一章节中,涉及到的知识比较多,而且所有的书籍中也仅仅是写了几个简单的例子,还根据例子说了一些注意事项,不是很全面。如果涉及到自定义函数的编写。建议查看如下两个网址:http://archive.cloudera.com/cdh4/cdh/4/pig-0.10.0-cdh4.1.0/udf.htmlhttp://pig.apache.org/docs/r0.11.1/udf.html。在这里有详细的讲解和例子。

4.2.1.  UDF实例

package com.cyou.pig.udfs;
 
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
 
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
 
public class DateRangeLoad extends FileInputLoadFunc {
         protected RecordReader reader = null;
        
         private String loadLocation;
         private String currentDate;
         private int offset = 0;
         private int range = 0;
    private byte fieldDel = '\t';
    private ArrayList<Object> mProtoTuple = null;
        
    private final TupleFactory mTupleFactory = TupleFactory.getInstance();
         private final Pattern p= Pattern.compile("%date");
         private final SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
        
         public DateRangeLoad(){
                   this.currentDate = df.format(new Date());
         }
         public DateRangeLoad(String currentDate){
                   this.currentDate = currentDate;
         }
         public DateRangeLoad(String currentDate,String range){
                   this(currentDate);
                   this.range = Integer.parseInt(range);
         }       
         public DateRangeLoad(String currentDate,String offset,String range){
                   this(currentDate,range);
                   this.offset = Integer.parseInt(offset);
         }
         public DateRangeLoad(String currentDate,String offset,String range,String delimiter){
                   this(currentDate,offset,range);
        if (delimiter.length() == 1) {
            this.fieldDel = (byte)delimiter.charAt(0);
        } else if (delimiter.length() >  1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
            case 't':
                this.fieldDel = (byte)'\t';
                break;
            case 'x':
               fieldDel = Integer.valueOf(delimiter.substring(2), 16).byteValue();
               break;
            case 'u':
                this.fieldDel = Integer.valueOf(delimiter.substring(2)).byteValue();
                break;
            default:
                throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("PigStorage delimeter must be a single character");
        }
         }
        
         @Override
         public void setLocation(String location, Job job) throws IOException {
                   Matcher m = this.p.matcher(location);
                   if(m.find()){
                            StringBuffer buf = new StringBuffer();
                            Calendar cal=Calendar.getInstance();
                            cal.set(Integer.parseInt(this.currentDate.substring(0, 4)), Integer.parseInt(currentDate.substring(4, 6))-1,Integer.parseInt(currentDate.substring(6, 8)));
                            cal.add(Calendar.DATE, this.offset);
                            buf.append("{").append(this.df.format(cal.getTime()));
                            for(int i=1;i<this.range;i++){
                                     cal.add(Calendar.DATE, -1);
                                     buf.append(",").append(this.df.format(cal.getTime()));
                            }
                            buf.append("}");
                            location = m.replaceAll(buf.toString());
                   }
                   System.out.println("--------------------------------------------------------------------------");
                   System.out.println(location);
                   System.out.println("--------------------------------------------------------------------------");
                   this.loadLocation = location;
                   FileInputFormat.setInputPaths(job,location);
         }
 
         @Override
         public InputFormat getInputFormat() throws IOException {
                   if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
            return new Bzip2TextInputFormat();
        } else {
            return new PigTextInputFormat();
        }
         }
 
         @Override
         public void prepareToRead(RecordReader reader, PigSplit split)throws IOException {
                   this.reader = reader;
         }
 
         @Override
         public Tuple getNext() throws IOException {
                   try {
                            boolean notDone = reader.nextKeyValue();
                            if (!notDone) {
                                     return null;
                            }
                            Text value = (Text) reader.getCurrentValue();
                            byte[] buf = value.getBytes();
                            int len = value.getLength();
                            int start = 0;
 
                            for (int i = 0; i < len; i++) {
                                     if (buf[i] == fieldDel) {
                                               readField(buf, start, i);
                                               start = i + 1;
                                     }
                            }
                            // pick up the last field
                            readField(buf, start, len);
 
                            Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
                            mProtoTuple = null;
                            return t;
                   } catch (InterruptedException e) {
                            int errCode = 6018;
                            String errMsg = "Error while reading input";
                            throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e);
                   }
         }
         private void readField(byte[] buf, int start, int end) {
        if (mProtoTuple == null) {
            mProtoTuple = new ArrayList<Object>();
        }
 
        if (start == end) {
            // NULL value
            mProtoTuple.add(null);
        } else {
            mProtoTuple.add(new DataByteArray(buf, start, end));
        }
    }
}

5. Pig项目实例

stop_event = load '/user/hive/warehouse/yidong.db/stopevent/dateline=$date/*';
stop_orig = foreach stop_event generate $1 as appid,$2 as appver,($3=='iOS'? 1: ($3=='android'? 2 : 3)) as platform,$4 as channel,$7 as guid,$31 as duration,SUBSTRING($0,0,10) as time;
stop_group = group stop_orig by (appid,appver,platform,channel,guid,time);
stop_duration = foreach stop_group generate group.appid,group.appver,group.platform,group.channel,group.guid,0 as start_num:int,SUM(stop_orig.duration) as duration:int,group.time as ctime,group.time as ltime;
 
current_event = load '/user/hive/warehouse/yidong.db/startevent/dateline=$date/*';
current_orig = foreach current_event generate $1 as appid,$2 as appver,($3=='iOS'? 1: ($3=='android'? 2 : 3)) as platform,$4 as channel,$7 as guid,SUBSTRING($0,0,10) as time,1 as num;
num_group = group current_orig by (appid,appver,platform,channel,guid,time);
num_count = foreach num_group generate group.appid,group.appver,group.platform,group.channel,group.guid,COUNT(current_orig.num) as start_num:int,0 as duration:int,group.time as ctime,group.time as ltime;
 
time_event = distinct current_orig;
time_orig = foreach time_event generate appid,appver,platform,channel,guid,0 as start_num:int,0 as duration:int,time as ctime,time as ltime;
 
orig_orig = load '/user/hive/warehouse/yidong.db/stat_user' as (appid,appver,channel,platform:int,guid,start_num:int,duration:int,ctime,ltime);
 
current_union = union ONSCHEMA time_orig,num_count,stop_duration,orig_orig;
current_group = group current_union by (appid,appver,channel,platform,guid);
current_table = foreach current_group generate FLATTEN(group),SUM(current_union.start_num) as start_num:int,SUM(current_union.duration) as duration:int,MIN(current_union.ctime) as ctime,MAX(current_union.ltime) as ltime;
 
store current_table into '/user/hive/warehouse/yidong.db/stat_user_$date';
 
rmf /user/hive/warehouse/yidong.db/stat_user;
 
mv /user/hive/warehouse/yidong.db/stat_user_$date /user/hive/warehouse/yidong.db/stat_user;

 

  • 大小: 38.2 KB
  • 大小: 58.5 KB
  • 大小: 11.1 KB
  • 大小: 8.9 KB
  • 大小: 76.1 KB
  • 大小: 24.2 KB
  • 大小: 84.2 KB
  • 大小: 38.1 KB
  • 大小: 11.4 KB
  • 大小: 15.7 KB
  • 大小: 11.5 KB
  • 大小: 7.3 KB
  • 大小: 16.8 KB
  • 大小: 30.7 KB
  • 大小: 73.4 KB
分享到:
评论

相关推荐

    Hadoop实战-第二版-陆嘉恒 (2012版)

    Pig详解15. ZooKeeper详解16. Avro详解17. Chukwa详解18. Hadoop的常用插件与开发19. Hadoop在yahoo的应用附录A: 云计算在线监测平台附录B: Hadoop安装、运行、使用说明附录C:使用DistributedCache的MapReduce...

    Hadoop实战-第2版-陆嘉恒.pdf

    Pig详解15. ZooKeeper详解16. Avro详解17. Chukwa详解18. Hadoop的常用插件与开发19. Hadoop在yahoo的应用附录A: 云计算在线监测平台附录B: Hadoop安装、运行、使用说明附录C:使用DistributedCache的MapReduce...

    【技术分享】house of pig一个新的堆利用详解 .pdf

    【技术分享】house of pig一个新的堆利用详解 网络安全 安全开发 解决方案 APT web安全

    深入云计算:Hadoop应用开发实战详解

    本书由浅入深,全面、系统地介绍...本书的内容主要包括hdfs、mapreduce、hive、hbase、mahout、pig、zookeeper、avro、chukwa等与hadoop相关的子项目,各个知识点都精心设计了大量经典的小案例,实战型强,可操作性强。

    piggyxp小猪IOCP项目源码

    这份代码是我博客里的文章《完成端口详解 - 手把手教你玩转网络编程系列之三》的配套代码 里面的代码包括VC++2008/VC++2010编写的完成端口服务器端的代码,还包括一个对服务器端进行压力测试的客户端,都是经过我...

    Javascript ES6中对象类型Sets的介绍与使用详解

    介绍 ECMAScript 6(以下简称ES6)是JavaScript语言的下一代标准。因为当前版本的ES6是在2015年发布的,所以又称ECMAScript 2015。 Sets 是ES6(ES2015)中一个新的对象类型,用来创建一系列...animals.add(':pig_face:')

    Spring Boot 2.4 新特性之一键构建Docker镜像的过程详解

    背景 在我们开发过程中为了支持 Docker 容器化,一般使用 Maven 编译打包然后生成镜像,能够大大提供上线效率,同时能够快速动态扩容,快速回滚,着实很方便。...pig 微服务平台所有的容器化都是基于此构建 &lt;p

    RelativeLayoutDemo

    New UI-布局之RelativeLayout(相对布局)详解的demo

    Http通过响应头控制浏览器行为

    Android之Http通信——2.详解Http的消息头与响应头中的demo,简单的例子

Global site tag (gtag.js) - Google Analytics