粗俗理解clickhouse

what: clickhouse是啥?

clickhouse是俄罗斯开源的一个用于OLAP分析的核心引擎,它可以基于海量的日志数据接受类sql查询,以秒~分钟量级的延迟返回查询结果。
它目前应用在了俄罗斯的搜索引擎Yandex.Metrica中、欧洲核子研究中心: PB级存储、统计分析查询,以及我国各大互联网公司的BI后台引擎中。

应用: Yandex.Metrica

2014年: 每天120亿个事件。(点击、浏览)
374台服务器,20.3万亿行数据。
压缩后: 2PB
压缩前: 17PB

详细介绍官网:
https://clickhouse.yandex/docs/zh/
开源代码:
https://github.com/yandex/ClickHouse
中文文档:
https://github.com/yandex/ClickHouse/tree/master/docs/zh

why: 为啥选择clickhouse?

主要原因有: 性能高、跑分高、功能多、可用性高。

性能高、跑分高

// 俄罗斯的程序员在算法方面的活跃度排名世界第一
// C++实现、老毛子轻易不开源,参见nginx

摘自知乎: https://zhuanlan.zhihu.com/p/22165241
1亿数据:
比Vertica快5倍,比Hive快279倍,比Mysql快801倍;
10亿数据:
比Vertica快5倍,mysql无法完成。

单机性能

有page cache: 2-10GB/s(未压缩),上限30GB/s
无page cache: 1.2G/s(压缩率3)
(磁盘400MB/s)如果是10B的列,就是1-2亿行/s。

功能多

最重要的是有AggregatingMergeTree表引擎,专门优化了三个数据分析最实用的查询:(海量数据快速计算)

1
2
3
uniq: 计算uv
any: 抽样统计
quantiles: 分位数

上述几个功能如果用sparkSql/hive,一般耗时都是15分钟以上。(甚至到半小时、1小时)
如果用mysql的话,则由于维度爆炸的问题可能存不下这么多数据,并且无法灵活新增维度。
clickhouse对于海量数据处理没有spark/hive那么灵活,但是特化了OLAP的即时查询性能,本质上是处在不同领域的工具。
从数据仓库的角度来看:
ods层: 用spark/hive进行ETL后产生;
dw层: ods载入clickhouse后直接产生预聚合的数仓,支持即时查询;
dm层: mysql

对比

hbase/ES:一般用来支持海量数据点查询;
mysql: 用来支持无聚合的点查询;
clickhouse: 用来支持海量数据的聚合查询。
kylin: 比较接近clickhouse,底层是hbase+星型模型

其他引擎:

ReplcingMergeTree

删除相同主键的重复项(去重)

SummingMergeTree

将一个part中,相同主键的所有行聚合成一行,包含一系列聚合函数状态。

CollapsingMergeTree

提供折叠行功能: 把同主键的数据行去重到最多两行。(再次强调所有聚合都在part内)
场景: 用户访问状态记录、频繁变化的数据

前面说的clickhouse不支持update数据,所以用这个引擎可以近似达到一部分update的效果。
本质上就是类似于git的revert、银行系统里的冲正、mysql的MVCC。

比如我们要记录用户访问情况,先插入一条:
userid_0,5,146,1 表示0号用户访问了5个页面,停留146秒(最后一列的1暂时忽略)。
然后过了一会儿想改成它访问了6个页面,停留185秒,那就插入:

1
2
userid_0,5,146,-1
userid_0,6,185,1

首先把原来的取消掉,标记列-1。然后插入最新的状态数据,标记列1.

应当注意这些成对的1,-1会异步地被删除,所以不能查到状态变化历史,仅用于查最新。

这种引擎的建表语句:

1
2
3
4
5
6
7
8
9
CREATE TABLE UAct
(
UserID UInt64,
PageViews UInt8,
Duration UInt8,
Sign Int8
)
ENGINE = CollapsingMergeTree(Sign)
ORDER BY UserID

查询的时候的语法:

1
2
3
4
5
6
7
SELECT
UserID,
sum(PageViews * Sign) AS PageViews,
sum(Duration * Sign) AS Duration
FROM UAct
GROUP BY UserID
HAVING sum(Sign) > 0

CollapsingMergeTree要求插入的顺序不能乱来,要按状态的变化顺序。
如果顺序无法保证,可以使用VersionedCollapsingMergeTree,它的算法也很简单,就是要求用户多传一个version字段。

GraphiteMergeTree

直接接到日志收集。
可以存metrics指标可视化系统Graphite的rollup数据。
如果不rollup,可以用别的引擎。

Log系列的引擎(非主打)

用于小数据量(< 100w)的表。
包括: StripeLog,Log,TinyLog三个引擎。

特性:

  • 追加写,不支持改
  • 不支持索引
  • 非原子写入(可能有损坏的数据)

TinyLog:最简单的表引擎,适合一次写入即终身、多次查询的小数据,不支持并发数据访问,不支持同时写入同时读取。
Log:比TinyLog多一个偏移量优化.
Memory:以直接形式存储在内存中,读写变态快,但是记住是临时的,关机数据消失。
Buffer:缓冲,可以理解为将数据存储在内存中,然后达到一定阈值限制条件,那么先前的数据会自动写入设定的表格中。这样可以将部分热数据放在内存中缓存,快速访问和读取,而时间较为久远的数据写入表中释放内存,应该比较好理解。(可以实时盯数据)
External data:从字面理解,就是可以将文件数据等引入query语句中利用了。比如你想查找一些在你所给的名单中的用户的消费数据,那么你可以免除复制粘贴,直接将这个名单文件引入并使用,clickhouse会自动给这个文件建立一个临时表。

其他功能:

可用性高

任何时候随时可以给表添加字段、属性、维度,不会拖慢或影响集群运行速度。
BI系统很大的一个痛点是维度的组合爆炸,而且经常需要新增,clickhouse针对性地优化了这一点。(如果是mysql要新增维度列,需要重做整个表,即使是mysql8的瞬加字段也不行)

流水线式的数据处理流程,数据一旦进入系统,那么立即处于可以使用的状态,边读(查询)边写没有任何压力。

How: clickhouse的底层实现原理

主要思想是根据OLAP的特征舍弃了一部分功能,然后针对性地优化了一部分功能。主要方法包括LSM(MergeTree系列表引擎)、稀疏索引(缓存友好)、列式存储+数据压缩、VectorWise、用概率算法进行近似等等。

需求分析

OLAP应用的特点:

  1. 大多数是读请求
  2. 数据总是以相当大的批(> 1000 rows)进行写入
  3. 不修改已添加的数据
  4. 每次查询都从数据库中读取大量的行,但是同时又仅需要少量的列
  5. 宽表,即每个表包含着大量的列
  6. 较少的查询(通常每台服务器每秒数百个查询或更少)
  7. 对于简单查询,允许延迟大约50毫秒
  8. 列中的数据相对较小: 数字和短字符串(例如,每个URL 60个字节)
  9. 处理单个查询时需要高吞吐量(每个服务器每秒高达数十亿行)
  10. 事务不是必须的
  11. 对数据一致性要求低
  12. 每一个查询除了一个大表外都很小
  13. 查询结果明显小于源数据,换句话说,数据被过滤或聚合后能够被盛放在单台服务器的内存中

面临的困难:

  1. 维度组合爆炸;
  2. 聚合数据后,如果有修改很蛋疼.
  3. URL这种无法预聚合.

需求洞察:

用户只关心聚合后中极小一部分

市场上的备胎: sparkSQL,Impala,Drill都不好用。

舍弃的功能

  1. 事务支持
  2. 快速修改、删除数据。 (可以低速批量删除、修改)
  3. 点查询(检索单行): 因为用的是稀疏索引。
    (好处是稀疏所以索引能完全放入内存,范围查询很快)
  4. 高并发查询: 只支持100/s量级查询,对于内网应用、分析型业务足够。
  5. 窗口函数。

实现

基于上述几点需求分析的优化:

  1. cpu: VectorWise方法,将压缩的列数据整理成现代CPU容易处理的Vector模式。利用现代CPU的多线程。 SIMD: 每次处理一批Vector数据。
  2. 提高内存利用率: 稀疏索引;
  3. 硬盘: MergeTree系列表引擎(LSM算法),批量合并写入,提高IO吞吐率;
  4. 算法: 近似算法/概率算法。

架构: 表=>shard=>replica=>partiton=>part

稀疏索引

对应index_granularity参数:

1
M(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \

索引中相邻mark之间的数据行数,默认8192.
借助稀疏索引,它能存更多的索引在内存中。(相当于存了B树的前几层或二级索引)。

其他配置:
https://github.com/yandex/ClickHouse/blob/master/dbms/src/Storages/MergeTree/MergeTreeSettings.h

比如io配置:

1
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \

超过多少Bytes以后绕过内核缓冲,进行直接IO。(节省内存开销、数据复制开销)

其他配置的分三大块:

1
2
3
4
5
/** Merge settings. */ \    合并时的配置
/** Inserts settings. */ \ 插入时的配置
/** Replication settings. */ \ 副本的配置
/** Check delay of replicas settings. */ \ 副本检查延迟配置
/** Compatibility settings */ \ 兼容性配置

稀疏索引示例

数据存储:

1
2
3
4
5
6
全部数据  :     [-------------------------------------------------------------------------]
CounterID: [aaaaaaaaaaaaaaaaaabbbbcdeeeeeeeeeeeeefgggggggghhhhhhhhhiiiiiiiiikllllllll]
Date: [1111111222222233331233211111222222333211111112122222223111112223311122333]
标记: | | | | | | | | | | |
a,1 a,2 a,3 b,3 e,2 e,3 g,1 h,2 i,1 i,3 l,3
标记号: 0 1 2 3 4 5 6 7 8 9 10
  1. CounterID in (‘a’, ‘h’): [0, 3) 和 [6, 8) 区间
  2. CounterID IN (‘a’, ‘h’) AND Date = 3 : [1, 3) 和 [7, 8) 区间
  3. Date = 3: 扫全表。

表由按主键排序的数据 part 组成。
当数据被插入到表中时,会分成part并按主键的字典序排序。例如,主键是 (CounterID, Date) 时,part中数据按 CounterID 排序,具有相同 CounterID 的部分按 Date 排序。

不会合并来自不同分区的数据片段。(性能考虑)
不保证相同主键的所有行都会合并到同一个数据片段中。(没有必要)

索引文件: 每个part创建一个
每隔index_granularity一个索引行号(mark);
对于每列,跟主键相同的索引行处也会写入mark。这些mark让你可以直接找到数据所在的列。

表引擎:MergeTree族引擎

表引擎(即表的类型)决定了:

数据的存储方式和位置,写到哪里以及从哪里读取数据
支持哪些查询以及如何支持。
并发数据访问。
索引的使用(如果存在)。
是否可以执行多线程请求。
数据复制参数。

clickhouse中最强大的都是合并树引擎系列。

  • 理念:
    批量写入,后台合并;

  • 特点:

  1. 数据按主键排序; (类似于聚簇)
  2. 允许使用主键分区; (类似于Hive)
  3. ReplicatedMergeTree系列支持副本(类似于hdfs)
  4. 支持数据采样;(类似于Mysql performanceSchema)

建表语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]

示例语句:

1
2
3
4
5
ENGINE MergeTree() 
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS index_granularity=8192

默认情况下主键跟排序键(由 ORDER BY 子句指定)相同。
这里可以看出它不支持唯一索引,重复是很自然的。由上层自己保证。

SummingMergeTree 和 AggregatingMergeTree 引擎中
列分为两种:

  • 维度
  • 度量 (各种pv,uv等等)

Mysql的做法是把所有维度作为主键; (每次新增维度很痛)
clickhouse的推荐做法是把旧的维度作为主键(保留少量),所有维度(旧维度+新维度)作为排序列。
这里排序列的修改是轻量级的:
旧的维度是整体排序列的前缀(已然有序),仅需排序新加的行。

推荐使用方案:
原始数据=> MergeTree (确保原始数据不丢失)
原始数据=> SummingMergeTree/AggregatingMergeTree (得到预聚合数据)

引擎会定期合并相同主键的数据进行聚合。最终结果中多半还是有重复主键,但是同一个part中不会有。

具体来说:
SummingMergeTree: 把相同排序列的行聚合。
被聚合的列在建表语句中通过columns指定。(数值、非主键)
(如果columns为空会聚合所有非排序列)

特殊情况:

  1. 某行所有度量列值都是0,直接删除该行;(sum优化)
  2. 非数值(无法汇总): 随机选一个值.
  3. 支持sumMap函数: 某列是map结构。

AggregatingMergeTree引擎

SummingMergeTree只支持算pv,AggregatingMergeTree能支持算uv,分位数,抽样,三个函数:

1
2
3
uniq
anyIf (any+If)
quantiles

创建:(物化视图)

1
2
3
4
5
6
7
8
9
10
CREATE MATERIALIZED VIEW test.basic
ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(StartDate)
ORDER BY (CounterID, StartDate)
AS SELECT
CounterID,
StartDate,
sumState(Sign) AS Visits, -- 聚合1: pv
uniqState(UserID) AS Users -- 聚合2: uv 注意是记录了状态(特定的二进制表示法)
FROM test.visits
GROUP BY CounterID, StartDate;

插入数据的时候只需要插入到test.visits.
视图中也会有数据,并且会聚合。

查询:

1
2
3
4
5
6
7
SELECT
StartDate,
sumMerge(Visits) AS Visits, -- 注意都变成了merge后缀
uniqMerge(Users) AS Users
FROM test.basic
GROUP BY StartDate
ORDER BY StartDate;

算法: uniq

上一节中AggregatingMergeTree的uniq求uv,其实有三个函数:

1
2
3
uniq: 用UniquesHashSet近似求uv(BJKST算法)
uniqHLL12: 用HLL近似求uv
uniqExact: 用HashSet精确求uv

源码见:
https://github.com/yandex/ClickHouse/blob/master/dbms/src/AggregateFunctions/AggregateFunctionUniq.cpp

其中HLL就是HyperLogLog算法。

而第一个UniquesHashSet(https://github.com/yandex/ClickHouse/blob/ef50601b5ceeeaf5763eab6c0013954c12eb00b1/dbms/src/AggregateFunctions/UniquesHashSet.h)
两者的思想都是uv越大,不同的hash值越多。

UniquesHashSet的特点是内存消耗小,性能高。
具体实现是将输入hash到UInt32,然后插入到数组中,如果遇到碰撞则进行线性探测. (原始输入丢弃,只存hash值)随着插入进行达到阈值UNIQUES_HASH_MAX_SIZE时,则将当前存的值丢弃一半,只保留能整除2的值,提高skip_degree值,然后开始只接受能整除2的输入。依此类推,后续就是只接受整除4,8,16的值。最后获取结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
size_t size() const
{
if (0 == skip_degree)
return m_size;
size_t res = m_size * (1ULL << skip_degree);
/** Pseudo-random remainder - in order to be not visible,
* that the number is divided by the power of two.
*/
res += (intHashCRC32(m_size) & ((1ULL << skip_degree) - 1));
/** Correction of a systematic error due to collisions during hashing in UInt32.
* `fixed_res(res)` formula
* - with how many different elements of fixed_res,
* when randomly scattered across 2^32 buckets,
* filled buckets with average of res is obtained.
*/
size_t p32 = 1ULL << 32;
size_t fixed_res = round(p32 * (log(p32) - log(p32 - res)));
return fixed_res;
}

rehash的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void rehash()
{
for (size_t i = 0; i < buf_size(); ++i)
{
if (buf[i] && !good(buf[i]))
{
buf[i] = 0;
--m_size;
}
}

/** After removing the elements, there may have been room for items,
* which were placed further than necessary, due to a collision.
* You need to move them.
*/
for (size_t i = 0; i < buf_size(); ++i)
{
if (unlikely(buf[i] && i != place(buf[i])))
{
HashValue x = buf[i];
buf[i] = 0;
reinsertImpl(x);
}
}
}

其中good函数含义就是能否被2^skip_degree整除。

  • 线性探测:
    为了加快速度,增加了一个假设: 所有数据只插入Key/更新Key,不删除Key。
    (这个假设在大数据处理/统计的场景下,大多都是成立的,spark中openHashSet也是线性探测)
    有了这个假设它可以去掉拉链表,使用线性探测来实现哈希表。
  • 内存利用率高: 去掉了8B指针结构,能够创建更大的哈希表,冲突减少;
  • 内存紧凑: 位图操作快,一个内存page就能放下很多位图,8B就能放64个位置,缓存友好(while循环pos++)。

存储

假如表结构是:

1
2
3
4
5
6
create table test.mergetree1 
(sdt Date
, id UInt16
, name String
, cnt UInt16)
ENGINE=MergeTree(sdt, (id, name), 10);

分区字段是日期sdt.
对应的目录结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
├── 20180601_20180601_1_1_0
│ ├── checksums.txt
│ ├── columns.txt -- 元数据
│ ├── id.bin -- 压缩列
│ ├── id.mrk -- 索引mark
│ ├── name.bin
│ ├── name.mrk
│ ├── cnt.bin
│ ├── cnt.mrk
│ ├── cnt.idx
│ ├── primary.idx -- 主键
│ ├── sdt.bin
│ └── sdt.mrk -- 保存一下块偏移量
├── 20180602_20180602_2_2_0
│ └── ...
├── 20180603_20180603_3_3_0
│ └── ...
├── format_version.txt
└── detached -- 破损数据

总结

clickhouse为啥比hive/spark快:

  • 7*24小时都在后台预聚合.hive/spark计算的时候才申请资源,平时只占一点点;
  • 可以用星型模型缩减数据类型、压缩友好;
  • 计算过程没有hive/spark中的shuffle概念,全是mapAgg;

clickhouse为啥比mysql快:(仅限clickhouse擅长的查询)

  • 预聚合
  • 多核优化、vector优化更彻底
  • 分区+稀疏索引,整个索引能放内存,然后并发查part(这点还是要结合多核优化)
  • 根据排序键排序存放

优化的方面:

  1. cpu: VectorWise方法,将压缩的列数据整理成现代CPU容易处理的Vector模式。利用现代CPU的多线程。 SIMD: 每次处理一批Vector数据。
  2. 提高内存利用率: 稀疏索引;
  3. 硬盘: MergeTree系列表引擎(LSM算法),批量合并写入,提高IO吞吐率,牺牲随机读能力;
  4. 算法: 近似算法/概率算法,HLL\BJKST算法等。

io中的缓冲——如何理解O_Direct

IO缓冲主要有4层:
1.用户自己的缓冲;
2.库缓冲;
3.内核缓冲;
4.磁盘缓冲。

=== 应用层: (进程挂丢数据) 看到文件句柄
application buffer: 比如我们代码中写的int[]arrayData;
clib buffer: fwrite以后到这层。这里写的是c库(IObuffer),也可能是java库中的缓冲(BufferedOutputStream)。
如果数据才到这一层库缓冲,还没系统调用,此时程序core dump的话,数据就丢了。
=== 内核层: (内核挂丢数据) 看到inode和数据块
page cache: 内核层的缓冲。fflush以后到这里, fclose先到这里然后继续到磁盘。
driver: 具体设备的驱动软件
=== 设备层: (断电丢数据) 看到扇区
disk cache: 磁盘缓冲。fsync/fclose至少到这里。fsync是同步会完全等返回。

为啥要有库缓冲

(比如clib buffer
因为从应用层到内核层需要系统调用、内核态切换,开销比较大,为了减少这件事发生的次数,没有必要因为1个字节的改动发生系统调用。

绕过库缓冲的方案:

用mmap(内存映射文件),把内核空间的page cache映射到用户空间。

为啥要有内核缓冲

内核用pdflush线程循环检测脏页,判断是否写回磁盘。
由于磁盘是单向旋转,重新排布写操作顺序可以减少旋转次数。(合并写入)
plus:
O_SYNC参数: 访问内核缓冲时是异步还是同步。O_SYNC表示同步。

绕过内核缓冲的方案

O_Direct参数,直接怼Disk cache。

为啥要有磁盘缓冲

驱动通过DMA,将数据写入磁盘cache。
磁盘缓冲主要是保护磁盘不被cpu写坏。是与外部总线交换数据的场所。(断电丢数据)

绕过磁盘缓冲的方案

RAW设备写,直接写扇区: fdisk,dd,cpio工具。

领域驱动设计-第一~六章笔记

领域驱动设计(DDD)这本书主要是讲抽象概念、理念、思想,具体可以有不同的实现,具体明确一些尺度或者细节。

第一章 消化知识

这一节主要讲了作者开发一个用于印刷电路板(PCB)的软件工具的过程。
他请教了相关专家,逐步建模。

涉及到的角色:

软件开发工程师、业务方

涉及到的过程:

需求分析、提炼模型
消化PCB相关的业务知识、
形成业务方和开发工程师都能看懂的共同语言/名词(类似于DSL)

我的理解是相当于产品做需求分析的那步,由程序员直接参与,快速迭代讨论形成DSL,省得吃产品理解的二手需求经常出错。

DSL相关:

1.4节还提到用策略设计模式来写代码,让需求方也能看懂代码中运用的策略(可读性),让DSL范围扩散到架构图和顶层代码。

第二章 语言的使用

按我理解还是要形成DSL,图简略。
语言精简程度=>大量使用短语。

第三章 绑定模型和实现

模型和实现要对应,不然就难以维护实现,模型失去意义。
推荐了一下面向对象编程。
建模与代码实现不能完全分离。

第二部分 模型驱动设计的构造块

设计原则:职责驱动
(SOLID原则)

模型驱动涉及到的几个构造块的概念:
Service: 服务;
Entity: 实体,可以理解成数据库中的一张表;
Value Object: 值对象,传输中的不可变对象,为了明确不可变的特性;
Factory: 工厂;
Reposity: 负责隐藏存储层细节;
Aggregate: 负责封装多组VO/实体,聚合;(有说法这个不是某个类,而是一个虚拟概念)
Aggregate Root: 一个实体,作为网关服务,修改生成Aggregate,服务给外界.(我理解就是粗粒度封装entity,减少整个领域的接口数量、暴露在外的引用数量)

领域外的术语:
DTO: 和VO太像了; 但不在领域驱动的语境里;
POJO: 纯粹属于语法语境了;

第四章 分离领域

分离领域,也就是分层架构,然后重点是把领域层分离出来。
ddd的四个概念层:

  • UI层/表示层: view
  • 应用层: 尽量薄,按我理解就是controller
  • 领域层/模型层: 业务概念、状态、规则(按我理解就是logic)
  • 基础设施层:持久化、消息传递、UI渲染

层之间松散连接、单向依赖。
如果下层要调用上层: 回调、观察者模式。

不要把所有东西都放到applicationContext里头,只放助于解耦的部分。
这个是出于性能考虑,毕竟spring是用一个concurrentHashMap作为Ioc容器的。啥都往里头奔放了。
(可以只放大粒度对象。

smartUI: 就是在UI代码里写很多逻辑,巨复杂不好维护。不推荐。

第五章 软件中所表示的模型

Entity: 实体,可以跨实现跟踪,(可以在数据库中查到).
Value Object: VO, 仅用于传输,是某个状态的镜像,不可变。
Service: 服务,只封装方法,无状态,可以放心调用。粒度中等为好。

简化关系

遇到多对多、双向关联关系,可以寻找自然偏向,从而把它简化为:1对多,单向关联。

例子:
一般问美国1790年的总统是谁,而比较少问华盛顿是哪个国家的总统。
所以国家和总统的关系可以简化为单向。

第六章 领域对象的生命周期

Aggregate

根/Aggregate Root: 汽车Entity
边界内实体/Entity: 轮胎

Aggregate外部只引用根。(利于垃圾收集)
边界内其他实体以Value Object形式交付给外部。(不可变,利于修改可控,修改由根控制,保证一致性)

Factory

创建对象或Aggregate工作很复杂的时候,可以引入Factory来封装。
Factory的设计模式包括:

  • 简单工厂: 对字符串switch case;或传入类名,反射创建对象;

  • 工厂: 不把所有创建放一个类,每个类有自己的工厂,巨啰嗦;(符合开闭原则,用冗余来获取简单)

  • 抽象工厂

  • Builder模式

    Factory的作用:

    1. 把和运行时工作无关的复杂创建逻辑抽离到别的地方;
    2. 解耦两个类的关联:(跨Agg边界的两个类) 比如需要用账号类创建交易对象,但是本质上账号对象和交易对象关系比较弱,可以用工厂来创建,这样耦合度低一些。

防止Factory的滥用导致类膨胀,正常情况直接用构造函数。
Factory还可以负责反序列化,重建对象。

Repository

封装对数据的访问,方便随时切换底层实现(内存哑实现、缓存)

事务

repository不封装事务,事务的决定权留给调用方。

Factory vs Repository

Factory: 生命周期的开始(Entity的创建);
Repository: 生命周期的中间和结束(从数据库查出数据重建Entity不算新建).

spark-sql中的分位数算法

spark有两个分位数算法:

  1. percentile: 接受Int,Long,精确计算。底层用OpenHashMap,计数,然后排序key;
  2. percentile_approx:接受Int,Long,Double,近似计算。用的GK算法。论文参见《Space-efficient Online Computation of Quantile Summaries》(http://dx.doi.org/10.1145/375663.375670)
    基本思想是以最小空间完成分位数统计,比如把相邻的1w个数压缩成(平均数,个数)元组。如果空间够用,就不进行这种压缩。(所以如果如果统计90分位数,传入的精度参数至少应为10,如果统计999分位数,传入的精度参数至少为1000,默认精度是10000。)

俩算法和Hive版本的基本是一样的。
区别:

  1. spark的percentile多了一个频次参数,也就是可以接受分阶段聚合;(percentile_approx木有)
  2. spark底层用的openHashMap,速度快5倍,内存消耗更少。

为啥OpenHashMap性能优于HashMap?

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
OpenHashMap为了加快速度,增加了一个假设: 所有数据只插入Key/更新Key,不删除Key。
(这个假设在大数据处理/统计的场景下,大多都是成立的)
有了这个假设它可以去掉拉链表,使用线性探测的开放定址法来实现哈希表。

OpenHashMap底层数据委托给了OpenHashSet,所以本质上是看OpenHashSet为啥快。
OpenHashSet用BitSet(位图)来存储在不在集合中(位运算,很快),另开一个数组存储实际数据:

1
2
3
protected var _bitset = new BitSet(_capacity)
protected var _data: Array[T] = _
_data = new Array[T](_capacity)

这俩成员始终保持等长,_bitset的下标x位置为1时,_data的下标x位置为中就有实际数据。(手动维持联动)
插入数据时,hash一下key生成pos,看看_bitset中对应位置有没有被占用,有的话就死循环++pos:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def addWithoutResize(k: T): Int = {
var pos = hashcode(hasher.hash(k)) & _mask
var delta = 1
while (true) {
if (!_bitset.get(pos)) {
// This is a new key.
_data(pos) = k
_bitset.set(pos)
_size += 1
return pos | NONEXISTENCE_MASK
} else if (_data(pos) == k) {
// Found an existing key.
return pos
} else {
// quadratic probing with values increase by 1, 2, 3, ...
pos = (pos + delta) & _mask
delta += 1
}
}
throw new RuntimeException("Should never reach here.")
}

逻辑很简单,由于假设了不会删除key,线性探测法变得实用。

小结一下OpenHashSet快的原因:

  1. 内存利用率高: 去掉了8B指针结构,能够创建更大的哈希表,冲突减少;
  2. 内存紧凑: 位图操作快,一个内存page就能放下很多位图,8B就能放64个位置,缓存友好(while循环pos++)。

percentile实现:

Percentile.scala文件:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
首先看注释:

1
2
3
4
/* Because the number of elements and their partial order cannot be determined in advance.
* Therefore we have to store all the elements in memory, and so notice that too many elements can
* cause GC paused and eventually OutOfMemory Errors.
/

基本思想是把所有元素保存在内存中。
因此它其实支持两阶段聚合:
_FUNC_(col, percentage [, frequency])
可以传入一个参数frequency表示频次.
// 2017-02-07加上的特性,比我写hive版本的分阶段聚合udaf早了10个月。

percentile_approx实现

代码:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
底层委托给QuantileSummaries实现的。
主要有俩个成员变量:

1
2
sample: Array[Stat] : 存放桶,超过1000个桶的时候就压缩(生成新的三元组);
headSampled: ArrayBuffer[Double]:缓冲区,每次达到5000个,就排序后更新到sample.

主要思想是减少空间占用,因此很多排序,spark的实现merge sample的时候甚至都没有管俩sample已经有序了,直接sort了:

1
2
3
4
5
// TODO: could replace full sort by ordered merge, the two lists are known to be sorted already.
val res = (sampled ++ other.sampled).sortBy(_.value)
val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
new QuantileSummaries(
other.compressThreshold, other.relativeError, comp, other.count + count)

Stat的定义:

1
2
3
4
5
6
7
/**
* Statistics from the Greenwald-Khanna paper.
* @param value the sampled value
* @param g the minimum rank jump from the previous value's minimum rank
* @param delta the maximum span of the rank.
*/
case class Stats(value: Double, g: Int, delta: Int)

插入的函数:(每N个数,排序至少1次(merge还有1次),因此是O(NlogN))

1
2
3
4
5
6
7
8
9
10
11
12
13
def insert(x: Double): QuantileSummaries = {
headSampled += x
if (headSampled.size >= defaultHeadSize) {
val result = this.withHeadBufferInserted
if (result.sampled.length >= compressThreshold) {
result.compress()
} else {
result
}
} else {
this
}
}

插入数据的其中一个步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private def withHeadBufferInserted: QuantileSummaries = {
if (headSampled.isEmpty) {
return this
}
var currentCount = count
val sorted = headSampled.toArray.sorted
val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
// The index of the next element to insert
var sampleIdx = 0
// The index of the sample currently being inserted.
var opsIdx: Int = 0
while (opsIdx < sorted.length) {
val currentSample = sorted(opsIdx)
// Add all the samples before the next observation.
while (sampleIdx < sampled.length && sampled(sampleIdx).value <= currentSample) {
newSamples += sampled(sampleIdx)
sampleIdx += 1
}

// If it is the first one to insert, of if it is the last one
currentCount += 1
val delta =
if (newSamples.isEmpty || (sampleIdx == sampled.length && opsIdx == sorted.length - 1)) {
0
} else {
math.floor(2 * relativeError * currentCount).toInt
}

val tuple = Stats(currentSample, 1, delta)
newSamples += tuple
opsIdx += 1
}

// Add all the remaining existing samples
while (sampleIdx < sampled.length) {
newSamples += sampled(sampleIdx)
sampleIdx += 1
}
new QuantileSummaries(compressThreshold, relativeError, newSamples.toArray, currentCount)
}

获取结果:O(n)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Target rank
val rank = math.ceil(quantile * count).toInt
val targetError = math.ceil(relativeError * count)
// Minimum rank at current sample
var minRank = 0
var i = 1
while (i < sampled.length - 1) {
val curSample = sampled(i)
minRank += curSample.g
val maxRank = minRank + curSample.delta
if (maxRank - targetError <= rank && rank <= minRank + targetError) {
return Some(curSample.value)
}
i += 1
}

优化思路

结合yuange在微博/km上分享的思路,用计数器区代替密集数据区的hashmap(其实也是GK算法的精确版)。逼近O(N)复杂度。
// TODO benchmark、优化算法

spark中编写UDAF的4种姿势

摘要:

(探索解决sql的多行处理能力盲区)

  1. 搭配collect_set+UDF;

  2. RDD: combineByKey;

  3. Dataframe: 继承UserDefinedAggregateFunction;

  4. Dataset: 继承Aggregator。

    前文探索了解决sql对于单行处理的能力盲区(http://xiaoyue26.github.io/2019/05/08/2019-05/%E5%B0%86pyspark%E4%B8%AD%E7%9A%84UDF%E5%8A%A0%E9%80%9F50/ ),本文接着探索解决sql对于多行处理(UDAF/用户自定义聚合函数)的能力盲区。

姿势1:搭配collect_set+UDF

基本思想是强行把一个group行拼成一个数组,然后编写一个能处理数组的UDF即可。如果是多行变多行,则UDF的输出也构造成数组,然后用explode打开。如果想要把多行聚合成一行(类似于sum),则直接输出结果即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def str_list2idfa(txt_list):
try:
res = list()
for txt in txt_list:
res.append(str2idfa(txt))
return res
except:
return []


if __name__ == '__main__':
spark = SparkSession.builder.appName(app_name).getOrCreate()
provider = TDWSQLProvider(spark, user=user, passwd=passwd, db=db_name)

in_df = provider.table(in_table_name, ['p_2019042100']) # 分区数组
print(in_df.columns)
# 1. 创建udaf:
str_list2idfa_udaf = udf(str_list2idfa # 实际运行的函数
, ArrayType(ArrayType(StringType())) # 函数返回值类型
)
# 2. 在df中使用,将数组转成二维数组:
out_t1 = in_df.groupBy('uin').agg(
str_list2idfa_udaf(
collect_set('value')
).alias('value_list')
)
print(out_t1.columns)
out_t1.printSchema()
out_t1.createOrReplaceTempView("t1")
# 3. 将二维数组打开,一行变多行,一列变两列:
out_t2 = spark.sql('''
select uin
,idfa_idfv[0] as idfa
,idfa_idfv[1] as idfv
from t1
lateral view explode(value_list) tt as idfa_idfv
''')
out_t2.printSchema()
print(out_t2.take(10))
  • 优点: 开发成本低,不用编译。
  • 缺点: 性能一般,增加了转换数组、explode的成本,可能导致聚合过程完全在单点进行,对于数据倾斜的承受能力较低。

姿势2: 使用RDD的combineByKey算子

上述方法本质上是用UDF强行模拟了UDAF的功能,性能上有所损失。第二种方法是使用RDD的combineByKey算子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
val spark = SparkSession.builder.appName("UdafDemo").getOrCreate()
val rddProvider = new TDWProvider(spark.sparkContext, user, pass, db) // 这个返回rdd
val inRdd = rddProvider.table("t_dw_dc0xxxx", Array("p_2019042100"))
println("getNumPartitions:")
println(inRdd.getNumPartitions)
val kvRdd: RDD[(Long, String)] = inRdd
.map(row => (row(3).toLong, UdfUtils.str2idfa(row(9))))
.filter(x => x._2.isDefined)
.map(x => (x._1, x._2.get))

val combineRdd: RDD[(Long, mutable.Set[String])] = kvRdd
.combineByKey(
(v: String) => mutable.Set(v),
(set: mutable.Set[String], v: String) => set += v,
(set1: mutable.Set[String], set2: mutable.Set[String]) => set1 ++= set2)

val outRdd: RDD[(Long, String)] = combineRdd.flatMap(kv => {
val uin = kv._1
val set = kv._2
val res = mutable.MutableList.empty[(Long, String)]
set.foreach(x => res += ((uin, x)))
res.iterator
})
outRdd.take(10).foreach(println)
// println(outRdd.count())
  • 优点: 代码简洁,容易理解,性能高;
  • 缺点: 需要学习RDD相关知识。

姿势3: 使用Dataframe(继承UserDefinedAggregateFunction)

假设用户比较熟悉Dataframe操作,还可以通过继承UserDefinedAggregateFunction类编写一个完整的UDAF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// part1: UDAF:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object DfUdaf extends UserDefinedAggregateFunction {
def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)

// Map[String,Null] 当Set用了
def bufferSchema: StructType = new StructType().add("idfa_idfv", MapType(StringType, NullType))

override def dataType: DataType = MapType(StringType, NullType)

override def deterministic: Boolean = true

override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, Map[String, Null]())
}

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val map = buffer.getAs[Map[String, Null]](0)
val value = input.getAs[String](0)
val idfa_idfv = UdfUtils.str2idfa(value)
if (idfa_idfv.isDefined) {
buffer.update(0, map ++ Map(idfa_idfv.get -> null))
}
}

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val map1 = buffer1.getAs[Map[String, Null]](0)
val map2 = buffer2.getAs[Map[String, Null]](0)
buffer1.update(0, map1 ++ map2)
}

override def evaluate(buffer: Row): Any = buffer.getAs[Map[String, Null]](0)
}

// part2: main函数:
val spark = SparkSession.builder.appName("UdafDemo").getOrCreate()
val sqlProvider = new TDWSQLProvider(spark, user, pass, db)

// val rddProvider = TDWProvider(spark.sparkContext, user, pass, db) // 这个返回rdd
val inDf = sqlProvider.table("t_dw_dc0xxxx", Array("p_2019042100"))
println("getNumPartitions:")
println(inDf.rdd.getNumPartitions)

spark.udf.register("collect_idfa", DfUdaf)
inDf.createOrReplaceTempView("t1")
val outDf = spark.sql("" +
"select uin,idfa_idfv " +
"from " +
"(select uin,collect_idfa(value) as vmap from t1 group by uin) a " +
"lateral view explode(vmap) tt as idfa_idfv,n" +
"")
outDf.take(10).foreach(println)

优点: 可以直接在sql中引用,重用性高,性能高;
缺点: 开发成本高,只支持scala,需要编译。

姿势4: 使用Dataset(继承Aggregator)

如果用户对于Dataset的api比较熟悉,可以继承Aggregator开发UDAF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// part1: UDAF:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

class DsUdaf[IN](val f: IN => String) extends Aggregator[IN, Set[String], Set[String]] {

override def zero: Set[String] = Set[String]()

override def reduce(buf: Set[String], a: IN): Set[String] = {
val idfa_idfv = UdfUtils.str2idfa(f(a))
buf ++ idfa_idfv
}

override def merge(b1: Set[String], b2: Set[String]): Set[String] = b1 ++ b2

override def finish(reduction: Set[String]): Set[String] = reduction

override def bufferEncoder: Encoder[Set[String]] = Encoders.kryo[Set[String]]

override def outputEncoder: Encoder[Set[String]] = Encoders.kryo[Set[String]]

}
// part2: main函数:
val spark = SparkSession.builder.appName("UdfDemo").getOrCreate()
val sqlProvider = new TDWSQLProvider(spark, user, pass, db)

// val rddProvider = TDWProvider(spark.sparkContext, user, pass, db) // 这个返回rdd
val inDf = sqlProvider.table("t_dw_dc0xxxx", Array("p_2019042100"))
println("getNumPartitions:")
println(inDf.rdd.getNumPartitions)

import spark.implicits._
inDf.createOrReplaceTempView("t1")
val df2 = spark.sql("select uin,value from t1")
df2.printSchema()

val inDS = df2.as[UinValue]
// inDS.take(10).foreach(println)
val outDs: Dataset[(Long, Set[String])] = inDS.groupByKey(_.uin).agg(new DsUdaf[UinValue](_.value).toColumn)
// outDs.take(10).foreach(println)
val ds2 = outDs.flatMap(pair => {
val uin = pair._1
val idfa_set = pair._2
idfa_set.map(idfa => (uin, idfa))
})
ds2.printSchema()
ds2.take(10).foreach(println)

其中Encoder部分由于还不支持Set集合类型,可以使用kryo序列化成二进制。(更多Encoder相关参见:http://xiaoyue26.github.io/2019/04/27/2019-04/spark%E4%B8%AD%E7%9A%84encoder/

优点: 类型安全,继承Aggregator开发的成本略小于继承UserDefinedAggregateFunction;
缺点: 只支持scala,需要编译。

总结

本文总结了在Rdd,Dataframe,Dataset三种api下编写UDAF的方法(三种api的对比参见http://xiaoyue26.github.io/2019/04/29/2019-04/spark%E4%B8%ADRDD%EF%BC%8CDataframe%EF%BC%8CDataSet%E5%8C%BA%E5%88%AB%E5%AF%B9%E6%AF%94/ ),以及使用UDF模拟UDAF功能的方法。大家可以根据自己熟悉的api和需求选择。

  • 如果不在意性能:用collect_set+UDF模拟一个;(姿势1)
  • 如果在意性能,但是只用一次: 可以直接用RDD的combineByKey,代码较短;(姿势2)
  • 如果在意性能,而且会反复复用: 建议使用Dataframe,继承UserDefinedAggregateFunction编写一个UDAF。(姿势3)

将pyspark中的UDF加速50%

摘要

调用jar中的UDF,减少python与JVM的交互,简单banchmark下对于50G数据集纯map处理可以减少一半处理时间。
牺牲UDF部分的开发时间,尽量提高性能。
以接近纯python的开发成本,获得逼近纯scala的性能。兼顾性能和开发效率。

背景

当遇到sql无法直接处理的数据时(比如加密解密、thrift解析操作二进制),我们需要自定义函数(UDF)来进行处理。出于开发效率的考虑,我们一般会选择tesla平台,使用pyspark脚本。

Before: 最简单的UDF

一个最简单的UDF处理大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def str2idfa(txt):
try:
txtDecoded = base64.b64decode(txt)
bytesWithSalt = bytes(txtDecoded)
# 省略实际处理代码
return 'dump_data'
except:
print('error here')
return '-1#-1'


if __name__ == '__main__':
spark = SparkSession.builder.appName(app_name).getOrCreate()
in_provider = TDWSQLProvider(spark, user=user, passwd=passwd, db=db_name)

in_df = in_provider.table('t_dw_dc0xxxx', ['p_2019042100']) # 分区数组
print(in_df.columns)
in_df.createOrReplaceTempView("t1")
# 1. 注册udf:
spark.udf.register("str2idfa", str2idfa, StringType())
# 2. 在sql中使用:
out_t1 = spark.sql('''select uin
,str2idfa(value) as idfa_idfv
from t1
''')
print(out_t1.columns)
print(out_t1.take(10))

底层实现原理

如上图所示,pyspark并没有像dpark一样用python重新实现一个计算引擎,依旧是复用了scala的jvm计算底层,只是用py4j架设了一条python进程和jvm互相调用的桥梁。
driver: pyspark脚本和sparkContext的jvm使用py4j相互调用;
executor: 由于driver帮忙把spark算子封装好了,执行计划也生成了字节码,一般情况下不需要python进程参与,仅当需要运行UDF(含lambda表达式形式)时,将它委托给python进程处理(DAG图中的BatchEvalPython步骤),此时JVM和python进程使用socket通信。

上述使用简单UDF时的pyspark由于需要使用UDF,因此DAG图中有BatchEvalPython步骤:

BatchEvalPython过程

参考源码:https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
可以看到和这个名字一样直白,它就是每次取100条数据让python进程帮忙处理一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 第58行:
// Input iterator to Python: input rows are grouped so we send them in batches to Python.
// For each row, add it to the queue.
val inputIterator = iter.map { row =>
if (needConversion) {
EvaluatePython.toJava(row, schema)
} else {
// fast path for these types that does not need conversion in Python
val fields = new Array[Any](row.numFields)
var i = 0
while (i < row.numFields) {
val dt = dataTypes(i)
fields(i) = EvaluatePython.toJava(row.get(i, dt), dt)
i += 1
}
fields
}
}.grouped(100).map(x => pickle.dumps(x.toArray))

由于我们的计算任务一般耗时瓶颈在于executor端的计算而不是driver,因此应该考虑尽量减少executor端调用python代码的次数从而优化性能。

参考源码:https://github.com/apache/spark/blob/master/python/pyspark/java_gateway.py

1
2
3
4
5
6
7
8
9
10
11
12
// 大概135行的地方:
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.ml.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")

pyspark可以把很多常见的运算封装到JVM中,但是显然不包括我们的UDF。
所以一个很自然的思路就是把我们的UDF也封到JVM中。

After: 调用JAR包中UDF

首先我们需要用scala重写一下UDF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
object UdfUtils extends java.io.Serializable {

case class Idfa(idfa: String, idfv: String) {
private def coalesce(V: String, defV: String) =
if (V == null) defV else V

override def toString: String = coalesce(idfa, "-1") + "#" + coalesce(idfv, "-1")
}

def str2idfa(txt: String): Option[String] = {
try {
val decodeTxt: Array[Byte] = Base64.getDecoder.decode(txt)
// TODO 省略一些处理逻辑
val str = "after_some_time"
val gson = new Gson()
val reader = new JsonReader(new StringReader(str))
reader.setLenient(true)
val idfaType: Type = new TypeToken[Idfa]() {}.getType
Some(gson.fromJson(reader, idfaType).toString)
}
catch {
case e: Throwable =>
println(txt)
e.printStackTrace()
None
}
}
// 关键是这里把普通函数转成UDF:
def str2idfaUDF: UserDefinedFunction = udf(str2idfa _)

然后在pyspark脚本里调用jar包中的UDF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from pytoolkit import TDWSQLProvider, TDWUtil, TDWProvider
from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, LongType, StringType, StructField, IntegerType
from pyspark.sql.functions import udf, struct, array
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
from pyspark.sql.functions import col

def str2idfa(col):
_str2idfa = sc._jvm.com.tencent.kandian.utils.UdfUtils.str2idfaUDF()
return Column(_str2idfa.apply(_to_seq(sc, [col], _to_java_column)))


spark = SparkSession.builder.appName(app_name).getOrCreate()
sc = spark.sparkContext
if __name__ == '__main__':
in_provider = TDWSQLProvider(spark, user=user, passwd=passwd, db=db_name)
in_df = in_provider.table('t_dw_dcxxxx', ['p_2019042100']) # 分区数组
print(in_df.columns)
in_df.createOrReplaceTempView("t1")
out_t1 = in_df.select(col('uin')
, str2idfa(col("value"))) # 直接使用scala的udf,节省43%时间,减少两个transform
print(out_t1.columns)
print(out_t1.take(10))

其中_jvm变量是sparkContextJVMView对象的名字,此外sc中还有_gateway变量以连接JVM中的GatawayServer
提交时,在tesla上的配置spark-confjar包路径:

1
2
spark.driver.extraClassPath=pipe-udf-1.0-SNAPSHOT-jar-with-dependencies.jar
spark.executor.extraClassPath=pipe-udf-1.0-SNAPSHOT-jar-with-dependencies.jar

同时在依赖包文件中上传jar包。

这样一通操作之后,DAG图变成了这样:

可以看到比之前少了两个transform,没有了BatchEvalPython,也少了一个WholeStageCodeGen
经过简单banchmark,对于50G数据集纯map处理。
第一种方案:大约13分钟;
第二种方案:大约7分钟。
第二种方案大约能节省一半的时间,并且进一步测试使用scala完全重写整个计算,运行时间和第二种方案接近,也大约需要7分钟。

总结

在pyspark中尽量使用spark算子和spark-sql,同时尽量将UDF(含lambda表达式形式)封装到一个地方减少JVM和python脚本的交互。
由于BatchEvalPython过程每次处理100行,也可以把多行聚合成一行减少交互次数。
最后还可以把UDF部分用scala重写打包成jar包,其他部分则保持python脚本以获得不用编译随时修改的灵活性,以兼顾性能和开发效率。

spark中RDD,Dataframe,DataSet区别对比

RDD,Dataframe,DataSet的定义

RDD: immutable、spark的基础数据集。底层api; 1.0+, 存放java/scala对象;
Dataframe: immutable、多了列名、Catalyst优化。高级api; 1.3+, 存放row对象;(有列名)
Dataset: 比Dataframe多类型安全。高级api。 1.6+, 存放java/scala对象(暴露更多信息给编译期)
可以理解成DataSet每行存一个大对象。(比如样例中的DataSet[Person]

DataSet一般比DataFrame慢一点点,多了类型安全的开销(Row=>Java类型

这里说的类型安全是什么?

是编译期类型安全

Dataframe: 访问不存在的列名=>运行时报错; (非类型安全)
DataSet: 访问不存在的列名=>编译时报错。 (类型安全)

DataSet的中每个元素都是case class之类的,完全定义类型的,因此编译时就能确定schema合法性。

Dataset[Row] = DataFrame
因此Row可以看做非编译期类型安全的对象。

java/scala对象 <=> DataFrame的row对象/DataSet的元素:

  1. 对于dataframe: 传入StructType;
  2. 对于dataSet: 使用Encoder。

类似于hive的反序列化:

hive反序列化原理:

1
HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row object

hive序列化原理(Serialize):

1
Row object –> Serializer –> <key, value> –> OutputFileFormat –> HDFS files

Encoder: Dataset[Row] -> Dataset[T]
JVM对象和非堆自定义内存二进制数据
Encoders生成二进制代码来跟非堆数据交互,并且提供有需访问给独立的参数而不是对整个对象反序列化。

优化

RDD: 没有优化, 程序员自己保证RDD运算是最优的;
DataFrame: 走catalyst编译优化,类似于Sql的优化。根据成本模型,逻辑执行计划优化成物理执行计划。
DataSet: 同DataFrame.

强调一点,DataFrame底层也是用的RDD实现,因此如果程序员足够牛逼,理论上执行计划能写得比DataFrame的计划好。

序列化

shuffle的时候、或者cache写内存、磁盘的时候,需要序列化。

RDD: 使用java序列化(或者kryo)成二进制; (成本高)
DataFrame: Tungsten计划优化。序列化到堆外内存,然后无须反序列化,直接根据schema操作二进制运算。(因为DataFrame比RDD多一个schema信息)
https://github.com/hustnn/TungstenSecret
DataSet: 基本同DataFrame,多了Encoder的概念。访问某列的时候,可以只反序列化那个局部的二进制。

Tungsten binary format

钨丝计划使用的二进制格式

垃圾收集

RDD: 由于上一节中的序列化,gc压力较大;
DataFrame: 放堆外内存,无需jvm对象头开销,无gc;
DataSet: 同df.

钨丝计划的秘密:

https://github.com/hustnn/TungstenSecret

总结一下钨丝计划的3大优化:

  1. 内存管理、直接操作二进制数据:放堆外内存(避免gc开销),不用jvm对象(减少对象头开销)。
  2. 缓存友好的计算: 考虑存储体系;(flink也有) 对指针排序,根据schema访问key的二进制。(估计key只能是原生类型,其他类就得反序列化了)
  3. code generation:充分利用最新的编译期和cpu性能:把jvm对象转到堆外unsafeRow,以便利用第一点。

比如8B的String的对象头有40B开销。
UnsafeShuffleManager: 直接在serialized binary data上sort而不是java objects

wholeStageCodeGen

把transform的很多步骤,合并成一个步骤,使用字符串插值生成一份重写后的代码(逼近于手写最优解),然后用Janino(微型运行时嵌入式java编译器)编译成字节码。

spark中的encoder

参考资料:
https://stackoverflow.com/questions/53949497/why-a-encoder-is-needed-for-creating-dataset-in-spark
It also uses less memory than Kryo/Java serialization.

What: Encoder是啥?

所有DataSet都需要Encoder

Encoder是spark-sql用来序列化/反序列化的一个类。主要用于DataSet
本质上每次调用toDS()函数的时候都调用了Encoder,不过有时候我们察觉不到,因为用了隐式调用(import spark.implicits._)。
可以直接看Encoder源码注释中的样例:

1
2
3
4
import spark.implicits._
val ds = Seq(1, 2, 3).toDS() // implicitly provided (spark.implicits.newIntEncoder)
// 将数字(JVM对象)转换为DataSet中的元素
// 这里由于是常见的原始类型,所以spark提供了隐式encoder的调用,隐藏了这些细节。

Encoder将jvm转换为堆外内存二进制,使用成员位置信息,降低反序列化的范围(反序列化需要的列即可)。
// (类似于Hive中的反序列化,把kv转换为row)

Encoder不要求线程安全。

Why: 为啥用Encoder?

stackoverflow上说encoder消耗更少的内存。因为kryo把dataSet中的所有行都变成了一个打平的二进制对象。
10x faster than Kryo serialization (Java serialization orders of magnitude slower)
DataFrame本质上是DataSet[Row],用的固定是RowEncoder,所以不需要传Encoder。
Encoder底层是钨丝计划的堆外内存优化,节省了jvm对象头、反序列化、gc的开销。

When: 啥时候使用Encoder

Encoder适用于原始类型、case class对象(因为有默认的apply/unapply方法)、spark-sql类型。

Encoder支持的类型非常多,不支持的情况:

1
2
3
4
1. 如果类型是javabean,类的成员如果是容器,只能是List,不能是其他容器(还没有实现);
2. 不支持大于5的Tuple;
3. 不支持`Option`;
4. 不支持`null`值的`case class`。

不支持的时候,可以把不支持的部分用kyro-Encoder,相当于不支持的部分直接当做一个二进制,不享受优化,但其他不支持部分可以享受优化。

How: 怎么使用Encoder:

显式: 使用Encoders类(类似于utils)的静态工厂方法;
隐式:import spark.implicits._: 原始类型和Product类型(也就是case class)可以直接隐式支持。

1. 创建DataSet时显式使用:

源码注释中的样例:

1
2
3
4
5
6
7
// eg1: String:
List<String> data = Arrays.asList("abc", "abc", "xyz");
Dataset<String> ds = context.createDataset(data, Encoders.STRING());
// eg2: 复合Tuple:
Encoder<Tuple2<Integer, String>> encoder2 = Encoders.tuple(Encoders.INT(), Encoders.STRING());
List<Tuple2<Integer, String>> data2 = Arrays.asList(new scala.Tuple2(1, "a");
Dataset<Tuple2<Integer, String>> ds2 = context.createDataset(data2, encoder2);

2. 创建DataSet时隐式使用:

看createDataset的签名:

1
2
def createDataset[T](data: RDD[T])(implicit arg0: Encoder[T]): Dataset[T]
// Creates a Dataset from an RDD of a given type.

所以Encoder其实可以隐式传:

1
2
import spark.implicits._
val ds = Seq(1, 2, 3).toDS() // implicitly provided

3. UDAF中使用:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala

当我们为DataSet定义UDAF的使用。
语义上: 因为涉及到数据转换,不可避免地会需要使用Encoder,这个时候是显式使用。
语法上: 由于继承了Aggregator也必须使用Encoder

源码阅读

Encoder

1
2
3
4
5
6
7
8
trait Encoder[T] extends Serializable {
/** Returns the schema of encoding this type of object as a Row. */
def schema: StructType
/**
* A ClassTag that can be used to construct and Array to contain a collection of `T`.*/
def clsTag: ClassTag[T]
// 存了ClassTag的话,就能在运行时构建泛型的数组了。
}

TypeTag: 相当于scala以前的Manifest,用于存储泛型参数的实际类型。(泛型参数的实际类型运行时会被JVM擦除,有了TypeTag就能在运行时获得实际类型了)
ClassTag: 相当于scala以前的ClassManifest,功能大致同上,但存得少些,比如如果是泛型的泛型,参数是泛型数组List[T],TypeTag能全部存下,ClassTag就存一个List

1
2
3
4
5
6
classTag[List[Int]]
//scala.reflect.ClassTag[List[Int]] =↩
// ClassTag[class scala.collection.immutable.List]
typeTag[List[Int]]
//
// reflect.runtime.universe.TypeTag[List[Int]] = TypeTag[scala.List[Int]]

ExpressionEncoder

Encoder的内置唯一实现类。
jvm对象<=>内部行格式: 钨丝计划unsafeRow、expressions提取case class的变量名。
可以支持Tupple但不支持Option和null值的case class

它会生成变量名name和位置的绑定,以便钨丝计划的code gen使用unsafe row.
Tupple最多到5.

Serializer: raw object=>InternalRow, 用expression解析提取对象值;
Deserializer: InternalRow=>raw object,用expression构造对象。
因为unsafeRow是二进制存放在堆外,所以转换成row看做序列化。

Encoders

(注意比Encoder多一个s)
提供了很多静态工厂方法获得Encoder(实际上目前获得的都是ExpressionEncoder)
大致可以分为几类:

  1. java原始类型: Encoders.BOOLEAN
  2. scala原始类型: Encoders.scalaBoolean等.(多一个scala前缀)
  3. javaBean类型: bean[T](beanClass: Class[T])。但目前成员只支持List容器,不支持其他的容器。支持原始类型或嵌套javaBean。
  4. kryo序列化类型: kryo[T: ClassTag]
  5. java序列化类型: javaSerialization[T: ClassTag]
  6. Tuple类型: 从Tuple2到Tuple5.
  7. Product类型: 也就是case class.

其中前三种是直接调用ExpressionEncoder,第四第五种本质上是间接调用了ExpressionEncoder:

1
2
3
4
5
6
7
8
9
10
11
12
13
ExpressionEncoder[T](
schema = new StructType().add("value", BinaryType),
flat = true,
serializer = Seq(
EncodeUsingSerializer(
BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
deserializer =
DecodeUsingSerializer[T](
Cast(GetColumnByOrdinal(0, BinaryType), BinaryType),
classTag[T],
kryo = useKryo),
clsTag = classTag[T]
)

因此第四第五后两种序列化本质上是把整个对象看做一个二进制类型,不利于后续优化和减少反序列化。

原始类型还包括:

1
2
3
4
java的:
byte,short,int,long,float,double,java.math.BigDecimal,java.sql.Date,java.sql.Timestamp
scala的:
Array[Byte],byte,short,int,long,float,double

2019年的人们如何生成HTTPS证书

由于配置相关的教程总是有年限限制,过期就不能用了,本教程至少保证2019.4.14还可用。
环境: centos,nginx,chrome
备注: 可以避免chrome的NET::ERR_CERT_COMMON_NAME_INVALID错误。

摘要

从HTTP升级到HTTPS: 用openssl命令创建本地的CA,然后自签证书,然后配置到nginx中,最后信任一下本地CA即可。

需求

从HTTP协议升级到HTTPS。
用HTTPS可以防止会话内容被拦截解析,同时防止中间人攻击,防止他人伪装称你的网站,欺骗你的客户。
SSL协议的详细含义可以参见:
https://xiaoyue26.github.io/2018/09/26/2018-09/%E9%80%9A%E4%BF%97%E7%90%86%E8%A7%A3SSL-TLS%E5%8D%8F%E8%AE%AE%E5%8C%BA%E5%88%AB%E4%B8%8E%E5%8E%9F%E7%90%86/

现有架构

请求=>nginx服务器=>后端网站服务

改造思路/原理

原理上只需要让nginx负责SSL协议的部分即可,不需要动后端网站服务。
客户端发送HTTPS请求到nginx服务器,nginx服务器转发HTTP请求到后端网站服务。
(封装的思想,上层变动对底层HTTP服务透明)
HTTPS中断、TLS中断:
nginx直接负责搞定ssl部分,netty等后端服务只需要负责http部分就好了。
如果依赖nginx的话,netty的SslHandler什么的都可以废掉了XD

所以我们只需要关心架构中的前半部分:
请求=>nginx服务器

再分解一下这部分的话:
用户=>浏览器(chrome)==https请求=>nginx服务器

整个架构中我们需要修改的部分:

  1. nginx配置.

是的,就这么一项。所以改造成本很低。
当然了,如果不想花钱买官方CA证书的话,也就是自己弄一个CA, 然后给自己的网站颁发证书的话,还需要改动用户浏览器的信任CA,那么需要修改的部分就增加一项了:

  1. nginx配置;
  2. 用户浏览器信任CA。

这里的证书、CA是个啥概念呢?
证书: 就好比我们网站的身份证;
CA : 就好比派发身份证的派出所。
本质上是一个信任传递、担保的过程,用户浏览器会默认信任几个官方的CA,只要官方CA承认的网站,信任传递一下,用户就可以也信任了。
参见下图可以通过chrome右键”检查”的security面板查看证书的详细信息。

所以如果花钱让官方CA帮我们签发证书的话,用户可以直接默认信任我们的证书;
而如果我们自己弄的CA的话,好比自己开的黑作坊,用户不可能直接信任黑作坊签发的身份证的,就需要修改用户浏览器配置了,加入我们的私人CA证书。

公网HTTPS

生成数字证书

可以参考:
http://www.ruanyifeng.com/blog/2016/08/migrate-from-http-to-https.html

https://www.gogetssl.com/
https://www.ssls.com/
https://sslmate.com/
购买SSL证书。

免费的:
https://certbot.eff.org/
可以用这个工具,选择转发服务器和操作系统,生成证书:
https://certbot.eff.org/lets-encrypt

配置nginx

把原来nginx配置中的:

1
listen       80;

改成:

1
2
3
listen 443 ssl;
ssl_certificate /usr/local/nginx/ssl/private/server.crt;
ssl_certificate_key /usr/local/nginx/ssl/private/device.key;

这里的service.crt就是数字证书了。
如果要支持http和https同时可以访问,就把listen 80再加上。

如果要强制https,即使访问了http也强制跳转https(一般都需要这样搞),可以增加rewrite配置:

1
2
3
4
5
server {
listen 80;
server_name localhost;
rewrite ^(.*) https://$server_name$1 permanent;
}

局域网HTTPS

公网https起码要买个域名,买个服务器(阿里云),如果只是局域网玩玩、或者自签证书,可以如下操作:

  1. 本地生成一个CA;
  2. 用这个CA给自己网站的数字证书签名,生成网站数字证书;
  3. 修改nginx配置;
  4. 配置用户chrome信任第一步中的CA。

可以看出多了1,2两步来生成证书,代替购买证书;
多了第4步来强制用户信任非官方CA.

1. 本地生成CA

找个干净的目录开始操作:

1
2
openssl genrsa -des3 -out rootCA.key 2048
openssl req -x509 -new -nodes -key rootCA.key -sha256 -days 1024 -out rootCA.pem

里面可以填下密码,email地址和ca的名字。其他的可以留空。

第一条命令: 生成本地CA的密钥rootCA.key(要记住你设置的密码,比如我的是staythenight);
第二条命令: 用这个密钥进行签名,生成一张CA的证书rootCA.pem.
(这里设置的过期时间为1024天).

2. 生成网站数字证书(用这个CA给自己网站的数字证书签名)

为了避免chrome的NET::ERR_CERT_COMMON_NAME_INVALID错误,需要在网站证书里填一些额外的信息。
首先创建文件server.csr.cnf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[req]
default_bits = 2048
prompt = no
default_md = sha256
distinguished_name = dn

[dn]
C=US
ST=RandomState
L=RandomCity
O=RandomOrganization
OU=RandomOrganizationUnit
emailAddress=296671657@qq.com # 修改成自己的email
CN = kandiandata.oa.com # 修改成自己的域名

然后创建文件v3.ext:

1
2
3
4
5
6
7
8
authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
subjectAltName = @alt_names

[alt_names]
DNS.1 = kandiandata.oa.com # 修改成自己的域名
DNS.2 = localhost

创建证书:

1
2
3
4
5
6
openssl req -new -sha256 -nodes -out server.csr -newkey rsa:2048 -keyout device.key -config server.csr.cnf

openssl x509 -req -in server.csr \
-CA rootCA.pem \
-CAkey rootCA.key \
-CAcreateserial -out server.crt -days 1800 -sha256 -extfile v3.ext

第一条命令: 用server.csr.cnf配置生成网站证书server.csr,同时生成网站私钥device.key(给nginx用的)。
第二条命令: 用CA私钥rootCA.key以CA的名义(rootCA.pem)️给网站证书签名,生成CA签名后的证书server.crt,同时加上v3.ext中的配置(防止chrome报错)。

到这里我们就准备好了下一步nginx要用到的两个文件:

1
2
server.crt
device.key

server.crt: 网站的数字证书;
device.key: 网站的私钥,用来解开用户发过来的通信密码。详细原理参见:
http://xiaoyue26.github.io/2018/09/26/2018-09/%E9%80%9A%E4%BF%97%E7%90%86%E8%A7%A3SSL-TLS%E5%8D%8F%E8%AE%AE%E5%8C%BA%E5%88%AB%E4%B8%8E%E5%8E%9F%E7%90%86/

3. 配置nginx

这里和之前的一样,打开ssl支持,监听443:

1
2
3
listen 443 ssl;
ssl_certificate /usr/local/nginx/ssl/private/server.crt;
ssl_certificate_key /usr/local/nginx/ssl/private/device.key;

加上监听80+重定向:

1
2
3
4
5
server {
listen 80;
server_name localhost;
rewrite ^(.*) https://$server_name$1 permanent;
}

4. 配置用户chrome信任第一步中的CA

将第一步中的rootCA.pem发送给用户,让它安装即可。
(千万不要发错了。)
如果是mac系统,可以直接双击安装到钥匙串中:

在钥匙串中选择系统=>证书,然后完全信任ca的证书即可:

最后得到chrome的承认:

其他

还可以查看openssl支持的ssl/tls版本:

1
openssl ciphers -v | awk '{print $2}' | sort | uniq

查看本地的443端口是否支持tls1.2协商:

1
openssl s_client -connect localhost:443 -tls1_2

成功的话会返回一大段内容,包括:

1
2
3
4
5
6
7
8
# 前面一大堆
---
Certificate chain
0 s:/C=US/ST=RandomState/L=RandomCity/O=RandomOrganization/OU=RandomOrganizationUnit/emailAddress=296671657@qq.com/CN=kandiandata.oa.com
i:/C=XX/L=Default City/O=tencent/OU=kandian/CN=pipe_ca/emailAddress=296671657@qq.com
---
Server certificate
# 后面一大堆

失败的话:

1
2
3
4
5
---
no peer certificate available
---
No client certificate CA names sent
---

mysql统计信息更新

本文只关注innodb。

mysql优化器选择执行计划的时候需要依据一定的采样统计信息,不然对数据完全不了解的话,就无法选择成本低的执行计划了。

统计信息的配置有以下几个自由度:

  1. 是否持久化;
  2. 更新统计信息的时机;
  3. 采样多少个page。

是否持久化

采样统计信息可以有两种选择:

  1. 持久化: 默认是持久化,也就是存磁盘。
  2. 非持久化.

控制的选项:

1
show variables like '%innodb_stats_persistent%';

默认是on,也就是持久化。
具体存哪里呢,主要是存mysql库和information_schema库下(5.6.x):

1
2
3
4
INFORMATION_SCHEMA.TABLES
INFORMATION_SCHEMA.STATISTICS
mysql.innodb_table_stats
mysql.innodb_index_stats

更新统计信息的时机

相关参数:

1
2
innodb_stats_on_metadata: 是否每次都重新计算统计信息(配合非持久化使用),默认off;
innodb_stats_auto_recalc: 插入数据量超过原表10%的时候更新统计信息,默认on。

总结一下mysql更新统计信息的时机:

  1. 手动运行触发语句如analyze table xx的时候;
  2. 如果innodb_stats_auto_recalc为on: 插入数据量超过原表10%的时候更新统计信息;
  3. 如果innodb_stats_on_metadata为on: 每次查询schema.table表的是更新统计信息(一般不开启,性能太差)。

采样page数量

相关参数:

1
2
3
| innodb_stats_sample_pages            | 8     |
| innodb_stats_persistent_sample_pages | 20 |
| innodb_stats_transient_sample_pages | 8 |

innodb_stats_sample_pages废弃改成了innodb_stats_persistent_sample_pagesinnodb_stats_transient_sample_pages,灵活控制持久化和非持久化下的采样page数。

可以看出默认情况持久化采样20个page。

单表配置

上述所有都是全局配置,还可以为每个表单独3个参数:

1
2
3
STATS_PERSISTENT  : 1: 持久化统计信息;
STATS_AUTO_RECALC : 超过10%更新统计信息。
STATS_SAMPLE_PAGES: 采样页数。

可以看出为每个表设置的参数依然是这3个自由度: 是否持久化、更新统计信息时机、采样页数。