Netty实战第三章-外部接口笔记

外部接口

Channel,EventLoop,ChannelFuture

Channel接口

封装Socket类,预定义的实现类包括:

1
2
3
4
5
6
EmbeddedChannel
LocalServerChannel
NioDatagramChannel
NioSctpChannel
NioSocketChannel
...

EventLoop

EventLoop: 只绑定一个专有线程。
Channel: 只注册于一个EventLoop

EventLoop与线程: 1对1;
EventLoopChannel: 1对多;
因此:线程与Channel:1对多。同一个Channel的IO操作一定由同一个线程完成。

ChannelFuture接口

用于查询Channel的操作结果。

ChannelHandler和ChannelPipeline

ChannelHandler接口

用来处理事件的方法。
子接口:
ChannelInboundHandler:入站事件接口
ChannelOutboundHandler:出站事件接口

ChannelPipeline接口

ChannelHandler接口的容器,就像一个List<ChannelHandler>
是一个处理链。
在链上定义了入站和出站事件流。

ChannelHandler安装过程:

1
2
3
4
1. 一个ChannelInitializer的实现注册到Bootstrap;
2. 调用initChannel,在pipeline中安装一组自定义的ChannelHandler;
(2.1)ChannelHandler被安装时,分配一个ChannelHandlerContext给它,保存它与pipeline之间的绑定。
3. ChannelInitializer将自己从pipeline中移除。

ChannelHandlerContext

可以用于获取底层的Channel
两种写消息方式:

  1. 写到ChannelHandlerContext中: 传递给pipeline中下一个handler;
  2. 直接写channel:跳过后续的handler,直接到达尾端。

适配器

实际编写ChannelHandler的时候,一般不会直接用ChannelInboundHandler或者ChannelOutboundHandler接口,而会直接用预定义的实现类,然后进行扩展,也就是直接继承extends下列适配器类:

1
2
3
4
ChannelHandlerAdapter
ChannelInboundHandlerAdapter
ChannelOutboundHandlerAdapter
ChannelDuplexHandler

好处是可以只重写自己关心的事件处理,其他的用默认的。

ChannelHandler的子类型: 编码器/解码器

编码解码: 入站出站时候字节和对象的数据转换。

抽象类SimpleChannelInboundHandler

1
SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter

要注意处理的数据类型。
由于上游可能有其他解码器,导致数据类型发生改变。

引用类型

强引用

普通对象,GCroot能引用到就不回收。

软引用

用于内存敏感的高速缓存。
内存不足时会被回收。
实际应用中使用ReferenceQueue,回收后引用进入ReferenceQueue

弱引用

也用于缓存,但更容易被回收。
内存充足时也会被回收。
实际应用中使用ReferenceQueue,回收后引用进入ReferenceQueue

案例: ThreadLocal的key

ThreadLocalkey是弱引用,被回收后,keynull,此时value无法被访问,但是有强引用。
但是每次发生get,set,remove的时候,会移除keynullentry,消除内存泄露。

虚引用

完全不影响回收,相当于没引用。
必须和引用队列 (ReferenceQueue)联合使用,回收后引用进入ReferenceQueue

案例:堆外内存管理

堆外内存DirectByteBuffer

堆外内存: 不进行GC,防止JNI访问错位。

使用方式

使用堆外内存的两种方式:
1.隐式:比如读写文件时:

读文件: 文件(disk)=>堆外内存=>堆内内存;(过程中JVM保证不GC)
写文件: 堆内内存=>堆外内存=>文件(disk)。

2.显式:使用DirectByteBuffer,直接在堆外分配空间,节省1倍空间,减去一倍拷贝操作。

1
2
// 底层源码:
unsafe.allocateMemory(size)

Unsafe

java直接管理内存用的类,之所以叫Unsafe,意思是这些原来设计是给sun公司的人专用的,不是开放给外面的人用的,希望普通用户不要依赖它的接口,随时随着jdk版本升级而更改接口。// 如果不升级jdk,就不用care了。
并不是说这个类不安全。

使用场景

(因为不gc)
生命期较长的大对象;
创建次数不会太多。
// 如直接的文件拷贝操作。

优点

1.对于大内存有良好的伸缩性
2.对垃圾回收停顿的改善可以明显感觉到
3.在进程间可以共享,减少虚拟机间的复制

配置

1
2
3
-XX:MaxDirectMemorySize
-Dsun.nio.MaxDirectMemorySize
directMemory = Runtime.getRuntime().maxMemory()

堆外内存回收

三种方法:
1.达到限制触发自动回收;(system.gc()) // 可能被配置-XX:+DisableExplicitGC关闭。

2.手动调用UnsafefreeMemory接口。

3.使用DirectByteBuffer,它在初始化的时候会创建Cleaner这个Cleaner对象会在合适的时候执行unsafe.freeMemory(address),从而回收这块堆外内存。

4.手动调用((DirectBuffer)bb).cleaner().clean();.
// 内部还是调用System.gc(),所以一定不要-XX:+DisableExplicitGC

相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static class Deallocator implements Runnable  {
private static Unsafe unsafe = Unsafe.getUnsafe();
private long address;
private long size;
private int capacity;
private Deallocator(long address, long size, int capacity) {
assert (address != 0);
this.address = address;
this.size = size;
this.capacity = capacity;
}

public void run() {
if (address == 0) {
// Paranoia
return;
}
unsafe.freeMemory(address);
address = 0;
Bits.unreserveMemory(size, capacity);
}
}

Forkjoin笔记

ForkJoin框架

forkjoin类似于一个单机版的mapreduce,只是把多节点多进程换成了多线程。

分治法(dfs): 把大任务划分成多个子任务,然后单线程执行、合并结果;
mapreduce: 把大任务划分成多个子任务,然后多节点多进程执行、合并结果;
forkjoin: 把大任务划分成多个子任务,然后多线程执行、合并结果。

优化:
mapreduce: 可以通过配置开启预测执行,如果有任务算得慢,会启动新的attempt,取算的快的结果,kill跑得慢的attempt;
forjoin: 通过双端队列存储每个线程的任务,如果有线程结束得慢,空闲的线程会进行工作窃取,从双端队列的尾部拿任务执行(但是不会重复计算同一个任务,这一点与MR不同)。

示例使用代码

范围求和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class CountTask extends RecursiveTask<Integer> {
private final static int THREDSHOLD = 2;
private final int start;
private final int end;

public CountTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THREDSHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 任务太大, 分治
int mid = start + (end - start) / 2;
CountTask left = new CountTask(start, mid);
CountTask right = new CountTask(mid + 1, end);
left.fork();
right.fork();
int leftRes = left.join();
int rightRes = right.join();
sum = leftRes + rightRes;
}
return sum;
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();// 可以传入并行度参数, 不传则默认从RunningTime取核数作为并行度
CountTask task = new CountTask(1, 10);// RecursiveTask=> ForkJoinTask
Future<Integer> res = forkJoinPool.submit(task);// ForkJoinTask
System.out.print(res.get());


}
}

相关类

可以看到代码里主要是继承RecursiveTask定义一个计算任务类,定义分治和合并计算结果的操作,然后交给ForkJoinPool进行计算即可。(类似于归并排序)

实际上forkjoin框架中涉及到的类大致如下:

1
2
3
4
5
6
7
8
9
// 外部接口:
// 1. 需要返回值:
RecursiveTask => ForkJoinTask implements Future<V>, Serializable

// 2. 不需要返回值:
RecursiveAction => ForkJoinTask implements Future<V>, Serializable

// 3. 管理线程池和工作任务:
ForkJoinPool => extends AbstractExecutorService => implements ExecutorService

ForkJoinPool中使用的双端队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 内部组件 
@sun.misc.Contended
static final class WorkQueue {
// 核心部分:
volatile int scanState; // versioned, <0: inactive; odd:scanning
int stackPred; // pool stack (ctl) predecessor
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // 队尾,poll用,会被窃取线程更改。
int top; // 队首,push用,只会被当前线程更改,
// 因此没有volatile
ForkJoinTask<?>[] array; // 双端队列
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
// 其他成员省略:
...
...
}

Future/FutureTask笔记

Future和Callable

Future是个接口:

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

可见它的功能主要包括:

  1. 下取消指令;
  2. 查询状态: 取消或者完成;
  3. 获取结果。// 这一点和callable一样。

回顾callable的源码:

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

可见两者区别主要在于Future并不定义任务详情,多了任务执行管理查询接口。
没有callable的call函数。

实际使用的时候它们一般的分工如下:

1
Future<String> future = executorService.submit(new MyCallable());

FutureTask

FutureTask是一个实际的类.

外部使用/接口:

1
2
3
4
5
6
7
Callable<Integer> callable = () -> new Random().nextInt(100);
FutureTask<Integer> future = new FutureTask<>(callable);
// 方法1: 可以由当前线程执行:
// future.run();
// 方法2: 可以由别的线程来执行:
new Thread(future).start();
System.out.println(future.get());

可以看出FutureTask可以由Callable构造,然后用于执行、异步获取结果。

内部组件:

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FutureTask<V> implements RunnableFuture<V> 
private volatile int state;// 可能的取值如下:
private static final int NEW = 0; // 初始状态
private static final int COMPLETING = 1; // 计算中
private static final int NORMAL = 2; // 正常结束 (终结状态)
private static final int EXCEPTIONAL = 3; // 异常结束 (终结状态)
private static final int CANCELLED = 4; // 已取消 (终结状态)
private static final int INTERRUPTING = 5; // 打断中
private static final int INTERRUPTED = 6; // 已打断 (终结状态)

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// 省略很多方法: ...

其中状态可能的流转如下:(对应四个终结状态的生成)

1
2
3
4
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

// TODO 为啥异常状态不用负数标示呢?

其中RunnableFuture接口是这样的:

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

可见FutureTaskRunnableFuture的结合, 拥有Future的查询结果特性和Runnable的定义计算任务的特性。

然而它的内部实现实际上是用了Callable而不是Runnable:

1
2
3
4
5
6
7
8
9
10
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

可以看到它只有两个构造函数,即使传入的Runnable也会被装配成Callable

RunnableFuture/FutureTask与AbstractExecutorService

线程池的submit方法默认有如下3个实现: (来自AbstractExecutorService类)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

因此线程池中传入任务后,实际上会转化为RunnableFuture.

其中构造RunnableFuture的方法如下.

1
2
3
4
5
6
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

因此RunnableFuture的实际实现,在线程池中默认是FutureTask

综上所述:
因此线程池中传入任务后,实际上会转化为FutureTask.

消息队列小结

为啥要使用消息队列,主要是两个目的:
(场景:生产者消费者模型)

  1. 生产者与消费者速度不匹配;(生产者太快了)
  2. 解耦生产者与消费者。(可以异构,不同语言,不同节点)

JMS(Java MessageService)

Sun公司早期提出的消息标准。
api规范(旧).支持点对点和发布订阅.

概念 解释
producer/publisher 生产者
consumer/subscriber 消费者
message 消息
queue 存放消息的地方
topic 提供多个订阅者消费消息的机制

JMS中的消息模式有两种:

  1. P2P: 点对点
  2. publish-subscribe: 发布订阅

AMQP (advanced message queuing protocol)

高级MQ协议. 不但约束了api,还使不同实现之间可以合作.
加入了Exchange,Binding,解耦了队列,
可以灵活实现除了点对点\发布订阅以外的模型.
消息模型:

AMPQ其中一个实现: rabbitmq

简单理解概念:

1
2
3
4
Channel : 信道.一个连接多个Channel. 节省开销.(复用tcp连接,每次发送消息算一个信道)
Exchange: 交换器、路由器.
Queue: 队列. 带名字的邮箱.
Binding: 绑定.// 路由规则

Exchange:
根据配置好的路由规则,转发收到的消息到符合的queue.
不同类型的exchange:

1
2
3
4
direct 直连
fanout 一个消息到多个队列
topic 多个消息到一个队列
header // 不实用,即将弃用

kafka

基本概念

1
2
3
4
5
6
7
Broker: kafka server的一个单位(brokerid);
Topic: 某个主题;
Partition: 一般对应磁盘个数,加大吞吐率;
Replica: 0.8后新增,高可用;(3/5...)
Consumer
Group: 管理多个consumer
Producer

内部组件

1
2
3
4
5
6
SocketServer: 接受消息,返回消息(客户端、内部通信),
RequestHandlerPool: 根据不同request类型,调用不同KafkaApi处理消息
KafkaApi
KafkaController->ReplicaStateManager+PartitionStateManager+Listener
ReplicaManager: 底层存储结构,每个broker上都有。接受读写操作。
Zookeeper:1. 发现服务,状态转移;2. 存储offeset。(1.0后转入topic)

KafkaController

Broker通过ZK抢注Controller。
Controller负责管理broker。它会注册很多Listener,监听zk上节点变化,来维护状态自动机变化。

  • 防止脑裂:
    旧controller epoch<新controller epoch

    Follower与Leader

    Follower与Leader均是针对某一个partition而言:
  1. Slave(Follower,对于某一个topic的partition是follower):同步数据。
  2. Slave(Leader,对于某一个topic的partition是leader): 接受kafkaApi(consumer和producer)的请求。

读写partition:
随机找一个机器->metadata(zk)->发送请求给这个partition的具体leader。

SocketServer

状态机

KafkaHA

ISR: in-sync Replica
leader: 最新offset;
follower: 最新offset就是ISR;(轮询offset)
zk: 都有谁是ISR,同步了。

leader挂了,选一个ISR;
如果没有ISR,则随机选一个最先起来的。(不一定选offset最高的,不确定后头还会不会起来follower)。

ACK(均到内存,均不保证落盘)
1: leader收到就收到;
2: 有一个follower收到就收到;
3: 所有follower收到才收到。

错误恢复

  • leader选举
    直接从ISR中选第一个(近似于随机)。

  • Controller选举:(某台机器)
    所有Broker抢注ZK。

Controller挂的时候: 所有replica和partition的状态不能改变。也不能reBanlance了。
如果Controller挂的时候,leader也挂,所在partition就不能读不能写了。

  • 倾斜(热点):
    一台最稳定的机器上,可能集中了所有leader.
    (可以手动reAssign)

实际案例

其他类kafka的消息队列改进:

  1. Consumer之间消费的分区互不重叠(防止kafka的重复消费);(tube)
  2. 严格有序:数据落地到单队列上,每次ACK。(hippo)
  3. controller改成双master热备,降低对zk依赖,引入nameserver.(rocketmq)

sparkstreaming调优小结

需求:

  1. 消费速度必须高于生产速度;(天哪)
  2. 即使有峰值也不能挂;
  3. 资源占用尽量少,其中cores比内存稀缺;
  4. 必须使用receiver模式,不提供direct接口。(天哪)

措施:

  1. receiver数量与分区数一致,与executor数一致;
  2. duartion调优;
  3. persist选memory_and_disk;
  4. 每个executor分3个core、6g内存;
  5. 代码对于每个partition保存一次数据;
  6. 关闭backpressure.enabled;
  7. receiver.maxRate设得超大。

可能挂的场景:

  1. 处理速度跟不上,缓存的数据失效了,rdd失联退出;
  2. delay越积越多,OOM。
  1. maxRatebackpressure.enabled false
    由于底层架构不可控,api被别的部门重新封装,只开放了receiver模式,而且也不给zk连,因此只能使用receiver模式。

这个模式的特点就是会先保存一份数据到wal,相当于所有数据会有两份。
当然也有优点就是编程简单,不用自己维护offset了。

为了保证消费速度高于生产速度,设置了与分区数一致的receiver数量,并且:

1
2
backpressure.enabled false
receiver.maxRate 149600

前者true的话,意思是发现处理不过来的时候,会帮忙降低消费的速度;
后者就是设置最大消费速度。

  1. 为了使处理速度够快,设置了3个核,6g内存,相当于每个核2g内存。
    由于receiver占用一个核,相当于每个executor上能有两个核同时处理task。

  2. 由于receiver模式接受到数据会首先缓存一份rdd。
    默认所有rdd会继承dstream的缓存级别,而dstream的缓存级别默认是MEMORY_ONLY_SER。(题外话,spark任务的rdd默认缓存级别是memory_only
    MEMORY_ONLY_SER级别的特点是占用内存少,而牺牲一点计算时间。
    由于我们比较不缺内存,因此将存储级别改成memory_and_disk
    两个改变:
    (1) 取消了反序列化的时间;
    (2) 存不下的放disk,应对峰值可能出现的极端情况。

  3. duration调优。
    duration设置得小的话,内存占用小(缓存rdd),但是提交任务频繁,默认的启动计算开销占比大。
    duration设置得大的话,内存占用大(缓存rdd),启动计算开销占比小。

最小资源占比:处理时间正好稍小于duration。
我的设置: 平时处理时间是duartion的1/3,以应对峰值。

  1. 保存数据时候的代码优化。
    (1)每个partition保存一次;
    (2)在executor上保存:
    试了一下collect到driver上保存,发现慢了很多,出现了单点瓶颈。于是还是改到在每个executor上保存,每个executor上的线程访问自己的threadlocal对象,减少竞态条件。

  2. gc时间:
    使用CMS收集器,查看exector的gc日志,2小时内没有full gc,3秒一次gc。
    gc总时间占task时间的4%:

    1
    XX:MaxPermSize=256m -XX:SurvivorRatio=4 -XX:+UseMembar -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSScavengeBeforeRemark -XX:ParallelCMSThreads=4 -XX:+UseCMSCompactAtFullCollection -XX:+CMSClassUnloadingEnabled -XX:CMSInitiatingOccupancyFraction=50 -XX:+UseCompressedOops
  3. 序列化类
    spark.serializer已经是kyro了。

LinkedTransferQueue笔记

本文主要讨论两个容器: SynchronousQueueLinkedTransferQueue。重点是LinkedTransferQueue.
背景:

https://stackoverflow.com/questions/5102570/implementation-of-blockingqueue-what-are-the-differences-between-synchronousque
有了LinkedBlockingQueue,为啥还要使用SynchronousQueue呢?
队列元素较少情况下: SynchronousQueue优于LinkedBlockingQueue
队列元素较多情况下: LinkedBlockingQueue优于SynchronousQueue。

LinkedTransferQueue的意义在于兼顾了可读性,易用性和高性能。

高性能,无锁: SynchronousQueue,ConcurrentLinkedQueue
可读性,易用性,有锁: LinkedBlockingQueue
兼顾上述优点: LinkedTransferQueue

BlockingQueue的实现:
大多是锁整个队列,并发量大的时候,锁比较耗费资源和时间。

SynchronousQueue:
无锁,只是传递元素,性能远高于LinkedBlockingQueue

ConcurrentLinkedQueue:
无锁,性能高用CAS实现非阻塞的并发,但没有实现BlockingQueue接口,因此不能当阻塞队列使用(不能直接用于生产者消费者场景下).

具体来说:
ConcurrentLinkedQueueQueue接口的一个实现.
SynchronousQueueBlockingQueue接口的一个实现:
LinkedTransferQueueTransferQueue接口的一个实现:

1
LinkedTransferQueue->TransferQueue->BlockingQueue

具体定义(jdk1.8):

1
2
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {

SynchronousQueue

它本身不存在容量,只能进行线程之间的元素传送。由于对于传递性场景进行了某种充分的优化,其中最重要的是不需要锁,因此在只需要同步,不需要大量存储元素的场景下吞吐量很高。

优势: 无锁,吞吐量大;
缺点: 无容量,只能用于传递性场景。

todo: 具体是什么样神奇的优化。

ConcurrentLinkedQueue

详见之前的博客:
java并发编程的艺术-第六章2LinkedQueue

LinkedTransferQueue

本质上是多了TransferQueue接口的实现:

1
2
3
4
5
6
7
8
9
void transfer(E e) throws InterruptedException;// 阻塞
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;// 一段时间阻塞
boolean hasWaitingConsumer();
int getWaitingConsumerCount();

/**一般来说,不抛`InterruptedException`异常的是不阻塞的方法;
抛`InterruptedException`异常的是阻塞,或者定时阻塞的方法.
*/

综合了ConcurrentLinkedQueueSynchronousQueue的高吞吐量优点和LinkedTransferQueue的能存储大量元素的优点。

  1. transfer(E e):若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
  2. tryTransfer(E e):若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
  3. tryTransfer(E e, long timeout, TimeUnit unit):若当前存在一个正在等待获取的消费者线程,会立即传输给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
  4. hasWaitingConsumer():判断是否存在消费者线程。
  5. getWaitingConsumerCount():获取所有等待获取元素的消费线程数量。

需要看看xfer的4个参数:

1
2
3
4
5
6
7
8
9
10
11
private static final int NOW   = 0; // 即使没有可以用的空间或者可用的数据,也立即返回。
// for 不计时的poll和tryTransfer

private static final int ASYNC = 1; // 异步/非阻塞/立即返回.
// for offer, put, add

private static final int SYNC = 2; // 同步/阻塞/无限制等待.
// for transfer, take

private static final int TIMED = 3; // 可以等待一定时间.
// for 计时的poll和 tryTransfer

这里要注意takepoll的区别,take阻塞到有元素,poll没元素的话,直接返回null即可,不会死等。

参考资料:
http://www.cnblogs.com/rockman12352/p/3790245.html
http://cmsblogs.com/?p=2433

java代理汇总

1. 静态代理

  1. 接口A;
  2. 实现a1,实现A中的方法;
  3. 代理a2,
    (AProxy implement A),成员中有a1,这样实际干活的是a1,但是a2可以夹带一些私货,比如打上日志什么的。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class AProxy implements AInterface {
    private A1 a1;

    public AProxy(A1 a1) {// 包裹一个实现
    this.a1 = a1;
    }

    public void doSomething() {
    System.out.println("代理类方法,进行了增强。。。");
    System.out.println("事务开始。。。");
    // 调用委托类的方法;
    a1.doSomething();
    System.out.println("处理结束。。。");
    }

    }

2. jdk动态代理

上述静态代理的特点是,如果有n个接口要代理,那么相应的静态代理a2也会有很多。
动态代理的方法就是用反射来动态生成一个代理。而不用每次写非常相似的代码。

  1. 接口A;
  2. 实现a1,实现A中的方法;
  3. 动态代理,传入a1,通过反射创建一个与a1接口一样的类A2,并生成一个对象a2。
    然后定义每次invoke的时候需要夹带的私货(打日志)。

外界使用的时候,比静态代理多一步。
静态代理: 1.传入a1,得到一个代理a2.
动态代理: 2.传入a1,得到一个handler,再从handler中获取代理a2.(getProxy)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60


import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class DynamicProxyTest {
private static class AHandler implements InvocationHandler {

// 目标对象
private Object a1Impl;

public AHandler(Object a1Impl) {
super();
this.a1Impl = a1Impl;
}

/**
* 创建代理实例
*
* @return proxy Object
* @throws Throwable e
*/
public Object getProxy() throws Throwable {
// 实现1: 进行代理:
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), this.a1Impl.getClass()
.getInterfaces(), this);
// 实现2: 不进行代理,依然使用默认实现:
// 这样写只返回了目标对象,没有生成代理对象。
// return a1Impl;
}

/**
* 实现InvocationHandler接口方法
* 执行目标对象的方法,并进行增强
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 使用代理proxy,调用方法method,参数为args
Object result;
System.out.println("代理类方法,进行了增强。。。");
System.out.println("事务开始。。。");
// 执行目标方法对象
// 调用a1Impl的method方法,参数为args
result = method.invoke(a1Impl, args);
System.out.println("事务结束。。。");
return result;
}
}

public static void main(String[] args) throws Throwable {
A a1 = new A1();
AHandler handler = new AHandler(a1);
// 根据目标生成代理对象
A a2 = (A) handler.getProxy();
a2.doSomething();
}

}

3. cglib动态代理

上述jdk的静态代理和动态代理,本质上都是用接口指针存放实现的对象,然后偷偷用包装了a1的代理a2,替换指针里的对象。
初始化时,代理a2的接口声明和a1一样即可。

因此jdk的静态代理和动态代理都要求被代理的实现对象声明实现了某一个接口。

cglib代理则不同,用的是基类指针存放子类对象,因此并不要求一定要声明实现某一个接口,但必须要是一个可以被继承的类。对于java来说,也就是不能是final类。

jdk代理: 接口引用存放代理对象; 要求实现某接口.
cglib代理: 原类的引用存放代理对象; 要求不是final,可以继承。

具体cglib代理的写法:

1
2
3
4
5
<!-- https://mvnrepository.com/artifact/cglib/cglib -->
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class AImpl {// 并不继承接口,但不是final的(可以有子类)

public void doSomething() {
System.out.println("AImpl doSomething");
}
}

import java.lang.reflect.Method;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;

/**
* @author xiaoyue26
*/
public class AInterceptor implements MethodInterceptor {
private Object target;

/**
* 创建代理实例
*
* @param target
* @return
*/
public Object getInstance(Object target) {
this.target = target;
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(this.target.getClass());// 继承传入的对象
// 设置回调方法
enhancer.setCallback(this);
// 创建代理对象
return enhancer.create();
}

/**
* 实现MethodInterceptor接口要重写的方法。
* 回调方法
*/
@Override
public Object intercept(Object obj, Method method
, Object[] args, MethodProxy proxy) throws Throwable {
System.out.println("事务开始。。。");
Object result = proxy.invokeSuper(obj, args);// 调用父类对象(调用原实现)
System.out.println("事务结束。。。");
return result;
}

public static void main(String[] args) {
AInterceptor aInterceptor = new AInterceptor();
AImpl a1 = (AImpl) aInterceptor.getInstance(new AImpl());
a1.doSomething();
}


}

与jdk动态代理很相似,只是概念术语换一下:
创建一个方法拦截器,传入实现a1,获取一个代理a2。

java并发编程的艺术-Executor框架-第十章

  • 两级调度:
    1
    2
    上层调度: Executor框架控制;
    下层调度: 操作系统内核控制。

Executor框架

3大部分:

1. 任务

Runnable/Callable接口。

2. 任务的执行

Executor、ExecutorService。
具体实现包括: ThreadPoolExecutor,ScheduledThreadExecutor…

3. 任务的结果(异步计算的结果)

Future接口/FutureTask类。

不同线程池适用场景

  • SingleThreadExecutor
    单线程,适用于需要保证顺序执行,而且只允许单线程执行的场景;
  • CachedThreadExecutor
    大小无界,适用于负载较轻,或执行很多短期异步小任务;
  • FixedThreadExecutor
    大小固定,适用于负载较重的服务器。

周期线程池

  • ScheduledThreadPoolExecutor
    大小固定;
  • SingleThreadScheduledExetor
    单线程。

Future接口

submit任务到Executor后,返回一个Future接口对象,目前jdk的实现是FutureTask类对象,以后不一定。

1
<T>Future<T>submit(Callable<T>task)

ThreadPoolExecutor

四大组件:

  1. corePool: 核心线程池大小
  2. maximumPool: 最大线程池大小
  3. BlockingQueue: 暂时保存任务的工作队列
  4. RejectedExecutionHandler: 线程池已经关闭或饱和(达到最大线程池大小且工作队列已满),execute方法调用的Handler。

线程池通用的工作流程:

1
2
3
4
1. 预热: 接到任务就新建线程,直到达到corePoolSize;
2. 正式工作: 接到任务先扔到BlockingQueue,核心线程池的线程不停从BlockingQueue中取任务执行;
3. 扩容: Blocking满,扩大corePool到maximunPool.
4. 饱和: Blocking满,且达到maximumPool,调用rejectedExecutorHandler.

创建线程池的方法:

  1. 使用ExecutorService中提供的定制好的线程池:
    1
    2
    3
    4
    Executors.newFixedThreadPool(1);
    Executors.newSingleThreadExecutor();
    Executors.newCachedThreadPool();
    ExecutorService es = Executors.newCachedThreadPool();
  2. 自己用ThreadPoolExecutor定制一个线程池:
    1
    2
    3
    4
    // 手动自定义详细参数:
    ExecutorService es2 = new ThreadPoolExecutor(10, 10
    , 101, TimeUnit.SECONDS
    , new LinkedBlockingQueue<Runnable>(2000));

其中的继承关系是:

1
2
3
ThreadPoolExecutor->AbstractExecutorService->ExecutorService
ThreadPoolExecutor extends AbstractExecutorService
abstract class AbstractExecutorService implements ExecutorService

FixedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ThreadPoolExecutor实现类中的通用方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// Executors工具类中的FixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThread){
return new ThreadPoolExecutor(nThread,nThread,0L,TimeUnit.MILLISECENDS
,new LinkedBlockingQueue<Runnable>());
}

可以看出最大池和核心池都是n,也就是固定大小,不再扩容。
keekAlive时间为0,因此多余空闲线程立刻被终止。
最后一个参数用的是无界队列,因此没有饱和状态,只有shutdown状态。

SingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

CachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE
,60L, TimeUnit.SECONDS
,new SynchronousQueue<Runnable>());
}

SynchronousQueue: 同步传递任务的阻塞队列。(容量为0,就是个传球手,总是满状态)

上述三个实现都没有饱和状态,前两者是因为BlockingQueue无界,
CachedThreadPool是因为maximumPool太大了(Integer.MAX_VALUE)。
最近太忙了…to be continue…