hive抽样

有时候需要快速估计值,可以借助抽样.Hive中抽样的方法:

抽取指定比例(如10%):

1
SELECT * FROM t_name TABLESAMPLE(0.1 PERCENT) s

抽取指定大小(如30M):

1
SELECT * FROM t_name TABLESAMPLE (30M)

块抽样,每个InputSplit抽取10行:

1
SELECT * FROM t_name TABLESAMPLE(10 ROWS) s

桶抽样:(要求原表是分桶表)

1
SELECT * FROM t_name_2 TABLESAMPLE(BUCKET 1 OUT OF 10 ON pcid);

一致性抽样:

上述抽样每次运行都是随机选取,所以结果每次都不同.
如果要求每次抽样的结果是一样的,可以使用随机数发生器的伪随机性,进行系统抽样:

1
2
SELECT * FROM t_name
WHERE rand(unix_timestamp(concat(fdate,' ',ftime)))<=0.1 -- 抽取10%

上述代码中将表中的两列(fdate,ftime)转化为随机数发生器的种子.
如果想让每次抽样的结果不同,也可以将种子换成当前的时间戳,比较灵活.

抽样后,可以对样本进行计算,估计总体的相应统计量.
如果是均值,可以用t检验获取总体的置信区间:

1
u±S/√n*t(1-0.5a,n-1)

如果是中位数,相应的置信区间公式如下:

1
2
上界位次: S1=(n-Z(a)*√n)/2
下界位次: n-floor(S1)+1

其他统计量的置信区间可以自行搜索,例如次序统计量: https://wenku.baidu.com/view/6526f76a7e21af45b307a861.html

高性能Mysql笔记-全文索引

MyISAM的全文索引(第七章)

有很多限制,比较弱.

  • 作用对象:

    1
    全文集合: 将需要索引的列全部拼接成一个字符串,然后进行索引.

    由于是对列进行拼接,因此无法设置哪一列更重要.(没有权重)

  • 实现:

    1
    2
    3
    B-Tree索引,两层.
    第一层: 所有关键字;
    第二层: 一组相关的文档指针.

    全文索引不会索引文档对象中的所有词语,过滤规则如下:

  1. 停用词. 默认按通用英语的使用,可以通过参数ft_stopword_file指定;
  2. 长度条件. ft_min_word_len <=len <= ft_max_word_len.

7.10.1 自然语言的全文索引

文档对象和查询的相关度.
相关度= f(匹配的关键词个数,关键词在文档中出现的次数)

原理:
索引中出现次数越少=>相关度越高
常见单词=>相关度低.

创建全文索引语法:

1
ALTER TABLE film_text ADD FULLTEXT INDEX fulltext_article(title,description);

自然语言搜索的查询语法:

1
2
3
4
select film_id,title,Right(description,25)
,Match(title,description) against ('factor casualties') as relevance -- 相关度
FROM film_text
WHERE Match(title,description) against ('factor casualties')

执行计划:
Mysql将搜索词语分为两个独立的关键词进行搜索. (factorycasualties)
搜索对象: titledescription组成的列的全文索引.

7.10.2 布尔全文索引

编写布尔搜索查询时,可以通过一些前缀修饰符来定制搜索.

1
2
3
4
5
6
dino: 包含dino的行rank更高;
~dino: 包含dino的行rank更低;
+dino: 行记录必须包含dino;
-dino: 行记录不能包含dino;
dino*: 以dino开头的行rank更高.
本例中dino作为搜索词可能太短,其实太短的词并不会被全文索引,查询优化器也可能要求搜索词>=ft_min_word_len.

查询语法:

1
2
3
4
5
select film_id,title,right(description,25)
FROM film_text
where match(title,description)
Against('+factory +casualties' In Boolean Mode);
-- title拼接description后必须同时包含factory和casualties

7.10.3 插件和限制

可以加入插件改变:

  1. 分词方式(如C++);
  2. 预处理(如PDF).

Mysql全文索引判断相关性的方法: 词频;
限制:

  1. 全文索引能全部load到内存时,才能快.
  2. 判断相关性只有词频,没有顺序;
  3. 优化器会优先使用全文索引,而不管是否有其他更优索引, 并且全文索引会在其他索引之前使用.
  4. 碎片多.

Sphinx + Mysql + SphinxSE (附录F)

Sphinx: 开源全文搜索引擎. 可将数据源配置为Mysql查询的结果. 可水平扩展.
SphinxSE: Mysql的Sphinx插件,以便支持将Sphinx的搜索结果和Mysql的表进行join.

Sphinx性能:

  1. 查询1GB时间: 10~100ms;
  2. 单CPU处理能力:10~100GB.

分布式搜索工作流程:

  1. 向所有服务器发送远程查询;
  2. 执行本地索引搜索;
  3. 从每个服务器读取部分搜索结果;
  4. 合并结果返回客户端.

架构

  • Indexed
    从不同数据源创建全文索引.作为后台任务,一般定时运行;
  • Searchd
    运行时服务于客户端,查询创建好的索引.

SphinxSE可插拔存储引擎

建表语法:

1
2
3
4
5
6
7
create table search_table
(id int not null
,weight int not null
,query varchar(3070) not null
,group_id int
,INDEX(query)
) engine=Spinx conncet="sphinx://localhost:3312/test"

查询语法:(把查询藏在query里,传递给sphinx)

1
select * from search_table where query='test;mode=all';

Searchd服务返回查询结果,存储引擎把数据转换成mysql表.

ElasticSearch比较:
https://zhuanlan.zhihu.com/p/21334385

决定倒向ElasticSearch社区.XD

高性能Mysql笔记-第1章

第一章 架构与历史

逻辑架构:
客户端
=>连接/线程处理
=>查询缓存/解析器
=>优化器
=>存储引擎.

其中只有innodb引擎会解析SQL中的外键定义.其他引擎不会解析SQL.
所有内容都应该同时关注两个层面的实现: 服务器层和存储引擎层.

1.1.1 连接管理

每个客户端连接都会在服务器进程中拥有一个线程,这个连接的查询只会在这个单独的线程中执行. 服务器会负责缓存线程,因此不需要为每个新连接创建或销毁线程.

1.1.2 优化与执行

首先查询缓存中有没有,如果没有:

Mysql会解析查询,并创建内部数据结构(解析树),然后进行优化,包括重写查询,决定表的读取顺序以及选择合适的索引.

用户可以使用hint指令指导优化.

优化器并不关心底层存储引擎的具体实现,但会向底层存储引擎请求一些数据.

1.2 并发控制

1.2.1 读写锁

读锁: 共享锁,不干扰其他锁.
写锁: 排他锁,排除其他读写锁.

1.2.2 锁粒度

  • 表锁
    开销最小,但并发度低.
    // ALTER TABLE语句都会使用表锁,不管底层引擎是什么.

  • 行锁
    最大程度的并发,但也有最大的锁开销. 由存储引擎各自实现.

1.3 事务

基本流程:

  1. Start transaction;
  2. 执行一组SQL
  3. commit.

事务的4个指标(ACID):

  1. 原子性: 要么全部成功,要么全部回滚.
  2. 一致性: 事务中间状态的修改不会保存到数据库中.(感觉和原子性有重合)
  3. 隔离性: 事务在最终提交之前,对其他事务是不可见的.
  4. 持久性: 一旦事务提交,修改就会永久保存到数据库中.

1.3.1 隔离级别

4种隔离级别(由低到高):
1.Read uncommitted:

未提交读. 有脏读. 能读到别的事务还没commit的修改. 太弱, 基本不用.

2.Read committed (Mysql以外的数据库默认级别)

提交读. 一个事务的修改,提交前对其他事务不可见.
存在不可重复读问题.
比如如下流程:

  1. A事务读V;
  2. B事务提交V;
  3. A事务读V.
    由于B事务中间提交了一次,A事务两次读到的值不一样,也就是不可重复读.

3.Repeatable Read (Mysql默认级别)

可重复读. 可以重复读单条记录. (方法,InnoDb增加多版本并发控制MVCC)
存在幻读问题.
比如如下流程:

  1. A事务读范围行Vs;
  2. B事务插入v到Vs中;
  3. A事务读范围行Vs.
    由于B事务中交提交了一次,A事务两次读到的范围行不一样,也就是幻读.
    (可以用MVCC+间隙锁解决.)

4.Serialable

可串行化. 解决了幻读.
强制事务串行执行. 增加了每一行锁.

总结问题:

  1. 未提交读: 脏读,不可重复读,幻读.
  2. 提交读: 不可重复读,幻读.
  3. 可重复读: 幻读.
  4. 可串行: 慢.

mysql默认是可重复读,解决正确使用的幻读,使用MVCC+间隙锁;
MVCC: 解决两次快照读的幻读; (两次快照读一定一样)
间隙锁: 解决两次当前读的幻读; (两次当前读一定一样)

而,快照读和当前读的结果对于mysql的可重复读级别来说,可能结果不一样。
因此我们定义的正确使用: 程序￿员不应假设”快照读”和”当前读”的结果一样。

快照读: select xxx
当前读: select xxx for update; 或者update/insert语句中隐含的读(基于当前数据库的数据状态)

如果在最高隔离级别(串行)下,事务中的所有读都会被加上间歇锁,
因此保证了快照读和当前读的结果。// 也就是即使程序员不懂这方面知识、不正确使用,也能得到正确的结果。

1.3.2 事务死锁

Innodb:
检查到死锁的循环依赖
=>立即返回错误.

  • 处理死锁:
    将持有最少行级写锁(排他锁)的事务进行回滚.

1.3.3 事务日志

WAL(Write-Ahead Logging) 异步刷盘.

1
2
1. 数据追加写到日志里(顺序IO);
2. 数据修改到原数据里. (随机IO).

第二步如果失败崩溃,可以利用第一步的日志修复.

1.3.4 Mysql中的事务

Mysql事务型引擎: InnoDB, NDB Cluster. (MyIsam不支持事务)
第三方引擎: XtraDB,PBXT
InnoDb支持所有4个隔离级别.

自动提交
Mysql默认使用自动提交. 如果不显式得开始一个事务,每个查询都被当成一个独立的事务.

可能导致强制提交事务的命令:

  1. Alter Table
  2. Lock Tables
  3. 其他导致大量数据更改的语句…

在事务中混合使用存储引擎(不推荐)

  • 服务层: 不管理事务;
  • 存储引擎: 具体实现事务.

混合使用InnoDb,MyISAM: (事务型表+非事务型表)

回滚时,非事务型表上的变更无法撤销, 导致数据库处于不一致状态.

隐式和显式锁定
InnoDb采用两阶段锁定协议.

1
2
加锁: 事务执行过程中随时加锁;
解锁: 仅当Commit或RollBack时.

隐式加锁: 存储引擎自动加锁
显式加锁: 明确指定语句: Select ... Lock In Share Mode

  • tip

    尽量不要显式加锁,而是交给Innodb实现. 以避免无法预料的错误.
    (除非研究得很深了.)

1.4 多版本并发控制 MVCC

MVCC: 行级锁的一个变种. 在很多情况下避免了加锁操作, 开销更低.
大多实现了非阻塞的读操作, 写操作只锁定必要的行.

  • 实现
    每个事务开始的时候保存数据在某个时间点的快照.

InnoDb的MVCC(空间换时间,少加锁)

每行记录后保存两个隐藏列:

  1. 行的创建系统版本号;
  2. 行的删除系统版本号.

系统版本号反映了时间.
每开始一个新的事务,系统版本号都会递增.
事务开始时刻的系统版本号会作为事务的版本号,
用来和查询到的每行记录的版本号比较.

##Innodb的可重复读级别下
Select

1
2
1. 创建版本号<=当前事务版本的数据;
2. 删除版本号>当前事务版本的数据 (或者没有删除).

Insert

1
1. 插入新行,创建版本号=当前事务版本号.

Delete(标记删)

1
2
1. 删除的行,删除版本号=当前事务版本号.
(打标记,而不是真的删)

Update(标记删+插入新)

1
2
1. 插入新行,创建版本号=当前事务版本号.
2. 原来的行,删除版本号=当前事务版本号.

底层原理

实际实现参见: https://liuzhengyang.github.io/2017/04/18/innodb-mvcc/
是将旧版本的行存在undo log中,如果没有依赖这部分undo log的事务结束了(提交或者回滚),
这部分undo log是会销毁的,因此不会造成永久的存储负担。

MVCC与隔离级别:

  1. 读未提交: 不使用MVCC;
  2. 读已提交: 使用MVCC; // 可能版本号条件不同
  3. 可重复读: 使用MVCC+当前读间隙锁; // 解决正确使用情况下的幻读
  4. 可串行读: MVCC+快照读和当前读都有间隙锁. // 解决所有情况下的幻读

MVCC相当于乐观锁或者无锁、空间换时间;
间隙锁相当于悲观锁。真实地锁了索引。

1.5 Mysql的存储引擎

  • 每个数据库: 一个目录

  • 每个数据表:

  1. 元数据(表定义): 表同名的.frm文件; (服务层统一实现)
  2. 数据; (存储引擎分别实现)
  3. 索引. (存储引擎分别实现)

元数据(表定义具体内容)

表的相关属性:

  • Name
  • Engine
  • Row_format:
  1. Dynamic: 行长度可变,包含Varchar或Blob的行.
  2. Fixed: 只包含固定长度列.
  3. Compressed: 压缩表.
  • Rows: 行数. Innodb:估算值. MyISAM: 精确值;
  • Index_length: 索引长度
  • Collcation: 默认字符集和字符排序规则.
  • 其他信息…

1.5.1 InnoDB存储引擎

设计目标: 大量短期事务.
Mysql主推引擎.

存储
数据: 表空间中,一系列数据文件组成.
索引: 表空间中,独立的索引文件.

隔离级别的实现:

  1. MVCC;
  2. 间隙锁: 不仅锁定查询涉及的行,还锁定索引中的间隙,防止幻读.

存储格式:(跨平台)

  1. 主键: 指向物理地址;
  2. 二级索引: 非主键索引,指向主键,所以主键应尽可能小.

MyISam

存储

  1. 数据: .MYD
  2. 索引: .MYI

变长行: 默认最大数据量256TB,因为指向数据记录的指针长度是6B.
可以修改Max_Rows和Avg_row_length的值,来改变指针长度.
(两者相乘=表最大容量)

并发
整张表加锁.

索引
支持全文索引,基于分词创建的索引.

压缩表
不再修改的表可以进行压缩,提高查询性能,减少磁盘空间.

1.5.3 其他引擎

Archive: 只支持插入和查询,不支持修改和删除.
BlackHole: 丢弃所有插入的数据,只记录日志. 有很多问题.
CSV: 存储CSV文件. (这个场景应该考虑使用Sql Server)
Federated: 访问其他Mysql服务器的代理,默认禁用.

  • Memory: 磁盘只保存结构,数据在内存,重启后丢失数据.
    使用场景:
  1. 查找或映射表;// 邮编和州名的映射表;
  2. 缓存周期性聚合数据;
  3. 保存数据分析中产生的中间数据.

限制:

  1. 表级锁;
  2. 只支持定长列, 如果指定了varchar,会转换成char.

Merge引擎:
多个MyIsam表合并的虚拟表;
引入分区功能后弃用.

NDB引擎:
Mysql集群版.

1.5.5 选择合适的引擎

场景:

  1. 全文索引: InnoDB + Sphinx
  2. 在线热备份: InnoDB
  3. 记录日志: MyISAM,Archive
  4. 10TB以下: InnoDB
  5. 10TB以上: InfoBright.
  6. 大部分情况优先考虑: Innodb

Netty in action笔记-第1-2章

Hadoop中用了Netty3,所以得看看这一块.
中文版的代码链接:
https://github.com/ReactivePlatform/netty-in-action-cn

第一章 Netty介绍

这一章主要介绍了一下Netty.
Netty封装了Java NIO中的一些复杂的细节和坑.

1.1 为什么使用Netty

  1. Netty提供了高层次抽象来简化TCP/UDP服务器的编程.
    用Netty可以实现FTP,SMTP,HTTP,WebSocket,SPDY.
  2. Netty社区很活跃.

1.1.2 Netty框架的组成

1.2 异步设计

Netty中主要使用了回调+Future.

1.2.1 回调

异步处理的一种技术是回调. 就是在一些关心的事件上注册回调函数.
// 个人理解,回调应该是同步非阻塞.
// 同步: 客户端等待注册完成;
// 非阻塞: 服务端只是注册事件,返回得很快.

1.2.2 Future

java.util.concurrent包中附带的Future接口.使用Executor异步执行.
每传递一个Runnable对象到ExecutorService.submit()方法就会得到一个回调的Future,能使用它检测是否执行完成.
// 个人理解,Future其实是异步阻塞.
// 异步: 客户端不等待服务端执行结束,拿到Future后,需要自己轮询结果.
// 阻塞: 服务端不会主动回调,只是提供一个查询接口.

1.4 Netty相比NIO优点

  1. 兼容性和跨平台性进一步提高;
  2. 扩展ByteBuffer.
    ByteBuffer允许包装一个byte[]来获得一个实例,可以尽量减少内存拷贝.
  3. 消除NIO的内存泄漏(jdk1.7以上)
    NIO对于缓存区的聚合和分散操作可能造成内存泄漏.
  • 分散(Scatter)

    ScatteringByteBuffer中的数据分散到多个ByteBuffer中.

  • 聚合(Gather)

    将多个ByteBuffer的数据聚合到GatheringByteChannel中.

  1. 解决epoll缺陷导致的100%cpu问题.

第二章 Netty核心概念,简单示例

这章用的是Netty4.(但愿和Netty3同理)
上来先整了几段代码,而没有先说概念.
代码如下:
https://github.com/xiaoyue26/netty-in-action-cn/blob/ChineseVersion/chapter2/Server/src/main/java/nia/chapter2/echoserver/EchoServer.java

可以看出服务端和客户端代码很类似,大致套路是:

  1. 创建一个EventLoopGroup;//类似于召集一群干活的(线程池)
  2. 创建一个Bootstrap(ServerBootstrap);// 类似于管家/控制面板
  3. Bootstrap配置上eventGroup,channel用的类,端口地址,处理链.
  4. 绑定到端口开始工作.

值得注意的是,所有涉及到Future的方法都是异步的,可以通过主动调用sync方法来进行同步等待.(当然也可以轮询)

  • 设计思想
  1. 在Bootstrap上使用Future;
  2. 在处理链上使用回调.
  • 具体细节
  1. ChannelInboundHandlerAdapter:
    处理完消息后需要释放资源;(ByteBuf.release())
  2. SimpleChannelInboundHandler:
    完成channelRead0后自动释放消息.

java并发编程的艺术笔记-第二章

第二章 java并发机制的底层实现原理

  • java代码执行流程
  1. java代码编译成class文件(字节码);
  2. class被类加载器加载到JVM中;
  3. JVM执行class,生成汇编码;
  4. 汇编码(转化成机器码/cpu指令)在cpu上执行.

因此java并发机制的底层实现依赖于两个层面:

  1. class=>汇编码过程中增加的指令;
  2. 并发相关cpu指令的具体执行过程.

2.1 volatile

  • 定义
    可见性. 对于volatile变量,所有线程看到的值是一致的.
    换句话说,某个线程对于volatile的修改能立即生效.

  • 实现

  1. class=>指令: 增加Lock指令
  2. Lock指令具体执行:
    (1) 将当前cpu包含该值的缓存行写回内存;
    (2) 在总线上通知其他cpu这个地址已发生更改,需要刷新缓存.
    (缓存一致性协议)
  • 相关优化
    由于上述实现中的2(1)为:”将当前cpu包含该值的缓存行写回内存”,
    换句话说,如果这个值跨行了,就会影响两行的数据,也就会导致两行的缓存失效,
    其他cpu刷新缓存的数据量变成两倍.
    因此尽量要把数据对齐到一行. (比如32位,64位)

2.2 synchronized

内置锁,锁某个对象.(可以是当前实例对象或当前类对象)

  • 实现
  1. class=>指令:
    增加:
    进入同步块: monitorenter
    离开同步块: monitorexit (正常离开或者异常)

  2. cpu对这俩指令的执行书里没有细讲,只说了对象头里相关数据是怎么存的.

2.2.1 对象头中锁相关数据

  • 对象头内容
  1. Mark Word: hashCode/分代年龄/锁信息.
  2. 类元数据地址;
  3. 数组长度. // 如果是数组
  • 不同锁标志的信息
  1. 轻量级锁: 指向栈中锁记录的指针; // 锁标志00
  2. 重量级锁: 指向互斥量的指针 ; // 锁标志10
  3. GC标记 : 空; // 锁标志11
  4. 偏向锁 : 线程ID,epoch,分代年龄,1; // 锁标志01

锁的4种状态: (锁只能升级,不能降级)

  1. 无锁;
  2. 偏向锁: 一个线程使用该对象;
  3. 轻量级锁: 多个线程交替使用该对象;
  4. 重量级锁: 多个线程同时竞争该对象.

偏向锁
HotSpot作者:
大多数情况下不存在多个线程竞争一个对象,这个时候可以优化让线程获得锁的代价更低.

  • 获取偏向锁流程
  1. 检查对象头里线程ID是不是自己或者是否无锁状态;
  2. 复制对象头中Mark Word到栈中;
  3. 在副本上写线程ID为自己ID;
  4. CAS,用副本替换Mark Word,获得偏向锁.

如果成功的话,下次进入同步块的时候,只要第1步能成功,就不再需要CAS操作了.
换句话说,这种场景下, 同一个线程可以重入同一个对象的锁,只有第一次需要CAS操作(代价比较大的操作).

  • 偏向锁的撤销(也就是对象头中存储的线程ID改掉)
  1. 别的线程也申请这个锁;
  2. 之前拥有锁的线程不存活=> 对象头设置成无锁;
  3. 之前拥有锁的线程存活 => 锁升级.

轻量级锁

  • 获取轻量级锁流程
  1. 检查
  2. 复制对象头中Mark word到栈中;
  3. 副本中写指向自己锁记录的指针;
  4. CAS,用副本替换Mark word,获得偏向锁.

如果第4步失败,尝试先不阻塞,使用自旋获取锁.(有可能已经拥有这个锁,试试看)
如果又失败,膨胀(升级)为重量级锁.

  • 轻量级锁的解锁
  1. CAS还原复制的对象头.

如果成功,就解锁;
如果失败,说明除了自己还有别人也改过对象头.膨胀为重量级锁.

对比:

  1. 偏向锁: 打个自己的标记;
  2. 轻量级锁: 不阻塞,自旋重试;
  3. 重量级锁: 阻塞,等待唤醒.

2.3 原子操作实现原理

  1. 总线锁: 某个cpu用Lock指令锁总线,独占内存; // 开销大
  2. 缓存锁: 某个cpu修改内存地址,使其他cpu缓存无效.//开销小

谷歌的一致性哈希算法

jump consistent hash

jump consistent hash是谷歌发表的一种一致性哈希算法.
空间复杂度: O(1);
时间复杂度: O(lgn).

设计目标:

1.平衡性,把对象均匀地分布在所有桶中。(这个大部分哈希算法都能做到)
2.单调性,当桶的数量变化时,只需要把最少量的对象从旧桶移动到新桶,不需要做更多移动。比如原来是10个桶,增加了10个桶,只需要移动一半的对象就好了.
(更改算法的输入参数n,会有一半的对象依然映射到原来的桶里,有一半的对象映射到新的桶里.)

问题分析

ch(key,num_buckets) 为桶数量为num_buckets时的hash函数,返回分配的桶下标。

  • num_buckets=1时:
    由于只有1个桶,显而易见,对任意k,有ch(k,1)==0

  • num_buckets=2时:
    为了使hash的结果保持均匀,ch(k,2)的结果应该有占比1/2的结果保持为0,有1/2跳变为1。

由此可以归纳,一般规律是:

num_bucketsn变化到n+1后,
ch(k,n+1) 的结果中,应该有占比 n/(n+1) 的结果保持不变,
而有 1/(n+1) 跳变为 n+1

因此,我们可以用一个随机数生成器,来决定每次要不要跳变,并且让这个随机数生成器的状态仅仅依赖于key。所以就得到下面这个初步代码:

1
2
3
4
5
6
7
8
9
10
int ch(int key, int num_buckets) {
random.seed(key) ;
int b = 0;
for (int j = 1; j < num_buckets; j++) {
if (random.next() < 1.0/(j+1) ) {
b = j ;
}
}
return b;
}

算法正确性

(0-based)
n个桶,从0开始往n-2跳变(n-2次),每次跳变的概率依次是1/2,1/3,1/41/n
从而保证keys在每个桶分布的概率均匀。

1
2
3
4
5
6
7
P(b=0(初始值)) =每次都不变=1/2*2/3*3/4...(n-2)/(n-1)*(n-1)/n=1/n;
P(b=1) =1/2(变)*2/3(不变)*....=1/n;
P(b=2) =1(变或者不变)*1/3(变)*3/4*...(n-1)/n=1/n;
P(b=3) =1(变或者不变)*1(变或者不变)*1/4*4/5...*(n-1)/n=1/n;
P(b=k) =1*1....1/k(变)*k/(k+1)...(n-1)/n=1/n;
P(b=n-2) =1/n;
P(b=n-1) =1- (上述所有的和)=1-(n-1)/(n-2)=1/n.

算法复杂度

n为桶的数量的话,进行n-1次跳变判断,算法复杂度是O(n).

改进

改进的思路

通过随机数,确定下一个跳变的j,而不是对每一个位置进行跳变判断。
因为跳变的概率从1/2开始一直在减少,所以依概率来说每次跳变的间隔大于1,所以计算下一个跳变值的次数少于n-1.

定义P(b,j>=i)的含义为: 当前跳变值为b时,下一个跳变值为j,j>=i的概率.

假设我们使用0-base的数组,则下标i位置不变的概率为: (i+1)/(i+2)
下一个跳变值j>=i时,也就是[b+1,i-2]区间内保持不变,因此有:

1
2
3
4
5
P(b,j>=b)=1 //(显然下一个跳变值>b)
P(b,j>=b+1)=1 //(显然下一个跳变值>=b+1)
P(b,j>=b+2)=1*(b+1/b+2)
...
P(b,j>=i)= 1*(b+1/b+2)*(b+2)/(b+3)...(i-1)/i = (b+1)/i

假设有一个在[0,1]区间均匀分布的随机变量R,由于均匀分布的特性,R < k的概率为 k.
(例如R< 0.3的概率为0.3),P(R<(b+1)/i)= (b+1)/i = P(j>=i);

因此可以生成一个[0,1]范围的随机数r,规定r<(b+1)/i的时候,就有j>=i;
因此 i<(b+1)/r. 由于对于任意的i都有j>=i,因此:

1
j=floor( (b+1)/r),

这样我们用一个随机数r得到了j。

改进后代码如下:

1
2
3
4
5
6
7
8
9
10
11
int ch(int key, int num_buckets) {
random.seed(key) ;
int b = -1; // 上次跳变值
int j = 0; // 这次跳变值
while(j<num_buckets){// j不能超出范围
b=j;
double r=random.next(); // 0<r<1.0
j = floor( (b+1) /r);
}
return b;
}

其中r为[0,1]区间的随机数(均匀分布)。
算法中使用了一个64位的线性同余随机数生成器。
结果分布的均匀性与输入key的分布无关,由伪随机数生成器的均匀性保证。
由于用的是伪随机数,生成器由key进行seed,因此能保证一致性.

时间复杂度

根据概率计算.
由于r平均为0.5,因此j平均来说是成倍增长的,因此改进后算法的平均时间复杂度为:O(log(n))

java并发编程实战-15-16章

第十五章 原子变量与非阻塞同步机制

并发包里的工具(如信号量SemaphoreConcurrentLinkedQueue)
synchronized性能更好,伸缩性更好.
原因是使用了原子变量与非阻塞同步机制.

非阻塞的底层: CAS操作. (compare and swap)
没有锁,Lock free,因此更接近完美.
因此没有活跃性问题.

15.1 锁的劣势

性能不行.

15.2 硬件对并发的支持

独占锁: 基于悲观假设,不互斥的话会出事.(悲观锁)
乐观方法: 特殊指令: 包括, Test And Set , Fetch and Increment, CAS, 条件存储.

15.2 CAS 比较并交换

CAS包括三个操作数: 内存位置,旧值,新值.
//类似对应java中mapreplace操作的三个操作数: key,旧值,新值.

CAS操作失败的线程不会挂,会获得失败信息.(同步非阻塞)
可以反复重试. (类似于poll的轮询)

CAS的缺点:
需要调用者自己处理竞争问题.// 重试,回退,放弃.
CAS实现的非阻塞算法通常比用锁写复杂一些.

15.3 原子变量类

包括AtomicInteger等等.提供各种原子操作,如CAS,自增等.
它和一样保证写后读,但还保证内容是原子更新的.(比如两块内存看似一起更新)

  • tip:

    原子变量类不适合作为容器的Key.
    容器的Key一般要是Immutable对象.(如Integer,String)

15.4 具体案例

这一节用CAS实现了链表,栈,原子的域更新器.
需要看着源码学习.

15.4.4 ABA问题

CAS(V,A,B)需要判断V位置是否为A,然后替换为B.
但如果V位置发生变化: A->B->A.
这样其实和我们希望的条件已经不同了,本质上是变化了,只不过值没变,版本号变了.
实例来说,就是链表节点引用没变,引用的值已经变了.

解决方案:
使用AtomicStampedReference(以及AtomicMarkableReference)支持在两个变量上执行原子的条件更新.

  • AtomicStampedReference
    [对象,引用]二元组,在引用上加上版本号.

  • AtomicMarkableReference
    [对象引用,布尔值]二元组,可标记节点为已删除的节点.

第十六章 java内存模型

16.1 概念

为了性能,会进行指令重排,预测执行等等.
内存模型: JMM.
内存模型规定,优化(重排)时应该遵守哪些约束.

  • 串行一致性:
    如果约束够用,执行结果就会和串行一样.

16.1.1 平台的内存模型

平台给出内存模型,约束自己,向外界保证xx条件下会发生什么.
提供接口/指令: 如内存栅栏,提供协调存储的接口.

16.1.3 JAVA内存模型

  1. 程序顺序: 同一个线程里按源码顺序执行;
  2. 监视器锁: 解锁会在加锁之前;(A线程释放了,B线程才能获得锁)
  3. volatile: 写后读. (原子变量也是)
  4. 线程启动: Thread.start之后才会有run等其他操作发生.
  5. 中断规则: (1)A线程中断B线程;(2)B检测到中断. 保证(1)在(2)前面.
  6. 终结器: 构造函数在终结器之前执行完成.
  7. 传递性: 上述规则可以传递.

1是针对单一线程的,2-7都是针对多个线程之间的代码顺序.
口诀: 程锁原线中终传.

16.1.4 借助同步

也就是借助上述7个已有的约束设计代码,达到同步效果.
比如我们平时用的锁,就是利用用了第二条,监视器锁规则.

类库中提供的约束:

  1. 线程安全容器: 写后读.
  2. CountDownLatch: 倒数操作在await返回之前.
  3. Semaphore: PV操作,V释放在P获得之前. (别人释放了,你才能获得)
  4. Future: get返回之前,任务的代码会执行完.
  5. Executor: 提交任务操作,将在执行任务操作之前.
  6. CyclicBarrier: (1)线程A到达栅栏;(2)其他线程离开栅栏. (1)会在(2)前面.换句话说,大家伙会等线程A到了才离开.

16.2 发布

错误示例:

1
2
3
4
5
6
7
8
9
10
@NotThreadSafe
public class A{
private static Resource resource;
public static Resource getInstance(){
if(resource==null){// 显然多个线程会在这里冲突.
resource=new Resource();
}
return resource;
}
}

安全但是慢:

1
2
3
4
5
6
7
8
9
10
@ThreadSafe
public class B{
private static Resource resource;
public synchronized static Resource getInstance(){// 方法级同步
if(resource==null){
resource=new Resource();
}
return resource;
}
}

直接静态初始化:

1
2
3
4
5
6
7
@ThreadSafe
public class C{
private static Resource resource=new Resource();
public static Resource getInstance(){
return resource;
}
}

占位符技术: 延迟静态初始化:

1
2
3
4
5
6
7
8
9
@ThreadSafe
public class D{
private static class ResourceHolder{ // 懒汉,完美.
public static Resource resource=new Resource();
}
public static Resource getInstance(){
return ResourceHolder.resource;
}
}

双检(DCL): 不推荐. 慢,繁琐.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@ThreadSafe
public class E{
private volatile static Resource resource; // volatile
public static Resource getInstance(){
if(resource==null){
synchronized(E.class){ // Class对象锁.
if(resource==null){
resource=new Resource();
}
}
}
return resource;
}
}
// 之所以需要volatile,是为了保证resource变量的写操作立即刷新到内存,起码在读之前. (StoreLoad屏障)

初始化安全域:

对象的初始引用不会被重排序到构造函数之前.

  • final:
    final域能够安全发布,通过final域可达的变量\容器的写入操作安全发布.

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
@ThreadSafe
public class SafeStates{
private final Map<String,String>States;
public SafeStates(){
states= new HashMap<>();
states.put("alaska","AK");
//...
}

public String getV(String s){
return states.get(s);
}
}

线程A创建的SafeStates对象,线程B能安全得访问getV方法.

java并发编程实战笔记-10-11章

第十章主要介绍活跃性危险,也就是安全性和活跃性的权衡.锁不多不少.
安全性: 就是正确性.锁要够多,不然数据并发访问就错了.
活跃性: 就是性能. 锁不能太多,死锁了,或者太卡了.

10.1 死锁

哲学家问题: 循环等待死锁.
数据库服务器如何解决事务死锁:

  1. 检测到等待关系有向图有环;
  2. 选一个牺牲者kill了.
  3. 应用程序自己重试被kill的事务.

10.1.1 锁顺序死锁

避免死锁的手段1:

控制获取锁的顺序.

如果所有线程获取锁(资源)的顺序一样,就不会死锁.

10.1.2 动态的锁顺序死锁

锁顺序很难处理. 比如两个账户转账.
一种方法是使用俩账户的hash码,比较顺序.
当遇到hash码冲突的时候,使用加时赛:

  1. 定义一个新的锁;
  2. 冲突的线程轮流申请这个锁,然后再申请账户锁.

相当于冲突的线程串行执行.

(其实我觉得对于这个问题,账户肯定有唯一id,用id排序就好了.)

10.1.3 协作对象之间的死锁

如果获取多个锁的操作不在唯一的同一个方法里, 问题变得麻烦.
成因:
方法1: 持有锁时调用外部方法,而外部方法请求了另一个锁.
方法2: 同上.

方法1,方法2获取锁的顺序相反时,可能死锁.

解决方法:
收缩synchronized的范围从方法级缩小到块级.

10.1.4 开放调用

开放调用: 调用方法时不需要持有锁.
也就是方法上没有加synchronized,而是在里头某一个块里用了.

10.1.5 资源死锁

资源不够死锁. 如线程资源\数据库连接资源.

  • 解决方案:
    书里没写, 我觉得可以考虑前文说的”选取牺牲者”方法.

10.2 死锁的避免与诊断

  1. 每个线程只获取一个锁. //比较不现实,涉及原子性的实现.
  2. 顺序获取.
  3. 支持定时的锁. // 代替内置锁

10.2.2 死锁的诊断

通过Thread Dump信息.
方法: 向JVM发送SIGQUIT信号.
命令行:

1
kill -3 [id]

jdk1.5: 有内置锁信息, 无显式锁信息;
jdk1.6: 有内置锁信息, 少量显式锁信息.
(?那岂不是意味着尽量不要使用显式锁?)

10.3 其他活跃性危险

活跃性危险: 太卡
包括: 死锁,饥饿,丢失信号,活锁.

10.3.1 饥饿

如短作业优先调度的时候,长作业就会饥饿.

10.3.2 糟糕的响应性

GUI线程优先级太低, 响应性就会差.

10.3.3 活锁

重复的失败.
如线程不停得获取锁,释放锁,似乎能完成,但其实总也完成不了,无间地狱,死循环.

  • 案例1: 华龙道
    发数据包碰撞. 重试算法一样,总是在相同的路口再碰撞.
    解决方法:
    增加随机性. 重试时间加一个随机参数.
  • 案例2:
    过度的错误恢复.
    解决方法:
    不过度.

第十一章 性能与可伸缩性

线程的使用
目的: 提高性能.
缺陷: 提高复杂性.
场景: 多cpu系统,任务不是cpu密集的.
衡量手段: 看cpu使用情况图.

11.1.1 可伸缩性

可伸缩性: 当增加资源时(cpu,内存等等),程序吞吐量/处理能力增加比例适中.

可伸缩性与性能往往矛盾.
单线程的性能优化方法 往往导致=> 可伸缩性下降.

11.2 Amdahl定律

增加计算资源时,程序理论上能实现的最高加速比.

Speedup <= 1/ ( F + (1-F)/N )
N: N个处理器
F: 必须串行的任务比例.

N趋近于无穷大时,加速比为 1/F.
(F为50%时,加速比最大为2.)

提高可伸缩性方法总结:

  1. 缩小锁的粒度;
  2. 减少锁的持有时间(好像和上一条差不多);
  3. 尽量使用非独占锁.

java并发编程实战笔记-8章

线程池的使用

这章介绍实际应用中配置调优的一些高级选项. 以及各种坑.

8.1 任务与执行策略的隐形耦合

有些任务需要指定执行策略:

  1. 依赖性任务: 就是任务之间不独立
  2. 线程不封闭的: 就是只能单线程跑的.
  3. 对响应时间敏感的: 如GUI.
  4. 使用ThreadLocal的: 线程池会重用线程. 因此可能有风险.

8.1.1 死锁

有界线程池不能无限提交. 如果里头的任务都死锁了,线程池也死锁了.

8.1.2 响应时间

如果任务都很慢,线程池的响应时间自然也慢.
可以限时或者增大线程池容量.

8.2 线程池大小公式

N = cpu数量 = Runtime.getRuntime().availableProcessors();
U = 目标cpu利用率
W/C= 等待时间和执行时间的比率 (响应度)
SIZE = N*U/(1+W/C)

8.3 配置ThreadPoolExecutor

可以通过Executors获取jdk设计好的一些线程池实现.

8.3.1 线程的创建与取消

基本大小: 没有任务时候的线程大小.
最大大小: 上限.
存活时间: 线程空闲时间达到存活时间,则被回收.

8.3.2 管理队列任务

线程池满了以后,提交的任务进入等待队列.
newFixedThreadPool: 无界等待队列 LinkedBlockingQueue
newSingleThreadExecutor: 无界等待队列 LinkedBlockingQueue

有界等待队列的话,需要饱和策略.

8.3.3 饱和策略

  1. 中止(默认): abort. 抛异常.
  2. 调用者运行: 让主线程自己干. 拥塞会外延到TCP层.
  3. 丢弃: 抛弃该任务. 不抛异常.
  4. 丢弃最老: 丢弃下一个将要执行的.(如果用了优先级队列,就是抛弃优先级最高的,会造成错误.)

8.4 扩展ThreadPoolExecutor

需要实际需求和应用案例才能学会.

8.5 递归算法的并行化

首先循环可以并行化:

1
2
3
4
5
6
7
for(final Ele e: eles){
exec.execute(
new Runnable(){
public void run(){process(e);}
}
);
}

递归也一样, 遍历依然是递归的, 但把每一个节点的计算收集到线程池中,异步计算.

1
2
3
4
dfs(node,exec,results){
exec.execute(...);
dfs(node.children());
}

第九章是图形界面,略过.

hive调优之数据倾斜

上一篇中记录了hive调优的一些常规手段. 但对于某些数据集, 常规手段是无能为力的, 例如数据倾斜时.

对于hive而言,数据倾斜就是某个reducer跑得特别慢,这一点可以从日志中reducer开在99%或某个值很久看出,也可以从web ui中查看:

1
2
3
4
运行后:
http://xxx:19888/jobhistory/tasks/job_1472710912354_3070682/r
运行前:
http://xxx:8088/proxy/application_1472710912354_3070684/mapreduce/tasks/job_1472710912354_3070684/r

如果具体看日志的话,会发现大部分时间在进行外排.
对于这种任务最重要的是消除外排,有如下几种优化手段:

1. 加内存

最简单粗暴就是给reduce加内存了. 让它别外排:

1
set mapreduce.reduce.memory.mb=10240;

类似的,如果mapper内存不够,可以减小每个mapper处理的数据量,增大mapper的数量:

1
set mapreduce.input.fileinputformat.split.maxsize=64000000;

2. 倾斜key单独处理

第二种手段也比较简单, 就是把出现倾斜的key找出来,假如很少的话,可以把它们摘出来,单独处理(或遗弃). 开启hive自动消除数据倾斜:(效果有效)

1
2
3
4
set hive.optimize.skewjoin = true;
set hive.skewjoin.key=1000;
set hive.groupby.skewindata=true;
set hive.groupby.mapaggr.checkinterval=100;

3. 局部聚合(1): 相同value聚合

(没有什么优化是增加一个阶段不能解决的.如果有,就再加一个阶段)
为了减少最后汇聚到reducer上的数据量,可以在之前增加一个阶段,对某个key的数据进行局部聚合.

以某次需求为例,需要求各个省市区维度下的丢包率\延迟的50,90,99分位数.数据量每天200G. 分位数计算极其耗时, 尤其是计算周统计数据时, 数据量达到TB级.

在使用了前一篇优化笔记手段以及上述手段后,依然耗时4小时.原来查询最耗时的部分如下:

1
2
3
4
5
6
7
select  es
,province
,city
,ipOprator
,percentile(xxx,0.5,0.9,0.99)
FROM ttt
GROUP BY es,province,ipOprator with cube

查看hive的percentile源码实现,其对于同一个key的处理逻辑大致这么几步:

1
2
3
4
5
6
7
(依次输入每个value)
1. O(n)
把所有value放进一个Map<value,LongWriteable>里,相同的value则增加map中的计数值;
2. O(nlogn)
在reduce中把map中的所有entry放入一个List中,然后对List根据value值进行全排序(`Collections.sort(entriesList, new MyComparator());`);
3. O(n)
从头开始扫一遍上一步的List, 根据计数器的值总和,分位数,定位到对应的分位数,返回.

将其重写为可以进行局部聚合, 从而略去第一步:

1
2
3
4
5
6
7
8
9
10
select  es
,province
,city
,ipOprator
,percent_new(c1,num,0.5,0.9,0.99)
FROM (select es,province,ipOprator,c1,count(1) as num
FROM xxx
GROUP BY es,province,ipOprator,c1
) as t
GROUP BY es,province,ipOprator with cube

优化后,时间缩短到30分钟.

  • TODO:
    优化第二步中的全排序.

4. 局部聚合2: 相同key聚合

由于上一案例中的聚合函数是分位数计算,聚合的粒度只能达到相同value聚合,对于其他聚合函数,如最大值,最小值等,如果语义上能对相同key先聚合,问题的规模就可以进一步缩小. 方法是先把相同key的数据分拆成不同的key,加上前缀或后缀 如:

1
2
3
4
key -> key_1
key -> key_2
...
key -> key_10

分拆的数量等于并行度,取决于原有的数据集, 然后先进行一阶段聚合,最后去掉前缀后缀,再进行一次聚合得到最后的结果.
这种方法的关键就是要求同一个key的聚合计算可以分拆.