kafka学习笔记

原理:(与rocketMQ对比,理解更深刻)
http://blog.csdn.net/chunlongyu/article/details/54018010

quickStart:
http://kafka.apache.org/quickstart

照着官网做就好了.起步简单.

#Master/Slave概念差异
Kafka:

Master/Slave是个逻辑概念,1台机器,同时具有Master角色和Slave角色。

RocketMQ:

Master/Slave是个物理概念,1台机器,只能是Master或者Slave。在集群初始配置的时候,指定死的。其中Master的broker id = 0,Slave的broker id > 0。

#Broker概念差异
Kafka:

Broker是个物理概念,1个broker就对应1台机器。

RocketMQ:

Broker是个逻辑概念,1个broker = 1个master + 多个slave。所以才有master broker, slave broker这样的概念。
那这里,master和slave是如何配对的呢? 答案是通过broker name。具有同1个broker name的master和slave进行配对。

RocketMQ不依赖ZK,是因为设计上进行了简化,不需要那么多选举.
用个简单的NameServer就搞定了,很轻量,还无状态,可靠性也能得到很好保证。

java并发编程实战笔记-6-7章

6.1节 背景

(为了引出Executor框架)
任务执行方式:

  1. 串行; 单线程,太慢;
  2. 完全并行; 每个任务一个线程,开销太大.
  3. 使用Executor框架. OK

6.2 Executor框架

java.util.concurrent提供的线程池.
可以通过实现Executor接口,自定义执行策略:

  1. 谁来执行;
  2. 执行顺序;(FIFO,LIFO,优先级)
  3. 并发度;
  4. 线程池容量(包括等待的);
  5. 什么时候拒绝任务,拒绝哪一个;
  6. 执行任务前后的操作.

或者直接使用Executors中编写好的线程池/执行策略:

1
2
3
4
5
newFixedThreadPool // 定长
newCachedThreadPool // 无限增长,但会复用原来的
newSingleThreadExecutor // 单线程
newScheduledThreadPool // 定长,但可以定时\延迟执行.
newWorkStealingPool // 用fork-join的,工作觅取的 // 1.8新增.

Executor的生命周期

为了以各种方式关掉Executor,库中写了ExecutorService:

1
2
3
4
5
6
7
8
9
public interface ExecutorService extends Executor {
void shutdown();//平缓得关闭,不再接收新任务,执行剩余的;
List<Runnable> shutdownNow();//取消运行中的和未执行的;
boolean isShutdown();//是否已经下达shutdown命令
boolean isTerminated();//是否完成了shutdown命令
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// ... 还有一些invoke和submit
}

ExecutorService生命周期有3种状态:
运行,关闭,已终止.

ExecutorService中的任务的生命周期:
创建,提交,开始,完成.

上述生命周期都是单向不可逆的.

线程池的局限

  1. 适用于同构任务,异构任务分解粒度不够细,提升不够大;

第七章 取消与关闭

背景:

java中无法简单\安全得停止取消某个线程;
需要使用中断(一种协作机制),从一个线程发出取消请求,中断另一个线程.因此其实需要被中断的线程预先提供安全停止\取消的方法,其中包括清理资源等操作.

取消策略:

  1. HOW: 其他线程如何请求取消;
  2. WHEN: 本线程何时受理取消请求;
  3. WHAT: 取消时具体要干什么.

Thread中的中断方法:

1
2
3
4
5
public class Thread{
public void interrupt(){}// 中断此线程
public boolean isInterrupted(){}//查询中断状态
public static boolean interrupted(){}// 查询中断状态,且清除中断.
}

中断策略

有些线程不支持取消,但可以支持中断.

中断的方法:

  1. 直接中断:

    1
    Thread.currentThread().interrupt();
  2. 限时任务:

    1
    2
    3
    Future<?>task=exec.submit(r);
    task.get(timeout,unit);
    task.cancel();
  3. 处理不可中断的阻塞:

(1)java.io包中的同步Socket I/O:
InputStreamOutputStream的read,write方法都不会响应中断.
中断方法: 关闭底层套接字,让read,write抛出SocketException.

(2)java.io包中的同步I/O:
中断方法:
中断InterruptibleChannel.抛出ClosedByInterruptExeception.
关闭InterruptibleChannel.抛出AsynchronousCloseException.

(3)Selector的异步I/O:(java.nio.channels)
中断方法:
close或wakeup方法. 抛出ClosedSelectorException.

(4)等待内置锁.
使用Lock类中的lockInterruptible方法.

取消策略的设计

1.设置取消信号量.

太山寨. 缺陷:
(1) while循环中不能有阻塞,否则无法取消.
因此需要检查while循环中的每一行代码,确保安全比较麻烦.

2.捕获中断异常.自定义存盘操作.

1
2
// 用cancel接口封装后调用.
public void cancel(){this.interrupt();}

3.任务交给ExecutorService托管.// 本质上调了interrupt.

4.使用毒丸对象.

缺陷: 仅当生产者消费者数量已知情况.无界队列场景下使用.

Executors保存进度

shutdownNow会返回尚未开始的线程列表. 无法获得中途取消的.
可以自己封装一遍ExecutorsService. 重写execute方法,捕获异常判断状态,记录取消的线程.
详见代码:
https://github.com/xiaoyue26/july/blob/master/src/main/java/practice/chapter7/TrackingExecutor.java

java并发编程实战笔记-4-5章

对象的组合

设计线程安全类

三个步骤:

  1. 找出构成对象状态的所有变量;
  2. 找出约束状态变量的不变性条件;//收集同步需求,例如哪些操作必须是原子性的.
  3. 建立对象状态的并发访问管理策略.

包装类

常见容器ArrayList不是线程安全的, 但可以通过Collections.synchronizedList方法转化成一个线程安全的容器.(实现上使用装饰器模式,封装对于底层对象的访问).
此时,只要这个包装类持有数据对象的唯一引用,则可以保证容器的安全.

监视器模式

通过一个私有的锁保护状态:

1
2
3
4
5
6
7
8
9
10
public class PrivateLock {
private final Object myLock=new Object();
@GuardedBy("myLock")
JRSUIConstants.Widget widget;
void someMothode(){
synchronized (myLock){
// do some thing
}
}
}

对象的组合

  1. 一个没有成员的对象A,无状态,因此是线程安全的;
  2. 当A中增加一个成员,如:
    1
    2
    3
    public class A{
    private final AtomicLong aa=new AtomicLong(0);
    }
    因为引用不可变,且AtomicLong是线程安全的,因此A依然是安全的.
    但如果增加多个线程安全成员,当且仅当它们独立的时候是安全的.
    如果不独立,例如某些aa和bb的取值组合是不合法的,则是不安全的.
    (例如上界下界都是AtomicLong.但不独立.)

委托:

此时A是否线程安全取决于aa,换言之,A的安全性委托aa来保证.

综上可知,尽量委托给一个线程安全的类或容器,避免所托非人.XD

基础构建模块

将线程安全委托给一个JDK实现好的同步容器后,如果需要进行复合操作,而且需要这几个操作具有原子性,依然需要自己加锁.这个时候就得知道底层的同步容器原先使用的锁是什么,才好照着写.

//每当遇到难题,都可以考虑生成一个静态镜像,线程局部变量,避开这个难题.实在不行只好升级手段,增加复杂度.

迭代器与ConcurrentModificationException

同步容器的迭代器在迭代之前会获取容器的锁,
如果迭代过程中,检测到容器有修改,则会抛这个异常.
(实现上,是通过计数器实现的,而且没有同步(性能考虑),因此也有可能没有意识到已经修改了,读了失效数据.)

  • 隐藏迭代器
    容器类的toString方法.直接打印容器的时候,会触发对容器内所有内容的迭代.
    其他类似的方法有:
    hashCode,equals,ContainsAll等等.
    以及容器整体作为一个key,存入另一个容器时.

同步容器和并发容器

同步容器:

HashTable,Vector,Collections.synchronizedXxx包装的.

  • 强一致性.
  • 迭代时锁容器,不允许修改.

并发容器:

ConcurrentMap,CopyOnWriteList,BlockingQueue,ConcurrentLinkedQueue.

  • 弱一致性.(size,isEmpty只返回近似值)
  • 迭代时一般只锁局部,容忍修改.
  • 性能高.

基本上应该使用并发容器代替同步容器.

两种队列的使用场景

  1. BlockingQueue:
    生产者消费者,queue.put(xxx),queue.take();
  2. BlockingDeque:(双端队列)
    工作队列\工作觅取(fork-join);完成了自己的工作队列后,从别人队列的尾巴觅取新的工作,均摊工作量.

阻塞方法与中断方法

阻塞状态:

BLOCKED,WAITING,TIMED_WAITING

如果一个方法签名抛出InterruptedException,则说明它是阻塞方法.
因为一个阻塞方法一般会被中断打断.

中断: Interrupt
查询线程是否中断: interrupt方法;
中断线程: 也是 interrupt方法.

当你的方法catch到了一个InterruptedException,两种处理方法:

  1. 传递. 接着往外抛;
  2. 恢复中断.
    当已经是最外层的时候,(在Runnable这一层了)
    就不能往外抛了,再抛线程就挂了.
    这个时候保持中断状态就好:(恢复中断)
    1
    2
    3
    4
    catch(InterruptedException e ){

    Thread.currentThread().interrupt();
    }

同步工具类

可以用于同步线程的工具类,包括:

BlockingQueue 阻塞队列
Semephore 信号量
Barrier 栅栏
Latch 闭锁

闭锁 Latch

作用相当于一道门,所有线程得等这扇门打开才能继续运行.
Latch是一次性的,打开后就不能再关上.

  • 使用的时候就像主线程设定了好了一扇门挡住起跑线,把所有工作线程挨个释放,一头撞到门上卡住了;等到某个时机放下门,瞬间释放所有线程;
  • 设置一扇门挡住主线程,让主线程等待子线程跑完;
  • 结束的时候,每一个线程到达终点就把计数器减一,直到大家都结束,再在终点释放主线程.
1
2
3
4
final CountDownLatch startGate=new CountDownLatch(1);
// 开始的门,只需主线程一人下令countDown,因此计数器为1;
final CountDownLatch endGate=new CountDownLatch(nThreads);
// 结束的门,需要n个子线程countDown,因此计数器为n.
  • 完整代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    public class TestLatch {

    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
    final CountDownLatch startGate = new CountDownLatch(1);// countDown一次就能打开
    final CountDownLatch endGate = new CountDownLatch(nThreads);//countDown nThreads次才能打开(所有线程都countDown过)

    for (int i = 0; i < nThreads; i++) {// 先全部放出去,然后一头撞在startGate上;
    Thread t = new Thread(() -> {
    try {
    startGate.await();// 一头撞在startGate上
    try {
    task.run();
    } finally {
    endGate.countDown();//每个线程countDown一次
    }
    } catch (InterruptedException e) {
    e.printStackTrace();//ignore
    }
    });
    t.start();
    }
    long start = System.currentTimeMillis();
    System.out.println("准备开始");
    startGate.countDown();// 打开开始的门
    endGate.await();// 主线程等待全部结束. (n次countDown结束)
    System.out.println("结束等待");
    long end = System.currentTimeMillis();
    return end - start;

    }

    public static void main(String[] args) throws InterruptedException {
    Runnable task = new Runnable() {
    @Override
    public void run() {
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("running task");
    }
    };
    TestLatch testLatch = new TestLatch();
    long during = testLatch.timeTasks(10, task);
    System.out.println("Runtime: " + during);
    }

    }

FutureTask\Callable\Runnable

FutureTask三种状态: waiting,running,completed.

  • Runnable

    顾名思义,有个run函数,可以run.

  • Callable

    比Run多个返回值V.

  • FutureTask

    声明实现的接口是Runnable和Future.
    但实际上里头有个适配器,把Runnable转成Callable.
    而且它可以接收Callable作为构造函数的参数.
    运行的时候直接ftask.run()即可.
    取结果则是直接ftask.get()即可.

还可以把FutureTask传给Thread的构造函数\加入线程池接受调度.

感觉FutureTask<V>挺好用的.

第五章的原则总结:

  1. 尽量使用final;
  2. 不可变对象一定线程安全;
    尽量使用不可变对象(空间换时间);
  3. 尽量封装,把数据封装到对象中,以便以后构造不可变或者同步策略;
  4. 每一个可变对象,都要考虑用锁保护;
  5. 保护同一个不变性条件的所有变量时,使用同一个锁;
  6. 注意在复合操作上加锁;
  7. 不武断得认为不需要同步;
  8. 明确地指出(注解或注释)每个类是否线程安全.

疑问:

  1. Collections.unmodifiableMap(xxx)
    1
    2
    3
    4
    5
    6
    7
    final xxx;//引用是不可变的,如果是基本类型,就是值.
    final xxx = Collections.unmodifiableMap(xxx);
    /**不但引用是不可变的,第一级寻址对象也是不可更改的.
    调用map.put(xxx,xxx)会抛unSupported异常.
    但显然这不是万能的,用户也可以先用get获取到对象,然后再把对象里的数据给改了.(类似于二级寻址)
    因此里面如果存的是不可变对象,是可以的,否则还是可以改.
    */
  2. ConcurrentMap: 一个线程安全的容器. 保证多线程都能访问一级寻址对象.

java并发编程实践笔记-1-3章

避免多线程错误三个方法:
(1)不共享变量(ThreadLocal);//空间换时间
(2)共享变量设定为不可变(Immutable);//空间换时间
(3)使用同步(Synchronized).//时间换空间

tips:

java8stream在parallel中,不能操作非线程安全的类。

Synchronized

内置锁.
非静态方法synchronized: 对当前对象加锁;
静态方法synchronized: 对Class对象加锁.

同步块:

1
2
3
synchronized(this){
//do something
}

因为锁是加在某个对象上,所以叫做内置锁.线程在进入同步块的时候获得锁,出来的时候释放锁.

释放锁的情况:

  1. 正常退出
  2. 异常退出.
  • 优点:
    同步块中的代码能作为一个原子操作.

  • 缺点:
    只有一个线程能获得锁.性能较低.

可重入

重入: 当某个线程试图获取它自己已经持有的锁.
Synchronized内置锁是可重入的,意味着线程可以多次获取自己持有的内置锁,都可以获得成功.

实现:

锁在对象里,锁中记录持有者的线程和获取计数器.
计数器为0时,锁被认为不被任何线程所持有;
当线程请求一个计数器为0的锁时,锁中记录下锁的持有者,并且将计数器置为1.如果再一次进入同步块,计数器+1,退出一次同步块计数器-1,当计数器为0,锁被释放.

可见性

不加任何同步控制的变量,由于指令重排,多线程等因素,可见性无法保证,读线程可能读到的不是最新的值(读到失效数据).这种级别是最低安全性.

对于大部分基本类型来说,最低安全性是可以容忍的.但对于double,long来说,由于被更新的可能是数据的一半,除非使用volatile关键字等机制,否则读出来的可能不仅是失效数据,而是一个混合了失效数据\最新数据的近乎随机的值,可能带来毁灭性的后果.

volatile

作用: 保持可见性
缺点: 不保证操作的原子性. (需要原子性,应该使用锁或原子类)
使用场景:

  1. boolean值,状态变量;
  2. 只有一个线程写,其他线程读.
    或 写的时候不依赖原先的值(不是自增这种).

编译优化与变量

jvm在server模式下对循环内没有改变的变量进行优化,将其提出循环.
;在client模式下则不会有这种优化.

示例代码:

1
2
3
4
boolean otherDone;//此处应该volatile
while(!otherDone){
//sleep
}

上述代码如果不加volatile,则在server模式下可能会死锁,因为!otherDone的判断可能会被优化到循环外面.导致无限循环.

线程封闭

不在线程间共享变量,因此可以达到线程安全.
例如使用ThreadLocal.

栈封闭

线程封闭的特例.
由于每个线程有自己的栈,而局部变量在栈上,因此只使用局部变量的话,就是栈封闭.(达到线程安全)
由于基本类型无法获取引用,因此基本类型无法逸出,是安全的;
而对象引用可能逸出,因此需要编程人员自行保证不会逸出.
(不把引用乱传递)

发布与逸出

  1. 线程安全的叫发布;
  2. 不安全的叫逸出.

安全发布的两种途径:

  1. 发布的对象是不可变的;
  2. 发布的时候使用同步方法.

途径1

其中,不可变的方法:

private final,且返回clone或Arrays.copy的值.

途径2

把不可变对象安全发布的方法:

  1. 静态初始化函数中初始化一个对象引用;
  2. 把对象引用存放到volatile或AtomicReference;
  3. 把对象引用存放到正确构造对象的final类型域;
  4. 把对象引用存放到一个由锁保护的域中.(如放入同步容器中,包括HashTable,Vector,sychronizedMap,synchronizedMap,concurrentMap,CopyOnWriteArrayList,BlockingQueue,ConcurrentLinkedQueue).
  • 方法1: 静态初始化函数
    1
    public static Holder holder=new Holder(42);

对于不可变对象,安全发布后就可以用了;
对于可变对象,安全发布以后还需要安全得使用,也就是使用的时候也需要同步.

  • 总结:

    不可变对象: 任意发布
    事实不可变: 安全发布
    可变: 安全发布且安全使用.

建议

  1. 尽量使用final,private.
    其中final能保证初始化值过程中的安全性.

疑问

  1. immutable和threadLocal选哪个?

java泛型

上界<? extends T>
不能往里存,只能往外取

1
2
3
4
5
6
7
8
9
10
11
12
13
Plate<? extends Fruit>p= new Plate<Apple>(new Apple());
// 赋值后,p为一个容器,容器元素类型为CAP#1,是一种Fruit的子类.

// 写:
//不能存入任何元素:
p.set(new Fruit());//Error 无法确定CAP#1能否接住Fruit.
p.set(new Apple());//Error

// 读:
// 读出来的东西只能存放在Fruit及其基类中:
Fruit new1=p.get(); // 可以,CAP#1可以存放到Fruit中
Object new2=p.get(); // 可以
Apple new3=p.get();//Error

下界<? super T>
不影响往里存,但往外取只能放在Object对象里

1
2
3
4
5
6
7
8
9
10
Plate<? super Fruit>p=new Plate<Fruit>(new Fruit());
// 赋值后,p为一个容器,容器元素类型为CAP#1,是一种Fruit的基类.
// 写:
p.set(new Fruit());// 可以, CAP#1可以接住Fruit
p.set(new Apple());//基类指针可以存放子类对象

// 读:
Apple new1=p.get();//Error 不确定Apple能否接住CAP#1
Fruit new2=p.get();//Error
Object new3=p.get();

PECS原则
(Producer Extends Consumer Super)原则

  1. 频繁往外读取内容的,适合用上界Extends。
  2. 经常往里插入的,适合用下界Super。

java jdk源码阅读

Collections

sort,shuffle,等各种集合运算.
返回并发集合,同步容器,不可修改集合,等等.

Arrays

fill,等等.

Math. StrictMath

addExact
subExact
在加减溢出时可以抛异常.(Math.abs不抛异常)

1
2
3
4
5
6
7
8
9
10
11
12
// 位运算小技巧
// 1. x和y异号 <=> 异或后<0
(1)x^y <0
// 异号定义: 都不等于0, 异号.

(2)x和y同号 => x^y >=0
一般使用(1).
// 2. &
(1) x和y均小于0 <=> x&y < 0
(2) x和y均大于0 => x&y>=0
(3) x和y异号 => x&y>=0
// (2),(3)难以区分,一般使用(1) .

HTTP method 规范

HTTP method 规范

  • 创建题目
    /api/questions
    POST
  • 更新题目
    /api/questions
    PUT

  • 删除题目
    /api/questions
    DELETE

  • 获取题目
    /api/questions
    GET

HTTP4种方法的用法规范

  • POST(create)
  • PUT(createOrUpdate)
  • GET(get)
  • DELETE(delete)

解释一下PUT的createOrUpdate:
POST 和 PUT 都可以用于create, 不同的地方是PUT是指定id的, 如果该id对应的资源服务端已经存在则是update, 否则就是create. 而POST是不带id的, 每次POST都会创建一个新对象.

PUT的例子如收藏功能
所以PUT是幂等的, POST不是幂等的.
另外DELETE也是幂等的.
GET, DELETE 不能带payload.

层次化url:

GET /api/courses/12/questions/101 的接口表示获取课程Id=12 的一道Id=101的题目

服务端无request session

服务端不记录客户端一次会话过程的上下文信息, 如果业务上需要也是由客户端来记录上下文信息, 并在每一次请求中以参数(或cookie)的方式带上
所以服务端无状态

hive笔记

hive 小技巧

GROUP BY xxx WITH CUBE的时候,要区分是维度是total_count还是null,
可以用GROUPING__ID.
当GROUPING__ID的二进制在某列为0,则为total_count,否则是具体值导致的null. (换句话说就是需要二进制操作,代码会很复杂,需要udf)

hive 1.2.1 bug:

1
2
3
4
5
6
7
8
9
10
11
1. alter table temp.feng_test1  add COLUMNs (col2 string);
增加一列后,无法读取新的一列的数据.

现象: 使用alter语句增加一列,重新insert overwrite数据后,新增列的数据始终为null.

解决方案:
1. 对于外部表,可以通过重建整个表结构解决.
首先保存建表语句,然后drop table,最后重建整个表即可.

2. 对于有分区的内部表,可以通过重建分区解决,首先使用drop partition语句删除分区和数据,然后重跑这个分区的数据(或者事先备份好数据然后add partition). 注意,区别于直接使用insert overwrite partition语句.
3. 对于无分区的内部表,暂无特别好的办法,只能先把表数据备份到另一个目录,(备份表结构和数据)然后drop table,最后重建表即可.注意,区别于直接使用insert overwrite table语句.

hive传参数:

1
hive -d DATE=1 -e 'select ${DATE}'

hive 更新函数:

注意事项: DROP function xxx时,得保证创建函数时用的jar还在,不然可能导致更新函数不成功,每次运行时还是会加载原有的jar包.

1
2
3
4
5
6
7
8
9
10
11
SELECT thriftparser(answerresults, "com.fenbi.ape.hive.serde.thriftobject.conan.AnswerResults")
FROM ape.ori_mysql_ape_conan_task_task_report_da
limit 10
;

drop function thriftparser;

create function thriftparser AS 'com.fenbi.ape.hive.serde.ThriftSerdeUniversalUDF' USING JAR 'hdfs://f04/lib/ape-serde-1.1-SNAPSHOT.jar';

reload function;
reload functions;

从8088获取hive查询的详细语句:
http://f04:8088/proxy/application_1465276959835_417313/mapreduce/conf/job_1465276959835_417313
具体操作方法是先进入job的ApplicationMaster,具体jobid链接,左侧configuration,然后在右侧下方小字的key输入框,输入hive.query.string进行查询即可。

突然想到,对数据量小的表可以先做Distinct,数据量大的表可能得用group by。
前者用一个reduce即可,后者可以用多个reduce,跑得快一点。
http://idea.lanyus.com/
需要看的教程:
[https://issues.apache.org/jira/browse/HIVE-591]
http://git-scm.com/book/zh/v2
http://pcottle.github.io/learnGitBranching/?demo
smartGit
derby数据库是什么? 就是一个轻量级的数据库 好多apache项目都默认自带。

查看所有函数/查看指定函数用法。

1
2
SHOW FUNCTIONS; 
desc function find_in_set;
  • sql code style flow:
    1
    2
    3
    4
    5
    SELECT
    FROM
    WHERE
    GROUP
    HAVING

hbase是直接查询(KV);
hive会转化为map-reduce任务。
pig是定制reduce部分。

hive使用列式存储,使用dt=?指定partition会加快查询速度,像索引一样。列式存储基础知识
列式存储天生适合压缩

  • 列式存储天生适合压缩(相同数据类型)
  • 列式存储数据库对分布式支持友好

投影等操作的对比

  • 自然连接是一种去重复列后的等值连接,它要求两个关系中进行比较的分量必须是相同的属性组,并且在结果中把重复的属性列去掉。而等值连接并不去掉重复的属性列。
  • 选择->限制->where—>指定行;投影->select->指定列。

列式存储查询流程:
列式存储查询流程

  1. 根据where的行限制,去字典表里找到字符串对应数字(只进行一次字符串比较)。
  2. 用数字去列表里匹配,匹配上的位置设为1。
  3. 把不同列的匹配结果进行位运算得到符合所有条件的记录下标。
  4. 使用这个下标,再根据select语句中指定的列,组装出最终的结果集。

$HOME/.hiverc目录下可设定启动脚本。
set hive.metastore.warehouse.dir/=...

查看hive版本:

1
2
3
4
5
6
7
8
9
hive>set hive.hwi.war.file;
$ echo $HIVE_HOME
/home/maintain/hive/apache-hive-0.14.0-bin

#终止:
ctrl+c (不是win+c)
两次。
若未执行完毕则终止的是jvm;
若执行完毕则返回结果在文件系统里,切断的是与文件系统的联系。

设定变量:

1
2
3
set foo=bar2;
set hivevar:foo=bar2; #等效
set foo;# 查看结果

hive中可使用shell命令等:(也是分号结尾)

1
2
3
4
! /bin/echo "hello world";
! pwd
dfs -ls / ;
-- copyright ... #注释以`--`开头

注释以--开头 比较特殊。

各种分隔符p46

一个概念:读时模式
传统数据库是写时模式,在写入的时候进行模式检查、有效性验证。
hive读时模式,就是加载数据的时候才进行验证,尽可能恢复数据的有效性和合法性。对于错误的类型返回null,如数值类型中存放了字符串。

hive不支持行级操作、不支持事务。

可以重复使用Use指令,hive没有嵌套数据库。

递归删除数据库
drop database if exists xxxbase cascade;

支持正则:(show databases里用不了)
show tables 'empl.*';
mysql 里是:show tables like '%empl%' ;

输出表信息:
describe formatted mydb.employees;

set hive.stats.reliable=false是什么含义,设定为暂时不可用?
这个是把状态收集器关掉,效果是在insert overwrite的时候,不去重。

查看是管理表还是外部表:
describe formatted mydb.employees;
输出信息的中部有这一项(table_type).

创建相同表结构但没有数据的表:

1
2
3
create table if not exists mydb.employees3
Like mydb.employees
Location '/path/to/data';

创建带分区的表:

1
2
3
4
5
6
7
8
9
10
11
create table employees(
name STRING,
salary Float,
subordinates Array<String>,
deductions map<string,float>,
address struct<street:string,
city:string,
state:string,
zip:int>
)
partitioned by (country string,state string);

hive会按照分区创建目录。然后不在字段中存储分区信息。

查看已有的分区设置:

1
show partitions tmp_tutor_user_profile;

设置查询时是否必须指定分区:

1
2
set hive.mapred.mode=strict;
set hive.mapred.mode=nonstrict;

载入数据的方式创建分区:p60

hive不储存建表语句?(Navicat for MySQL中的对象信息)。
show create table test1 里有.

增加分区信息:

1
2
3
4
alter table log_messages 
add partition
(year=2015,month=1,day=2)
location 'hdfs://master_server/data/log/2012/01/02';

emailUtils

  • 大部分alter操作不会改变数据,只改变元数据。
    1
    2
    3
    alter table log_messages
    partition(year=2012,month=12,day=2)
    set location 's3n://ourbucket/logs/2011/01/02';
    上述命令不会把数据从旧的路径转移或者删除。

  • 例外:

    1
    2
    3
    alter table log_messages 
    drop if exists partition
    (year=2012,month=12,day=2);

    上述语句会删除管理表中的数据,而不会删除外部表中的数据。

  • 对字段进行重命名、修改位置、类型和注释:

    1
    2
    3
    4
    alter table log_messages
    change column hms hours_minutes_secends INT
    comment 'the hours,minutes and seconds part of the timestamp'
    after severity;
  • 装载数据:

    1
    2
    3
    load data local inpath '${env:HOME}/california-employees'
    overwrite into table employees
    partition (country='US',state='CA');

    如果分区目录不存在,命令会先创建分区目录然后再将目录装载(拷贝)到该目录下。hive会检查文件格式是否与表结构相符,但不会检查数据是否和表模式匹配。(读时模式)

hdfs中装载数据时,不使用local关键字。

p133中? 存疑. into? table?
应该是into.

1
2
load data local inpath 'log2.txt' into weblogs 
partition(20110102);
  • 动态分区

    1
    2
    3
    4
    insert overwrite table employees
    partition (country,state)
    select ..., se.cnty, se.st
    from staged_employees se;

    混合动态和静态分区:

    1
    2
    3
    4
    5
    6
    insert overwrite table employees
    partition (country='US',state)
    select ..., se.cnty, se.st
    from staged_employees se
    where se.cnty='US'
    ;

    静态分区必须在动态分区前。

  • 属性设定:

    属性名称 缺省值 描述
    hive.exec.dynamic false 设置true即开启动态分区功能
    hive.exec.dynamic.mode strict nonstrict表示运行所有分区都是动态的
    hive.exec.max.dynamic.partitions.pernode 100 每个mapper或reducer可以创建的最大动态分区个数
    hive.exec.max.dynamic.partitions +1000 一个语句可以创建的最大动态分区个数
    hive.exec.max.created.files 100000 全局可以创建的最大文件个数
  • 导出数据

  1. 格式相同:
    1
    hadoop fs -cp source_path target_path
  2. 格式不同:
    1
    2
    3
    4
    5
    insert overwrite local directory 'tmp/ca_employees'
    select name, salary, address
    from employees
    where se.state='CA'
    ;
  3. 多输出目录:
    1
    2
    3
    4
    5
    6
    7
    8
    from staged_employees se
    insert overwrite directory '/tmp/or_employees'
    select * from where se.cty='US' and se.st='OR'
    insert overwrite directory '/tmp/ca_employees'
    select * from where se.cty='US' and se.st='CA'
    insert overwrite directory '/tmp/il_employees'
    select * from where se.cty='US' and se.st='IL'
    ;
  • 查看结果文件内容:
    1
    2
    ! ls /tmp/ca_employees;
    ! cat /tmp/ca_employees/000000_0 ;

取整:round (salary)
数学函数p82

1
2
byte int short long 1,2,4,8
boolean char float double 1,2,4,8
  • 部分内置函数
    返回值类型 样式 描述
    type cast(expr as type) 把expr转换为type类型
    string concat(binary s1,binary s2,...) 拼接字符串
    string concat_ws(string separator, string s1,string s2,...) 使用指定分隔符拼接字符串

set hive.exec.mode.local.auto;这个值为什么是false?

由于浮点数的不准确性,与钱有关或者涉及到比较的关键数字都不使用浮点数,使用string.

  • 正则
    • like使用sql通配符(%abc% )
    • Rlike使用java正则表达式(.*(abc).*)

  • 聚合函数、表连接:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    select year(ymd), avg(price_close) from stocks
    where exchange='NASDAQ' AND symbol='AAPL'
    group by year(ymd)
    having avg(price_close)>50.0
    ;
    # 注释:'--'
    -- inner join: 表应从小到大排列以便hive优化;
    select * from a.ymd, b.price_close
    from stocks a join stocks b on a.ymd=b.ymd
    where a.symbol='APPL'
    AND b.symbol='IBM'
    ;

    -- left outer join 左外连接
    # 左表中符合where子句的所有记录将返回
    # 右表中不符合on连接的列将返回NULL
    select * from a.ymd, b.price_close
    from stocks a left outer join dividend b on a.ymd=b.ymd
    where a.symbol='APPL'
    ;
    -- 执行顺序, 先执行join,再使用where进行过滤。

    -- 嵌套select:
    select s.*, d.* from
    (select * from stocks where ...) s
    left outer join
    (select * from dividend where...) d
    on s.ymd=d.ymd
    ;

    --类似的, 右外连接 right outer join
    -- 全连接 full outer join

    -- 左半开连接: left semi-join
    select s.* from stocks s
    left semi join
    dividends d
    on s.ymd=d.ymd and s.symbol=d.symbol
    ;
    #返回左边表的记录,用on中的条件进行过滤。
    #比inner join(直接join)更高效,但不能返回右表中的数据。

    -- 笛卡尔积: 没有on语句的join
  • map-side join
    通过设置set hive.auto.convert.join=true;开启。

  • distribute by
    distribute by控制mapper的输出在reducer中是如何划分的。

    1
    2
    3
    4
    select * from stocks s
    distribute by s.symbol
    sort by s.symbol asc, s.ymd asc
    ;

    此处distribute by指定具有相同股票交易码的记录会分发到同一个reducer中进行处理。cluster by s.symbol相当于distribute by s.symbol sort by s.symbol desc的简写,只支持降序。

order by保证全局有序;
sort by只保证每个reducer的任务局部有序。

  • 抽样查询

    1
    2
    3
    4
    5
    6
    7
    select * from numbers 
    tableSample(bucket 3 out 10 on rand())
    s;
    -- 指定列分桶:
    select * from numbers
    tableSample(bucket 3 out 10 on number)
    s;
  • 数据块抽样(百分比)

    1
    2
    3
    4
    5
    select * from numbersflat TableSample(0.1 percent)
    s;

    -- 最小抽样单元是一个hdfs数据,所以数据大小小于128MB时会返回所有数据
    -- 不一定适用于所有文件格式

哈斯分区是什么? p115
分隔符的可读性为何这么低还是不清楚。(?)

1
2
3
-- 两种顺序调换的意义?
from...insert...select...
from...select...

示例文件夹:
/Users/xiaoyue26/Documents/pipe_warehouse/solar/dwSolarUserStat

  • 索引
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    create Index employees_index
    on Table employees (country)
    as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
    with Deferred Rebuild #意思不要立刻开始创建索引的数据,只是先声明一个索引
    -- 新索引呈空白状态
    IDXproperties ('creator'='me', 'created_at'='some_time')
    in table employees_index_table #可选 也可以把索引建成一个文件
    partitioned by (country, name)
    comment 'Employees indexed by country and name.'
    ;
  1. 排重后值较少的列可使用Bitmap索引(As 'BITMAP');
  2. 重建索引:
    1
    2
    3
    4
    5
    alter index employees_index
    on table employees
    partition (country='US')#指定重建某分区的索引,如果省略会重建所有分区的索引。
    rebuild
    ;
  3. 显示索引:
    1
    2
    show formatted index on employees
    ;
  4. 删除索引
    1
    2
    3
    drop index if exists employees_index
    on table employees
    ;
    如果原表被删除了,对其建立的对应的索引和索引表也会被删除。
    如果原始表的某个分区被删除了,这个分区对应的分区索引也会被删除。

?:书上日期用的是数字’20110102’(int),为何咱们用的是字符串’2015-11-02’?

NameNode将所有系统元数据信息保存在内存中。一个hdfs实例所能管理的文件总数是有上限的,而MapRAmazon S3则没有这个限制。

hive数据库和关系型数据库区别:

关系型 hive
唯一键\主键\自增键
三范式(ACID) 单行中存储一对多关系,一致性较差,但I/O性能高(连续存储)
普通数据结构 集合结构(array,map,struct)
`ACID`:原子性、一致性、隔离性、持久性。
  • 一次扫描多次输出
    1
    2
    3
    4
    5
    6
    from history
    insert overwrite sales
    select * where action='purchased'
    insert overwrite credits
    select * where action='returned'
    ;

ETL,Extraction-Transformation-Loading的缩写,
中文名称为数据提取、转换和加载。

ETL工具有:
OWB(Oracle Warehouse Builder)、ODI(Oracle Data Integrator)、Informatic PowerCenter、Trinity、AICloudETL、DataStage、Repository Explorer、Beeload、Kettle、DataSpider

ETL负责将分散的、异构数据源中的数据如关系数据、平面数据文件等抽取到临时中间层后进行清洗、转换、集成,最后加载到数据仓库或数据集市中,成为联机分析处理、数据挖掘的基础。

  • 引用hiveconf变量:
    1
    2
    3
    4
    5
    6
    $ hive -hiveconf dt=2011-01-01
    insert overwrite table distinct_ip_in_logs
    partition (hit_data=${dt})
    select distinct(ip) as ip from weblogs
    where hit_date='${hiveconf:dt}'
    ;

如何获得表中已有数据的规模信息(不用count)?
先用show create table test1查询到表在hdfs上的location,
然后用dfs -du -h hdfs://location1查询到文件大小。

  • 分桶数据存储
  1. 建表:
    1
    2
    3
    4
    create table weblog (user_id INT,url String, source_ip String)
    Partitioned by (dt string)
    clustered by (user_id) into 96 buckets
    ;
    使用user_id字段作为分桶字段,表数据按字段值哈希值分发到桶中。同一个user_id下的记录通常会存储到同一个桶内。假设用户数比桶数要多,那么桶内就将会包含多个用户的记录。

结合p109map-side join学习。

2.分桶后插入数据:

1
2
3
4
5
6
7
8
set hive.enforce.bucketing=true;

From raw_logs
insert overwrite table weblog
partition (dt='2009-02-25')
select user_id,url,source_ip
where dt='2009-02-25'
;

3.分桶表连接优化开启:

1
2
3
4
5
6
7
8
set hive.optimize.bucketmapJOIN=true;
#待连接的两个表分桶数量呈倍数关系时可优化。
# 且on语句中连接键与分桶键相同。
#sort-merge JOIN:
#分桶数完全相同时:
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;

SerDe 全称是 Serializer and Deserializer
HDFS files –> InputFileFormat –> ‘<’key, value> –> Deserializer –> Row object
Row object –> Serializer –> ‘<’key, value> –> OutputFileFormat –> HDFS files

1
2
3
4
5
6
7
st=>start: HDFS files
e=>end: 结束
op1=>operation: InputFileFormat
op2=>operation: '<'key,value>
op3=>operation: Deserializer
op4=>operation: Row object
st->op1->op2->op3->op4->e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
explain select * from tmp_table1 limit 100;
OK #执行返回码
STAGE DEPENDENCIES:
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-0
Fetch Operator
limit: 100
Processor Tree:
TableScan
alias: tmp_table1
Statistics: Num rows: 0 Data size: 72 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: id (type: string), perf (type: map<string,int>)
outputColumnNames: _col0, _col1
Statistics: Num rows: 0 Data size: 72 Basic stats: PARTIAL Column stats: NONE
Limit
Number of rows: 100
Statistics: Num rows: 0 Data size: 72 Basic stats: PARTIAL Column stats: NONE
ListSink

Time taken: 0.11 seconds, Fetched: 20 row(s)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
hive> explain select count(*) from tmp_table1 limit 100;
OK
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: tmp_table1
Statistics: Num rows: 0 Data size: 72 Basic stats: PARTIAL Column stats: COMPLETE
Select Operator
Statistics: Num rows: 0 Data size: 72 Basic stats: PARTIAL Column stats: COMPLETE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
TopN Hash Memory Usage: 0.3
value expressions: _col0 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: _col0 (type: bigint)
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Limit
Number of rows: 100
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: 100
Processor Tree:
ListSink

Time taken: 0.055 seconds, Fetched: 50 row(s)

explain extended

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
explain extended select count(*) from tmp_table1 limit 100;
OK
ABSTRACT SYNTAX TREE:

TOK_QUERY
TOK_FROM
TOK_TABREF
TOK_TABNAME
tmp_table1
TOK_INSERT
TOK_DESTINATION
TOK_DIR
TOK_TMP_FILE
TOK_SELECT
TOK_SELEXPR
TOK_FUNCTIONSTAR
count
TOK_LIMIT
100


STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: tmp_table1
Statistics: Num rows: 0 Data size: 72 Basic stats: PARTIAL Column stats: COMPLETE
GatherStats: false
Select Operator
Statistics: Num rows: 0 Data size: 72 Basic stats: PARTIAL Column stats: COMPLETE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
tag: -1
TopN: 100
TopN Hash Memory Usage: 0.3
value expressions: _col0 (type: bigint)
auto parallelism: false
Path -> Alias:
hdfs://f04/user/hive/warehouse/temp.db/tmp_table1 [tmp_table1]
Path -> Partition:
hdfs://f04/user/hive/warehouse/temp.db/tmp_table1
Partition
base file name: tmp_table1
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
COLUMN_STATS_ACCURATE true
bucket_count -1
colelction.delim ,
columns id,perf
columns.comments
columns.types string:map<string,int>
field.delim
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://f04/user/hive/warehouse/temp.db/tmp_table1
mapkey.delim :
name temp.tmp_table1
numFiles 1
serialization.ddl struct tmp_table1 { string id, map<string,i32> perf}
serialization.format
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
totalSize 72
transient_lastDdlTime 1438742089
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
COLUMN_STATS_ACCURATE true
bucket_count -1
colelction.delim ,
columns id,perf
columns.comments
columns.types string:map<string,int>
field.delim
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://f04/user/hive/warehouse/temp.db/tmp_table1
mapkey.delim :
name temp.tmp_table1
numFiles 1
serialization.ddl struct tmp_table1 { string id, map<string,i32> perf}
serialization.format
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
totalSize 72
transient_lastDdlTime 1438742089
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: temp.tmp_table1
name: temp.tmp_table1
Truncated Path -> Alias:
/temp.db/tmp_table1 [tmp_table1]
Needs Tagging: false
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: _col0 (type: bigint)
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Limit
Number of rows: 100
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
GlobalTableId: 0
directory: hdfs://f04/home/maintain/hive/hive/tmp/hive-maintain/maintain/13951f1a-3bd6-481f-a901-1b2c185ec877/hive_2015-11-05_14-37-47_859_3395572406114851553-1/-ext-10001
NumFilesPerFileSink: 1
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Stats Publishing Key Prefix: hdfs://f04/home/maintain/hive/hive/tmp/hive-maintain/maintain/13951f1a-3bd6-481f-a901-1b2c185ec877/hive_2015-11-05_14-37-47_859_3395572406114851553-1/-ext-10001/
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
columns _col0
columns.types bigint
escape.delim \
hive.serialization.extend.nesting.levels true
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
TotalFiles: 1
GatherStats: false
MultiFileSpray: false

Stage: Stage-0
Fetch Operator
limit: 100
Processor Tree:
ListSink

Time taken: 0.059 seconds, Fetched: 143 row(s)
1
2
hive.exec.reducers.max=
(集群总reduce槽位个数*1.5)/(执行中的查询平均个数)

若小任务多可开启优化:推测执行和JVM重用。

虚拟列?

I/O密集型该使用压缩;CPU密集型任务则不然。

sequence file存储格式: 压缩且可分。

CLI会话中通过set命令设置的属性在同一个会话中会一直生效的。


show functions;列出所有函数(包括用户自定义函数(UDF))。
describe function extended concat;展示函数的简单介绍。
UDTF自定义表生成函数;
UDAF自定义聚合函数。


pythonmap-reduce:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# mapper.py:
import sys
for line in sys.stdin:
words=line.strip().split()
for word in words:
print "%s\t" % (word.lower())
#reducer.py:
import sys

(last_key, last_count)=(None,0)
for line in sys.stdin:
(key,count)=linde.strip().split("\t")
if last_key and last_key!=key:
print "%s\t%d" % (last_key, last_count)
(last_key, last_count)=(key, int(count))
else:
last_key=key
last_count+=int(count)
if last_key:
print "%s\t%d" % (last_key, last_count)

使用transform关键字调用脚本,省得写UDF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
create table docs (line String)
;
create table word_count (word String, count Int)
Row format delimited fields teminated by '\t'
;
from(
from docs
select transform (line) using '${env:HOME}/mapper.py'
as word,count
cluster by word) wc
insert overwrite table word_count
select transform (wc.word,wc.count)
using '${env:HOME}/reducer.py'
as word, count
;

  • 自定义序列化:
    json SerDe:P214
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    create external table messages(
    msg_id bigint,
    tstamp string,
    text string,
    user_id bigint,
    user_name string
    )
    row format SerDe "org.apache.hadoop.hive.contrib.serde2.JsonSerde"
    with serdeProperties(
    "msg_id"="$.id",
    "tstamp"="$.created_at",
    "text"="$.text",
    "user_id"="$.user.id",
    "user_name"="$.user.name"
    )
    Location '/data/messages'
    ;

1
2
3
4
hive -hiveconf start_date='2015-11-15' -hiveconf end_date='2015-11-21' -f ./comments_data.hql

solarWarehouse
userstat函数

这个地方错好多次了:

left join 不在on里使用where

hive udf

中位数算法:
https://segmentfault.com/a/1190000008322873
http://www.voidcn.com/article/p-yzrnuwwi-bdv.html

hive类型和java类型

Hive column type UDF types
string java.lang.String, org.apache.hadoop.io.Text
int int, java.lang.Integer, org.apache.hadoop.io.IntWritable
boolean bool, java.lang.Boolean, org.apache.hadoop.io.BooleanWritable
array java.util.List
map java.util.Map
struct Don't use Simple UDF, use GenericUDF

UDAF

http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html
GenericUDAFEvaluator的几种MODE定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static enum Mode {
/**
* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
* 将会调用iterate()和terminatePartial()
*/
PARTIAL1,
/**
* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
* 将会调用merge() 和 terminatePartial()
*/
PARTIAL2,
/**
* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
* 将会调用merge()和terminate()
*/
FINAL,
/**
* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
* 将会调用 iterate()和terminate()
*/
COMPLETE
};

特别提醒:
Merge函数的输入参数是(State other),因为other对象会被复用,因此other里的成员的使用不能是浅拷贝,(比如直接塞到当前state里),可以用深拷贝.(对于List,Map等容器)

系统内置的UDAF函数可以从:FunctionRegistry类中查看.
流程控制更为精细的UDAF,AVG:
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage

hive调用java方法:
select java_method(“java.lang.Math”,”max”,1,2);

UDF数据类型相关

1
2
3
select array(1,2,3) -- => [1,2,3]
select array(1,'2',3) -- => ['1','2','3'] 只要有一个string,就全是string
select array(1,NULL,3) -- => [1,NULL,3] NULL则不影响类型转换.

Hive Serde

Serde相关

https://www.coder4.com/archives/4031
反序列化原理(Deserialize):

1
HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row object

序列化原理(Serialize):

1
Row object –> Serializer –> <key, value> –> OutputFileFormat –> HDFS files

hive的serde内置:
Avro
ORC
RegEx
Thrift
Parquet
CSV
JsonSerDe

自定义Serde:

需要extends AbstractSerDe, 实现6个方法:
initialize
,deserialize,getSerializedClass,serialize
,getSerDeStats,getObjectInspector
参见Hive1.2.1的openCSV实现,可以总结各个方法的作用.

1.initialize方法:

1
2
读取hadoop配置和表配置(建表语句中的),准备一些和元数据有关的数据.
比如所有列的类型,输出列的size,分隔符等等.将这些信息存到对象里备用.

2.serialize方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
序列化. 
由原理可知,输入是一个Row对象,输出是一些Key,value对,以便被OutputFileFormat解析.
由于OpenCSV使用的OutputFileFormat是
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
因此,这个方法的输出要与之吻合.
/**
* 输入: 行对象 (行数据Obj,行类型ObjOI)
* 输出: Key,value. 要和hive默认的outputFileFormat吻合.
* 即要和 org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
* 的输入相同. 而这个类是忽略Key的,因此这个方法的实际输出只有Value.
* */
OpenCSV里的大致内容就是把行类型ObjOI转成StructOI,
然后把每一列的类型取出来,再用类型把数据取出来,再把数据全转成String,
传给CSVWriter,写到一个StringWriter里,然后转成一个Text返回.

3.deserialize方法:

1
2
3
4
5
6
7
8
反序列化.
由原理可知,输入是一个Key,Value,输出是一个行对象.
其中Key,Value由InputFileFormat生成,对于OpenCSV来说就是默认的Hive设定:
org.apache.hadoop.mapred.TextInputFormat
输出实际上是一个List<String>.

OpenCSV里的大致内容就是把输入转成Text再转成String然后用CSVReader读出来,
写到一个String[]里,再存到一个List<String>里(补一些缺的,确保列数一样),然后返回.

4.getObjectInspector方法:

1
2
3
返回整个表的结构类型OI.
从内容上看返回了StructOI.(包括列名和列数,每一列的类型)
对于OpenCSV来说每一列都是String.

5.getSerializedClass方法:

1
2
3
4
5
获取写的时候返回的类.
对于OpenCSV来说,返回Text.class.
因为它的Serialize方法的下游是hive的
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
而且Serialize的实现实际上也返回了一个Text.(声明的只是Writable)

6.getSerDeStats方法:

1
2
3
4
5
6
7
8
9
返回序列化类的写入状态信息,具体包括RawDataSize和RowCount. 也就是原始数据大小和行数.
Hive1.2.1实现的不是很健全.自己用得也很少.
只在两处提取了这个信息:
1. FileSinkOperator. 写入数据的时候获取了一下RawDataSize.
2. MapOperator. 读取数据的时候获取了一下RawDataSize.

甚至对于openCSV,它直接返回了一个null.
对于Hive1.2.1,我们自定义实现Serde的时候,也可以返回null,因为Hive1.2.1的源码中对于这个返回值都是有null判定的. 但如果要对性能做进一步研究,要记录这两个数据,可以参考
LazySimpleSerDe中的实现.