spark笔记

job: action分割
task: shuffle分割
application: 多个job

join\cogroup: 确保Rdd1.partitioner = Rdd2.partitioner = join.partitioner
换句话就是 rdd1的key和rdd2的key,还有join时用的key,三者是相等的。

语法概要

官方示例代码库:
http://spark.apache.org/examples.html
此外还有安装目录的examples目录。

生成rdd

1
2
3
4
5
6
// (1) 从数组:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
// (1)从文件:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String]

RDD操作

RDD分为:

  1. 普通RDD;
  2. PairRDD。
    两者能用的函数大不相同,每次操作RDD前需要复核一下到底是普通RDD还是Pair RDD。

方法分为:

  1. transform: 变换结构;
  2. action: 真正有输出,有动作(特例是forEachPartition这种类似遍历,反函数式编程的)。

普通RDD-transform

1
2
3
4
5
6
// scala用箭头=>:
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
// python用lambda:
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

也可以传递函数:

1
2
3
4
5
// scala借用object单例:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)

其他函数汇总:(因为不是pair rdd,因此基本都无shuffle)

无shuffle,1对1

1
2
map(func): 
filter(func):无shuffle

无shuffle,1对多、多对多

1
2
3
flatMap(func)	:  每个item可以返回一个seq;
mapPartitions(func) : 输入迭代器,返回迭代器
mapPartitionsWithIndex(func): 输入迭代器,返回迭代器

集合运算

1
2
3
4
5
6
7
sample(withReplacement, fraction, seed)	
union(otherDataset)
intersection(otherDataset)
distinct([numTasks])): 如果是pairRDD,有shuffle,可以定义并行度。
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

此外还有一个forEachPartition,返回值为空,是个遍历的action。
有shuffle的变换一般都可以定义[numTasks],也就是可以定义并行度。

闭包

闭包是指 executor 要在RDD上进行计算时必须对执行节点可见的那些变量和方法。闭包被序列化并被发送到每个 executor

闭包的变量副本(序列化后)发给每个 executor.

Pair RDD-transform

加上了很多shuffle操作的函数(算子)

1
2
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

其他函数汇总:

1
2
3
4
5
6
7
8
groupByKey([numTasks]): 返回(K, Iterable<V>),一般用reduceByKey代替这个算子
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks]): (K, V) 和 (K, W) => (K, (V, W))
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
partitionBy(partitioner): 一般比repartition靠谱,因为下一步能用到key,而不是随机划分

Action汇总:
可以注意到凡是最终需要输出、反函数式编程的(遍历)就是action,最后汇聚到driver单点处理。

1
2
3
4
5
6
7
8
9
10
11
reduce(func): 注意和map相反,reduce是action。数据最后聚合成单点。
collect()
count()
first()
take(n)
takeSample(withReplacement, num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)
countByKey() : 因为整合了reduce的功能
foreach(func) 以及foreachPartition等。

自定义UDAF的核心: combineByKey

大部分shuffle算子都是调用combineByKey实现的,可以说combineByKey就是shuffle的核心。
combineByKey定义:

1
2
3
4
5
6
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val pairs = sc.parallelize(List(("prova", 1), ("ciao", 2),
("prova", 2), ("ciao", 4),
("prova", 3), ("ciao", 6)))
// aggregateByKey:
pairs.aggregateByKey(List[Any]())(
(aggr, value) => aggr ::: (value :: Nil),
(aggr1, aggr2) => aggr1 ::: aggr2
).collect().toMap
// combineByKey:
pairs.combineByKey(
(value) => List(value),
(aggr: List[Any], value) => aggr ::: (value :: Nil),
(aggr1: List[Any], aggr2: List[Any]) => aggr1 ::: aggr2
).collect().toMap

combineByKey比aggregateByKey更加通用,区别是它的第一个参数创建初始聚合器都是函数,而aggregateByKey第一个参数是一个初始值。

缓存

1
2
cache()
persist([LEVEL])

在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据.

广播变量

immutable
先用action从sc(driver)上广播出去,然后用.value访问。

1
2
3
4
5
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

累加器

数值型。
可变。

1
2
3
4
5
6
7
8
9
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

自定义累加器需要实现的3个方法:(类似于combineByKey

1
2
3
reset : 清零。防止重算。
add: 累加
merge: 合并累加器。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

private val myVector: MyVector = MyVector.createZeroVector

def reset(): Unit = {
myVector.reset()
}

def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1") // 注册一下

redis设计与实现笔记12-慢查询日志与监视器

第23章 慢查询日志

两个配置:

1
2
slowlog-log-slower-than: 超过多少微秒的命令记录到慢查询日志;
slowlog-max-len: 最多保存多少条慢查询日志。(删除最旧的,FIFO)

查看慢查询日志

1
2
3
4
5
6
7
8
slowlog get
1) 1) (integer) 4 # 日志id: uid
2) (integer) 1578781447 # 日志执行时间戳
3) (integer) 13 # 执行了多少微秒
4) 1) "SET" # 具体命令
2) "database"
3) "Redis"
...

慢查询日志存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct redisServer{
// 下一条慢查询日志的id:(类似于自增id)
long long slowlog_entry_id;
// 保存了所有慢查询日志的链表:
list *slowlog;
// 配置:
long long slowlog_log_slower_than;
unsigned long slowlog_max_len;
}

typedef struct slowlogEntry{
long long id;
time_t time;
long long duration;
robj **argv; // 命令和参数
int argc; // 命令和参数数量

}

第24章 监视器

Monitor命令:

1
redis> MONITOR

然后客户端可以监视服务器当前处理的命令请求。

监视器实现

1
2
3
4
5
struct redisServer{
// 监视器链表:
// 链表,保存了所有从服务器,以及所有监视器
list *slaves, *monitors; /* List of slaves and MONITORs */
}

这本书戛然而止

redis设计与实现笔记11-排序与二进制数组

排序命令SORT:

1
2
3
4
5
6
7
8
RPUSH numbers 5 3 1 4 2
SORT numbers
# 结果:
1
2
3
4
5

也可以在末尾加上alpha参数,指定按字母序排序。
还可以在末尾加上By参数,指定按某个集合定义的权重进行排序。

sort 命令的实现

实现sort的时候为被排序的key创建一个与排序目标长度相同的数组,数组中的节点的obj字段存放实际数据指针,score字段是double,存放每个对象的排序值。
实际排序对这个新创建的数组进行即可。

sort By *-price命令的实现

1
2
MSEST apple-price 8 banana-price 5.5 cherry-price 7
SORT fruits BY *-price

不同选项顺序:(执行顺序和编程时书写顺序无关)

  1. by *-price
  2. alpha

limit命令的实现

sort fruits limit <offset> <count>
实现上是先排序后,然后在跳转到offset上选count个。

STORE命令

可以用STORE命令保存SORT的结果:

1
Sort fruits STORE sorted_fruits

实现上,首先排序,然后:

  1. 检查要保存的键sorted_fruits是否存在,存在则删除;
  2. 创建sorted_fruits列表;
  3. 将辅助排序数组依次压入sorted_fruits.

第22章-二进制数组

单个数组操作:

1
2
3
SETBIT bit 0 1 # 第0位设置为1
GETBIT bit 3 # 获取第3位的值
BITCOUNT bit # 获取bit数组中1的个数

多个数组之间操作:

1
2
BITOP AND and-result x y z # x,y,z求与的结果放and-result
类似的操作还有OR,XOR,NOT,也就是与、或、非、异或、取反。

底层存储

用SDS字符串存储二进制数组。
对二进制数组的操作: 也借用SDS的字符串函数。

因为SDS是二进制安全的(也就是存储的数据中有\0也没关系,因为SDS中有存长度),所以可以用来存储二进制数组。

逆序存储

这里保存的二进制数组是0100 1101;
图中SDS保存的是逆序的:1011 0010,之所以逆序存储的原因为了简化SETBIT命令的实现。

offset、高位、低位

这里要强调一下二进制的offset、高位、低位的概念,方便理解为什么要逆序存储以及因为逆序存储所以扩展时不需要移动。
二进制数组是0100 1101
高位、低位: 0100是高位,1101是低位,(想象十进制数字9876的高位是9).
offset: offset为0的是1,offset为1的为0.

GETBIT 7是0
SETBIT 11 1其中第11位不存在,因此扩展后会变成1000 0100 1101

GETBIT实现O(1)

由于前面是逆序存储的二进制数组,因此可以从左边开始数offset了(原来是从右边往左数,负负得正)。
由于用SDS存储二进制,当我们想要获取第n位0,1值的时候,相当于需要获取两个坐标:

  1. 位于哪个byte: n/8
  2. 位于byte的第几位:n%8+1
    换句话说所有值的坐标计算公式为: (n/8,n%8+1)GETBIT <bitarray> 10的结果是:
    (10/8,10%8+1)也就是byte[1][3-1]。
    (从左到右第3位)。
    其实如果统一从0开始计数,公式可以简化为:
    (n/8,n%8)

GETBIT命令算法复杂度为O(1)。

SETBIT命令实现O(1)

命令格式:

1
SETBIT <bitarray> <offset> <value>

这个命令会做3件事:

  1. 设置新值
  2. 返回旧值;
  3. 如果offset超出原有数组长度,会拓展原数组,并且把新扩展空间的值设置为0.
    这里的扩展应当注意到SDS的空间预分配策略:
    1
    2
    3
    总长度len<1MB: 总空间为2*len+1;
    总长度len>=1MB: 总空间为len+1MB+1。
    换句话说,预分配的空间上限是1MB,尽量为len。

逆序存储与扩展不需要移动

由于扩展是扩展高位,而高位经过逆序存储后,放在了buf数组的末尾,因此扩展时就不需要移动原来的数据了。

BITCOUNT命令实现

有几种实现算法:
(1) 遍历:最慢;
(2) 查表:空间换时间,二进制数组的排列是有穷的,可以预先存下不同排列对应的1数量;
(3) SWAR算法: 计算汉明码问题,Hamming Weight。
如果cpu不支持直接进行汉明码计算,可以使用SWAR算法:

1
2
3
4
5
6
7
uint32_t swar(uint32_t i){
i = (i& 0x55555555)+((i>>1)&0x55555555);
i = (i& 0x33333333)+((i>>2)&0x33333333);
i = (i& 0x0F0F0F0F)+((i>>4)&0x0F0F0F0F);
i = (i*(0x01010101)>>24);
return i;
}

比遍历快32倍,比查8位的表快4倍,比查16位的表快2倍,无需额外内存。
由于SWAR算法缓存的值较少而且规整,是缓存友好的。
(4)redis的二进制位统计算法:
结合查8位的表和SWAR算法。
如果n< 128: 直接用查表;
如果n>=128: 使用SWAR,每次载入128位,调用4次SWAR。

BITOP OR\XOR\NOT命令实现

对于每个byte调用c函数操作。

redis设计与实现笔记10-订阅,事物,lua

发布订阅

(书上第18章)
客户端可以订阅某个频道,或者订阅符合某种模式的频道们。

订阅关系保存

1
2
3
4
5
6
7
8
9
struct redisServer{
// 保存所有频道的订阅关系:
dict* pubsub_channels;
// key: 频道名字(string)
// value: 订阅的客户端们(链表)

// 模式订阅: 订阅符合某种模式的频道
dict* pubsub_patterns;
}

(如果有个客户端疯狂乱订阅,服务器是不是内存就爆了?)

第19章-事务

相关命令:

1
2
3
4
5
MULTI: 类似于事务开始start transaction;
# 中间一堆正常redis set命令。
EXEC: 类似于commit.
WATCH: 乐观锁,在exec执行前监视一些key是否被修改。
如果被修改,则拒绝执行事务(类似于CAS)

事务执行阶段不会执行别的客户端的命令。(相当于独占了)

事务开始

MULTI: 客户端状态打开REDIS_MULTI标识。

命令入队

除了EXEC,DISCARD,WATCH,MULTI之外的命令放入队列中;
否则立即执行。

Watch功能实现

1
2
3
4
5
6
typedef struct redisDb{
// 正在被watch监视的key:
dict *watched_keys;
// key: 被监视的key
// value: 监视的客户端们(list)
}redisDb;

存在redisDb中,可见每个数据库都保存一个这个字典。

每个修改操作都要检查watched_keys字典,通知对应的客户端。(打脏标记REIDS_DIRTY_CAS

事务的ACID

A: 原子性 Atomicity
C: 一致性 Consistency
I: 隔离性 Isolation
D: 耐久性 Durability

A:原子性

  • mysql:
    要么一个都不执行成功,要么都全部执行成功。

redis这里略有修改,它只保证执行,不保证执行成功:
要么一个都不执行,要么都全部执行。
(redis只检查编译错误,如命令不存在,不检查运行时错误)
redis执行事务过程中出错的话,不会回滚已经执行的命令。
(开发者表示这算程序员自己的锅)

C: 一致性

单实例:肯定一致。
主从: 由raft保证;
cluster: 分slot以后,相当于单实例+主从,因此一致。

I: 隔离性

也就是让并发执行达到串行一致性。
由于redis本来就是单线程串行执行事务,因此天然不需要做额外的事就能达到隔离性。

D: 耐久性

redis三种模式:
无持久化存储: 无耐久
RDB: 不能完全保证;
AOF: appendfsyncalways时,达到事务耐久性。

第20章-Lua脚本

主要涉及两个命令EVALEVALSHA
redis服务器端2.6开始有lua环境,因此客户端可以:

  1. 执行lua脚本:
    1
    redis> EVAL "return 'hello world'" 0
    最后的0表示输入参数的个数是0个。参见:http://doc.redisfans.com/script/eval.html
  2. 通过SHA1校验和,执行对应的lua脚本:
    1
    redis> EVALSHA "a27e72..........."
    这个校验和需要服务器认识才行,服务器认识的方法:
    (1)服务器以前执行过对应的lua脚本;
    (2)客户端用SCRIPT LOAD命令告诉过服务器:SCRIPT LOAD "return 2*2"

如果在cluster模式:

lua脚本如果要使用redis数据库中的键,一定要通过参数传递进去,才能被分析出来,方便兼容新版本的集群功能。

redis的lua环境

为了保证lua脚本之间不会互相影响,redis服务器需要保证luz脚本无副作用,它做了一下措施:

  1. 修改随机数函数,消除副作用;
  2. 禁止lua脚本创建全局变量;

但是好像遗漏了lua脚本对于已有全局变量的修改:

1
math.randomseed(10086) --change seed

应该是把这块儿交给程序员自行保证。

lua脚本特有的排序辅助

此外,为了获得确定性一致的结果,redis对集合的输出结果做了排序。
例如调用SMEMBERS后的结果,会经过排序辅助函数进行排序。
保证同样的数据集的输出结果相同。

lua_scripts字典

1
2
3
4
5
6
struct redisServer{
// 整个服务器全局的lua校验和
dict *lua_scripts;
// key: checksum
// value: lua脚本代码
}

所有执行过或要求记住的lua校验和都会存下来。

EVAL命令实现

3个步骤:

  1. 计算校验和,然后用校验和定义一个函数f_校验和;
  2. <校验和,脚本>保存到lua_scripts字典;
  3. 执行函数。

比如校验和是a0e1ffff,函数名就是f_a0e1fffffffff。
然后利用函数的局部性,避免全局变量。

超时检测

参数lua-time-limit
lua脚本的运行时间是有上限的,避免编程错误的死循环。

redis设计与实现笔记9-cluster模式

cluster功能是3.0及以后才有的。需要开启cluster模式才能让redis-server以cluster模式启动,这种模式下只有一个数据库(0号数据库)。

启动集群模式的redis客户端:

1
2
# 加上-c参数: 
redis-cli -c

集群纳入新成员过程

  1. 节点A接到Cluster meet B命令,节点A和B进行握手;
  2. 节点A会将节点B的信息通过Gossip协议传播给集群中的其他节点;
  3. 最终节点B被集群完全认识、接受。

Gossip协议消息类型

  1. Meet;
  2. Ping;
  3. Pong。

slot: 槽指派

某个节点负责哪些slot:

1
127.0.0.1> cluster ADDslots <slot> [slot ...]

Cluster Nodes命令

用redis-cli客户端可以查看当前集群的节点情况、id,以及slot的分派情况:

1
Cluster Nodes

clusterState信息

clusterState信息中有一项是slots_to_keys跳表(类似一个有序hashmap),保存slot和key之间的关联。
key=>跳表中的key;
slot号码=>跳表中的score。

key查节点、节点查key都可以快速完成:

  1. 每个节点可以根据key,查到对应的slot(crc算法),然后可以查到对应存在哪个节点(存在跳表);
  2. 可以查自己节点负责的slot:
    1
    2
    3
    4
    5
    struct clusterNode{
    // 总共16384(也就是2^14)位,每一位的1,0代表是否负责
    unsigned char slots[16384/8];
    int numslots;// 该节点总共负责多少个slot(1的数量)
    }

clusterState中存储了所有slots的指派情况:

1
2
3
4
typedef struct clusterState{
// ...
clusterNode* slots[16384];
}clusterState;

如果集群中所有的slot都有人负责,cluster进入上线状态。

Moved错误

客户端访问了错误的节点,需要的key在别的节点负责的slot里头:

1
MOVED <slot> <ip>:<port>

cluster模式的redis客户端能自动处理MOVED错误。

ASK错误

重新指派

集群可以将slot重新分派给另一个节点。
也就是: 源节点=迁移到=>目标节点。
其中节点id可以通过cluster nodes获得。
通过redis-trib工具:
1.在target_id节点上发送: (准备好导入)

1
Cluster setslot <slot> Importing <source_id>

2.向source_id发送:(准备好导出)

1
Cluster SetSlot <slot> Migrating <target_id>

3.向source_id发送:

1
Cluster GETkeysInSlot <slot> <count>

获得最多count个属于slot的key;
4.对于第3步中的每个key,redis-trib向source_id发送一个命令:

1
Migrate <target_ip> <target_port> <key_name> 0 <timeout>

实际进行迁移每一个key。
5.完成迁移,发布:

1
Cluster SetSlot <slot> Node <target_id>

任意节点收到后传播给整个集群。

ASK错误

迁移过程中,源节点遇到缺少的key会向客户端返回ASK错误:

1
ASK 16198 127.0.0.1:7003

表示这个key正在将16198号slot迁移到127.0.0.1:7003

cluster模式的客户端获得ASK错误后,带着ASK标记去访问目标节点,才能获得数据。

故障恢复

节点状态:
在线==>疑似下线==>下线
(类似于之前sentinel模式中的主观下线客观下线)。

cluster模式的各个主节点之间都有连接,相当于一张完全图了。
这个网络内,它们每隔一段时间就互相ping,看看对方是否活着(返回pong)。

疑似下线: 不返回pong,标记为疑似下线
下线 : 节点之间交流看法(集群状态),超过半数认为疑似下线则认为确实是下线,开始故障恢复。

如果离线的节点有从节点,则可以开始选举了。
从节点们发现主节点下线了,就开始向集群存活的主节点们请求投票,获得超过半数的票则当选。

Publish命令实现

订阅频道: subscribe
发布消息到频道:publish

客户端发布消息的流程:

  1. 客户端=>某个节点: publish命令: 发布xx消息到xxx频道;
  2. 该节点=>其他节点: publish的具体消息。

第2步中为什么不是直接转发命令到其他节点,而是转发消息呢?
这个主要是设计理念上,希望节点之间不是命令交互,而是消息交互。

nginx笔记-鉴权及转发配置

学习资料:
http://blog.jobbole.com/tag/nginx/
https://segmentfault.com/a/1190000013267839#articleHeader0

鉴权模块的官方文档:
http://nginx.org/en/docs/http/ngx_http_auth_request_module.html

易混淆点

URL尾部的/区别
url分为location配置中的url和实际用户访问的url

1. location中的url

无区别。末尾是否有/,含义一样。

2. 实际用户访问的url

也就是浏览器地址栏中的。

首先实际访问url的话:
(1)末尾有/: 表示目录,如localhost/dir/,服务器就会匹配目录下的默认文件(比如index.html);
(2)末尾无/: 表示文件,如localhost/file.

特殊情况:
根目录。
直接访问域名,如访问http://www.xxx.com
,这个时候浏览器知道用户访问的肯定不是文件,而且服务器一般会配置location /这个配置项,所以访问根目录有没有/都一样。
也就是以下两种访问url等效:

  1. 访问http://www.xxx.com
  2. 访问http://www.xxx.com/
    浏览器请求的时候自动给第1种加上/变成第2种。

nginx配置的逻辑

  • 特点:
  1. 声明式
    nginx的配置文件是声明式的,因此不能用过程式语言来理解它。
    换句话说:
    并不是写在前面的就先执行.

  2. 第三方模块
    nginx有核心模块和第三方模块(插件)。
    不同模块的配置可能写在一块儿,但执行顺序可能无关联,甚至互相影响。

举例一些模块的功能:

1
2
3
4
5
6
--with-http_dav_module: http文件管理;
--with-http_flv_module: flv流媒体支持;
--with-mail: 邮件支持;
--with-mail_ssl_module: 邮件加密;
--with-debug: debug日志支持;
--with-http_auth_request_module: 鉴权转发支持。

一种可能的鉴权转发配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#user  nobody;
worker_processes 3;

error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;

# pid logs/nginx.pid; # 这个必须停服改


events {
worker_connections 2048;
}


http {
include mime.types;
default_type application/octet-stream;

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';

access_log logs/access.log main;

sendfile on;
#tcp_nopush on;

#keepalive_timeout 0;
keepalive_timeout 65;

#gzip on;
gzip on;
gzip_min_length 1k;
gzip_buffers 4 16k;
gzip_http_version 1.0;
gzip_comp_level 6;
gzip_types text/plain text/css text/javascript application/json application/javascript application/x-javascript application/xml;
gzip_vary on;

# http_proxy 设置
client_max_body_size 10m;
client_body_buffer_size 128k;
proxy_connect_timeout 75;
proxy_send_timeout 75;
proxy_read_timeout 75;
proxy_buffer_size 4k;
proxy_buffers 4 32k;
proxy_busy_buffers_size 64k;
proxy_temp_file_write_size 64k;
proxy_temp_path /usr/local/nginx/proxy_temp 1 2;

upstream backend_hexo {
server localhost:8081 max_fails=2 fail_timeout=30s ;
}

server {
listen 80;
server_name localhost;

#charset koi8-r;

access_log logs/host.access.log main;

location / {
auth_request /auth;
add_header Cache-Control no-store;
proxy_set_header Host $host;

proxy_pass http://backend_hexo;
proxy_redirect off;
# 后端的Web服务器可以通过X-Forwarded-For获取用户真实IP
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
}

location = /auth {
proxy_pass http://auth_server:8080/api/hexo_permission;
proxy_pass_request_body off;
proxy_set_header Content-Length "";
proxy_set_header X-Original-URI $request_uri;
}


#error_page 404 /404.html;

# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}

nginx笔记-编译安装

参考资料:
https://www.jianshu.com/p/5eab0f83e3b4

架构

master+worker架构。

master: 管理nginx.conf,同步到worker;
worker: 单线程绑定cpu,实际处理/转发请求;

master咋同步配置到worker呢?
直接用新conf起新worker,旧worker处理完手头的活就kill掉。

常用命令

1
2
3
4
5
6
7
8
# 启动:
nginx -c nginx配置文件地址
# 重新载入配置:
nginx -s reload
# 检查配置(或查看配置地址):
nginx -t
# 停止:
nginx -s stop

比如如果想知道当前nginx的配置文件在哪里,可以运行nginx -t,就能看到了。

编译安装

下载解压缩:
http://nginx.org/en/download.html
配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
export KERNEL_BITS=64
./configure --user=mengqifeng \
--group=staff \
--prefix=/usr/local/nginx \
--pid-path=/var/run/nginx.pid \
--lock-path=/var/lock/nginx.lock \
--with-http_ssl_module \
--with-http_dav_module \
--with-http_flv_module \
--with-http_realip_module \
--with-http_gzip_static_module \
--with-http_stub_status_module \
--with-mail --with-mail_ssl_module \
--with-debug \
--with-http_auth_request_module \
--http-client-body-temp-path=/var/tmp/nginx/client \
--http-proxy-temp-path=/var/tmp/nginx/proxy \
--http-fastcgi-temp-path=/var/tmp/nginx/fastcgi \
--http-uwsgi-temp-path=/var/tmp/nginx/uwsgi \
--http-scgi-temp-path=/var/tmp/nginx/scgi \
--with-pcre=/Users/mengqifeng/Public/build_home/pcre-8.42 \
--with-openssl=/Users/mengqifeng/Public/build_home/openssl \
--with-zlib=/Users/mengqifeng/Public/build_home/zlib-1.2.11

其中最后3行要看情况,先不加。
报错以后下载pcre和openssl,加上参数提供给nginx.
编译

1
make

安装

1
sudo make install

最后设置一下环境变量: (/etc/profile.d/nginx.sh)

1
2
export NGINX_HOME=/usr/local/nginx
export PATH=$PATH:$NGINX_HOME/sbin

redis设计与实现笔记8-sentinel哨兵模式

使用场景:

用几个节点开启sentinel组成一个哨兵集群,负责监控另外一些redis的master/slave集群的健康状态,协助进行故障恢复(master挂了的时候,升级某个slave为新master)。

思考:

如果有1个master,10个slave,数据均已经完全同步。
这个时候,连续挂两次master,是不是所有数据就都没了?
第1次: 除新master以外的节点,执行新slaveof命令,清空数据准备同步新master;
第2次: 新master挂了,其他节点是空的。

答案:

不会这么脆弱。
两个机制提升了这个过程的可靠性:
(1)确认接受到完整rdb后,从库才清空旧数据库;
(2)确认所有从库完成同步后,才更新master的地址和端口,完成故障恢复流程。(master/slave换代操作会在故障恢复完全完成后进行。)

第1次: 除候选master以外的节点,执行slaveof命令,但清空数据会在确认接收到完整rdb文件后进行。(详见代码https://github.com/huangz1990/redis-3.0-annotated/blob/unstable/src/replication.c的1036行。)
第2次:候选master挂了,重新推举候选master,换代没有完全完成,则不会更新master字段,因此其他slave都还在候选集中。

相关参数:

1
2
# 执行故障转移操作时,可以同时对新master进行同步的从库数量:
SENTINEL parallel-syncs <master-name> <number>

哨兵模式:Sentinel

redis的高可用模式,一主多从+sentinel集群进行监控和故障恢复,主挂的时间达到设置,则选取一个从库升级为主库。

sentinel启动命令

一个节点用redis代码可以用3种身份(模式)启动:

  1. master: 负责写命令;
  2. slave: 负责同步、从库,可以执行读命令;
  3. sentinel:负责监控上述两者,进行故障恢复。
    使用哨兵模式:
    1
    2
    3
    redis-sentinel sentinel.conf
    # 或:
    redis-server sentinel.conf --sentinel
    之后发生的事情:
  4. 初始化服务器:不载入rdb,aof;(因为不需要负责实际数据)
  5. redis服务器切换成Sentinel专用代码;(默认端口26379,只载入部分命令表,客户端只能执行7个命令:
    1
    2
    3
    4
    5
    6
    7
    PING
    SENTINEL
    INFO
    Subscribe
    unSubscribe
    PSubscribe
    PUnSubscribe
  6. 初始化sentinel状态;
  7. 根据配置文件,初始化Sentinel的主库列表;
  8. 创建与主库的网络连接。

sentinel相关的网络连接图

引入sentinel后的redis主从架构网络连接较多:

  1. sentinel节点与master: 命令连接+订阅连接;
  2. sentinel节点与slave: 命令连接+订阅连接;
  3. sentinel节点与sentinel: 命令连接。
    相关的连接图如下:

master地址与端口需要配置
sentinel需要订阅master的心跳,同时在需要的时候向master发送命令,因此需要两种连接:订阅连接+命令连接。

slave地址与端口不需要配置
sentinel通过master获取到slave的地址与端口,因此不需要给sentinel配置slavel信息了。
sentinel需要订阅slave的心跳,同时在需要的时候向slave发送命令,因此需要两种连接:订阅连接+命令连接。(同master类似)

其他sentinel的地址与端口: 不需要配置
sentinel通过master获取到其他sentinel的地址与端口,因此不需要给sentinel配置信息了。sentinel订阅频道的信息里有连接到同一个master的sentinel信息。

由于心跳消息由master帮sentinel完成了,不需要再订阅其他sentinel的心跳了。
每两个sentinel之间都有双向的命令连接(完全图),方便互相发送命令。(客观下线、主观下线、选举leader等命令)

一份可能的sentinel配置文件

1
2
3
4
5
6
7
8
9
10
## master1 conf:
sentinel monitor master1 127.0.0.1 6379 2 # 需要2票(quorum)才能客观下线
sentinel down-after-milliseconds master1 30000 # 30秒才算主观下线(包括master/slave和其他sentinel)
sentinel parallel-syncs master1 1 # 同时可以有1个从库进行同步
sentinel failover-timeout master1 90000 # 刷新故障迁移状态的最大时限
## master2 conf:
sentinel monitor master2 127.0.0.1 12345 5 # 需要5票才能客观下线
sentinel down-after-milliseconds master2 50000
sentinel parallel-syncs master2 5
sentinel failover-timeout master2 450000

初始化sentinel状态

1
2
3
4
5
6
struct sentinelState{
// 当前纪元
uint64_t current_epoch;
// 保存所有被这个sentinel监视的master:
dict *master;// <master_name,sentinelRedisInstance>
}sentinel;

其中master值sentinelRedisInstance的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct sentinelRedisInstance{
// 实例类型、状态:
int flags;
char * name; // "127.0.0.1:26379"
char * runid;
uint64_t config_epoch;
// 实例的地址:
sentinelAddr *addr;// ip,port
// 无响应多少毫秒后判断为主观下线:
mstime_t down_after_period;
// 判断客观下线所需的支持票数:
int quorum;
// 故障转移时,可以同时对新主服务器进行同步的从服务器数量:
int parallel_syncs;
// 刷新故障迁移状态的最大时限:
mstime_t failover_timeout;
}sentinelRedisInstance;

创建网络连接

sentinel向每个监视的master创建两个连接:

  1. 命令连接: 用于向master发送、接受命令;
  2. 订阅连接: 订阅master的sentinel:hello消息。

命令连接与订阅连接

连向master/slave的命令连接: 4种命令:

  1. 每10秒一次的INFO命令:获取master和slave的最新配置信息;
  2. 每2秒一次的订阅命令: 获取sentinel:hello频道信息,得到其他sentinel的信息。
  3. 每1秒一次的PING命令:
    获取master/slave/sentinel的心跳信息。
  4. 故障恢复的时候的slaveof命令。

订阅连接:

master/slave向所有sentinel发送它们订阅的sentinel:hello频道信息。

sentinel之间的命令连接

用于检查客观下线、选举leader、故障恢复。
故障恢复流程:

  1. 主观下线: 某个sentinel用ping命令检查master: 超过down-after-milliseconds配置没有回复,该sentinel主观地认为:这个master挂了——它把这个master标记为主观下线状态;
  2. 客观下线: 这个sentinel通过is-master-down-by-addr命令询问其他sentinel的意见。超过quorum数量sentinel同意,则进入客观下线状态;
  3. 选举leader: leader负责接下来的故障恢复。每次选举结束后(无论成败),epoch纪元都会+1。进入客观下线分支的sentinel会要求其他人选自己,同时它会投第一个向自己要求选票的sentinel一票。所有sentinel会回复其他sentinel自己的选择,因此大家都能确定有谁的票数过半,或者都没有过半,也就是leader选举的成败是确定可知的。(奇数个sentinel的raft算法)
  4. leader选取新候选master:
    (1)下线原master;(但master结构中依然记录旧地址、端口,不急着更新)
    (2)断开候选者slaveof;
    (3)其他slave执行slaveof候选者;(同步并行度由参数决定)
    (4)当其他slave完成同步,正式任命候选者为master,更新信息到内存。见代码https://github.com/huangz1990/redis-3.0-annotated/blob/unstable/src/sentinel.c中`sentinelHandleDictOfRedisInstances`函数和`sentinelFailoverSwitchToPromotedSlave`函数。
    这个过程中如果候选者挂了,会重新选一个候选者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void sentinelHandleDictOfRedisInstances(dict *instances) {
dictIterator *di;
dictEntry *de;
sentinelRedisInstance *switch_to_promoted = NULL;

/* There are a number of things we need to perform against every master. */
// 遍历多个实例,这些实例可以是多个主服务器、多个从服务器或者多个 sentinel
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {

// 取出实例对应的实例结构
sentinelRedisInstance *ri = dictGetVal(de);

// 执行调度操作
sentinelHandleRedisInstance(ri);

// 如果被遍历的是主服务器,那么递归地遍历该主服务器的所有从服务器
// 以及所有 sentinel
if (ri->flags & SRI_MASTER) {

// 所有从服务器
sentinelHandleDictOfRedisInstances(ri->slaves);

// 所有 sentinel
sentinelHandleDictOfRedisInstances(ri->sentinels);

// 对已下线主服务器(ri)的故障迁移已经完成
// ri 的所有从服务器都已经同步到新主服务器
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
// 已选出新的主服务器
switch_to_promoted = ri;
}
}
}

// 将原主服务器(已下线)从主服务器表格中移除,并使用新主服务器代替它
if (switch_to_promoted)
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);

dictReleaseIterator(di);
}

选取候选者的大致逻辑:

  1. 删除网络条件差的;
  2. 考虑因素的顺序:优先级、复制偏移量、运行ID小的。
    详见https://github.com/huangz1990/redis-3.0-annotated/blob/unstable/src/sentinel.c代码中的`sentinelSelectSlave`函数。

redis设计与实现笔记7-复制

第15章 复制

redis服务器B执行slave of命令后可变成另一台服务器的从库:

1
2
127.0.0.1:12345> SLAVEOF 127.0.0.1 6379
# 12345端口就变成6379的从库了。

旧版复制功能的实现

  1. 同步(sync): 更新从库状态;
  2. 传播(propagate): 持续维持一致性。

同步

主库发RDB文件给从库,从库载入一下。

旧版缺点:
每次断线后,重新同步,也就是重新生成RDB文件,重新全量载入。

新版复制功能的实现(2.8+版本)

引入PSYNC命令,它有两种模式:

  1. 完整重同步: 初次复制;
  2. 部分重同步:断线重连。

部分重同步:

  1. 主从库的复制偏移量;(replication offset)
  2. 主库的复制积压缓冲区;(replication backlog):默认1MB,FIFO队列。
  3. 服务器的运行id(run ID)。

如果重连时,需要的数据还在缓冲区,就部分同步;
如果重连时,需要的数据已经被删除,就完全同步。

所以缓冲区设置稍微大一些最好。

复制流程

  1. 从库:记录主库地址端口等信息到内存;
  2. 建立套接字连接;
  3. 从库: PING 主库: PONG,确认连接健康;
  4. 身份验证:(是否需要认证,密码),两个维度都需要相同才能继续;
  5. 从库=>主库: 请用xxx端口联系从库;
  6. 同步: 主库从库互为客户端:
    完全同步: 主库=>从库: 保存在缓冲区的写命令;
    部分同步: 主库=>从库: 保存在复制积压缓冲区的写命令。
  7. 命令传播: 主库=>从库: 新的写命令。

心跳检测

从库=>主库:

1
REPLCONF ACK <replication_offset> # 从库当前复制偏移量

三个作用:

  1. 主库确定各个从库的健康状态;
  2. 检测命令丢失:主库检测从库有没有漏的复制,漏则重发;
  3. min_slave_to_write参数:如果配置了这个,主库可以在从库太少的时候拒绝写命令。

slaveof命令源码细节

从库确认收到主库的完整rdb文件后,才清空旧数据库。
(而不是说不分青红皂白上来就把自己清空了,那就太傻了。)
相关代码(https://github.com/huangz1990/redis-3.0-annotated/blob/unstable/src/replication.c):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Check if the transfer is now complete */
// 检查 RDB 是否已经传送完毕
// 1036行:
if (server.repl_transfer_read == server.repl_transfer_size) {

// 完毕,将临时文件改名为 dump.rdb
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
}

// 先清空旧数据库
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
signalFlushedDb(-1);
emptyDb(replicationEmptyDbCallback);

redis设计与实现笔记6-客户端与服务器

客户端

服务端保存的客户端状态:(redisClient)

  1. 套接字;
  2. 客户端名字
  3. 标志值flag;
  4. 正在使用的数据库指针\号码;
  5. 客户端当前要执行的命令、参数..;
  6. 客户端输入、输出缓冲区;
  7. 复制状态信息;
  8. 事务状态;
  9. 身份验证标志:0未通过;1:已经通过身份验证;
  10. 创建时间、最后一次通信时间;
  11. 其他。
1
2
3
4
struct redisServer{
// 所有客户端状态的链表
list *clients;
}

相关命令

列出客户端:

1
2
3
127.0.0.1:6379> client list
id=489 addr=127.0.0.1:57480 fd=5 name= age=6151 idle=582 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=get
id=490 addr=127.0.0.1:38584 fd=6 name= age=15 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client

套接字fd

fd=-1: 伪客户端:加载AOF或执行Lua脚本;
fd>-1: 普通客户端。

名字(name)

默认都没有名字,可以使用client setname命令设置。

客户端关闭

关闭的原因:

  1. 客户端进程退出、被杀死(Client kill),网络连接断开;
  2. 客户端发送了不符合协议格式的命令,被关闭;
  3. 空转时间超过timeout配置;
  4. 发送命令大小超过输入缓冲区限制;
  5. 回复命令大小超过输出缓冲区限制。

输出缓冲区需满足的两个限制:

  1. 硬性限制:超出硬性限制大小,立即关闭;
  2. 软性限制:最多可以超出软性限制持续时长xxx秒。

示例配置:

1
2
3
4
5
6
# 普通客户端硬性限制和软性限制都不限制:
client-output-buffer-limit normal 0 0 0
# 从服务器客户端硬性限制为256MB,软性限制为64MB、60秒:
client-output-buffer-limit slave 256mb 64mb 60
# 执行发布与订阅功能的客户端:硬性限制32mb,软性限制8mb 60秒:
client-output-buffer-limit 32mb 8mb 60

服务端

命令的执行流程

  1. 用户=>客户端: 输入命令;
  2. 客户端=>服务端: 命令按通信协议传输到服务器输入缓冲区;
  3. 服务端: 等待可读事件发生后,读输入缓冲区,解析命令;
  4. 服务端:执行命令,将回复写入输出缓冲区;
  5. 服务端=>客户端:等待可写事件发生后,从输出缓冲区传输给客户端。
  6. 客户端=>用户: 回显命令结果。