- Notifications
You must be signed in to change notification settings - Fork161
Fadezed/concurrency
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
内容整理自《Java并发编程实战》《java-concurrency-patterns》相关工具类《vjtools》
- 硬件驱动
- 人才稀缺
- 分工(如何高效地拆解任务并分配给线程)Fork/Join 框架
- 同步(指的是线程之间如何协作)CountDownLatch
- 互斥(保证同一时刻只允许一个线程访问共享资源)可重入锁
- 跳出来,看全景,站在模型角度看问题(避免盲人摸象)例如:synchronized、wait()、notify()不过是操作系统领域里管程模型的一种实现而已
- 钻进去,看本质
- 探究 Doug Lea 大师在J.U.C 包创造的章法
- 知识体系全景图

- CPU 增加了缓存,以均衡与内存的速度差异;
- 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异;
- 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。
- 一个线程对共享变量的修改,另外一个线程能够立刻看到,我们称为可见性
- 代码示例


- count+=1 操作分析
- 双重检查创建单例对象
public class Singleton { private static Singleton instance; private Singleton(){} public static Singleton getInstance(){ //一重判断 if (instance == null) { synchronized(Singleton.class) { //二重判断防止多线程同时竞争锁的情况多次创建 if (instance == null) instance = new Singleton(); } } return instance; }}按需禁用缓存以及编译优化代码来源
JVM 中的同步是基于进入和退出管程(Monitor)对象实现的。每个对象实例都会有一个 Monitor,Monitor 可以和对象一起创建、销毁。Monitor 是由 ObjectMonitor 实现,而 ObjectMonitor 实现,而 ObjectMonitor 是由C++ 的 ObjectMonitor.hpp 文件实现
当多个线程同时访问时,多个线程会被先放在EntryList集合,处于block状态的线程,都会被加入到该列表。接下来当线程获取到对象的Monitor时,Monitor依靠底层操作系统的Mutex Lock来实现互斥,线程申请Mutex成功则持有,其它线程无法获取到Mutex
ObjectMonitor() { _header = NULL; _count = 0; // 记录个数 _waiters = 0, _recursions = 0; _object = NULL; _owner = NULL; _WaitSet = NULL; // 处于 wait 状态的线程,会被加入到 _WaitSet _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; FreeNext = NULL ; _EntryList = NULL ; // 处于等待锁 block 状态的线程,会被加入到该列表 _SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ;}wait():如果线程调用 wait() 方法,就会释放当前持有的 Mutex,并且该线程会进入 WaitSet 集合中,等待下一次被唤醒。如果当前线程顺利执行完方法,也将释放 Mutex。

锁升级优化
class X { // 修饰非静态方法 锁对象为当前类的实例对象 this synchronized void get() { } // 修饰静态方法 锁对象为当前类的Class对象 Class X synchronized static void set() { } // 修饰代码块 Object obj = new Object(); void put() { synchronized(obj) { } }}
- 代码示例
- 修饰变量时,初衷是告诉编译器:这个变量生而不变,非immutable,即只能表示对象引用不能被赋值(例如List);
- 修饰方法则方法不能被重写
- 修饰类则不能被扩展继承。
final int x; // 错误的构造函数 public FinalFieldExample() { x = 3; y = 4; // 此处就是讲 this 逸出, global.obj = this; }- 程序的顺序性规则
程序前面对某个变量的修改一定是对后续操作可见的。- volatile 变量规则
对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。传递性规则
A Happens-Before B,且 B Happens-Before C,那么A Happens-Before C。
管程(synchronized)中锁的规则
对一个锁的解锁 Happens-Before 于后续对这个锁加锁synchronized (this) { // 此处自动加锁 // x 是共享变量, 初始值 =10 if (this.x < 12) { this.x = 12; } } // 此处自动解锁线程 start() 规则
主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。Thread B = new Thread(()->{ // 主线程调用 B.start() 之前 // 所有对共享变量的修改,此处皆可见 // 此例中,var==77});// 此处主线程A对共享变量 var 修改var = 77;// 主线程启动子线程B.start();线程 join() 规则
主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。Thread B = new Thread(()->{ // 此处对共享变量 var 修改 var = 66;});// 例如此处对共享变量修改,// 则这个修改结果对线程 B 可见// 主线程启动子线程B.start();B.join()// 子线程所有对共享变量的修改// 在主线程调用 B.join() 之后皆可见// 此例中,var==66
初始状态
指的是线程已经被创建,但是还不允许分配 CPU 执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。可运行状态
指的是线程可以分配 CPU 执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行。运行状态
当有空闲的 CPU 时,操作系统会将其分配给一个处于可运行状态的线程,被分配到 CPU 的线程的状态就转换成了运行状态。休眠状态
运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权,休眠状态的线程永远没有机会获得 CPU 使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。终止状态
线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。
- RUNNABLE - BLOCKED
- 线程等待synchronized隐式锁(线程调用阻塞式API依然是RUNNABLE状态)
- RUNNABLE - WAITING
- 获得synchronized隐式锁的线程,调用Object.wait();
- Thread.join();
- LockSupport.park();
- RUNNABLE - TIMED_WAITING
- Thread.sleep(long millis)
- 获得 synchronized 隐式锁的线程调用 Object.wait(long timeout)
- Thread.join(long millis)
- LockSupport.parkNanos(Object blocker, long deadline)
- LockSupport.parkUntil(long deadline)
- NEW - RUNNABLE
- Thread.start()
- RUNNABLE - TERMINATED
- run()执行完后自动转为 TERMINATED
- stop()(@Deprecated 直接结束线程,如果线程持有ReentrantLock锁并不会释放)
- interrupt()
- 异常通知
- 主动监测
- 降低延迟(发出请求到收到响应这个过程的时间;延迟越短,意味着程序执行得越快,性能也就越好。)
- 提高吞吐量(指的是在单位时间内能处理请求的数量;吞吐量越大,意味着程序能处理的请求越多,性能也就越好。)
- 优化算法
- 发挥硬件性能(CPU、IO)
- CPU 计算和 I/O 操作的耗时是 1:2

- 公式:
- 单核CPU :最佳线程数 =1 +(I/O 耗时 / CPU 耗时)
- 多核CPU :最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)
#6. 若干反例
class SafeCalc { long value = 0L; long get() { synchronized (new Object()) { return value; } } void addOne() { synchronized (new Object()) { value += 1; } }}class Account { // 账户余额 private Integer balance; // 账户密码 private String password; // 取款 void withdraw(Integer amt) { synchronized(balance) { if (this.balance > amt){ this.balance -= amt; } } } // 更改密码 void updatePassword(String pw){ synchronized(password) { this.password = pw; } } }void addIfNotExist(Vector v, Object o){ if(!v.contains(o)) { v.add(o); }}- 能够响应中断
synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。- 支持超时
如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。- 非阻塞地获取锁
如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。// 支持中断的 APIvoid lockInterruptibly() throws InterruptedException;// 支持超时的 APIboolean tryLock(long time, TimeUnit unit) throws InterruptedException;// 支持非阻塞获取锁的 APIboolean tryLock();- 当线程 T1 执行到 ① 处时,已经获取到了锁 rtl ,当在 ① 处调用 get() 方法时,会在 ② 再次对锁rtl 执行加锁操作。此时,如果锁 rtl 是可重入的,那么线程 T1 可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程 T1 此时会被阻塞。
class X { private final Lock rtl =new ReentrantLock(); int value; public int get() { // 获取锁 rtl.lock(); ② try { return value; } finally { // 保证锁能释放 rtl.unlock(); } } public void addOne() { // 获取锁 rtl.lock(); try { value = 1 + get(); ① } finally { // 保证锁能释放 rtl.unlock(); } }}- 指线程在等待队列中被唤醒的策略,如果是公平锁,则按照等待时间排序,优先唤醒时间长的
// 无参构造函数:默认非公平锁public ReentrantLock() { sync = new NonfairSync();}// 根据公平策略参数创建锁public ReentrantLock(boolean fair){ sync = fair ? new FairSync() : new NonfairSync();}- 永远只在更新对象的成员变量时加锁
- 永远只在访问可变的成员变量时加锁
- 永远不在调用其他对象的方法时加锁
- init():设置计数器的初始值。
- down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。(acquire())
- up():计数器的值加 1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。(release())
class Semaphore{ // 计数器 int count; // 等待队列 Queue queue; // 初始化操作 Semaphore(int c){ this.count=c; } // void down(){ this.count--; if(this.count<0){ // 将当前线程插入等待队列 // 阻塞当前线程 } } void up(){ this.count++; if(this.count<=0) { // 移除等待队列中的某个线程 T // 唤醒线程 T } }}允许多个线程同时读共享变量
只允许一个线程写共享变量
如果一个写线程正在执行写操作,此时禁止读线程读共享变量
- List list = Collections.synchronizedList(new ArrayList());
- Set set = Collections.synchronizedSet(new HashSet());
- Map map = Collections.synchronizedMap(new HashMap());
- 如下组合操作非原子操作,故通过synchronized保证原子操作
List list = Collections.synchronizedList(new ArrayList());synchronized (list) { Iterator i = list.iterator(); while (i.hasNext()) foo(i.next());}CopyOnWriteArrayList写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁)
内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。

若遍历 array 的同时,新增元素,CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。读写是可以并行的,遍历操作一直都是基于原 array 执行,而写操作则是基于新 array 进行。

总结:
- 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。例如上面的例子中,写入的新元素并不能立刻被遍历到。
- 迭代器是只读的,不支持增删改。因为迭代器遍历的仅仅是一个快照而对快照进行增删改是没有意义的。
ConcurrentHashMap、ConcurrentSkipListMap(SkipList跳表) 区别在于Key是否有序
CopyOnWriteArraySet、ConcurrentSkipListSet
单端阻塞队列
- 可有界
- ArrayBlockingQueue(数组)
- LinkedBlockingQueue(链表)默认大小为Integer最大值
- 无界
- SynchronousQueue(无队列,Producer的入队必须等待Consumer的出队)
- LinkedTransferQueue(融合LinkedBlockingQueue和SynchronousQueue,性能优于LinkedBlockingQueue)
- PriorityBlockingQueue(支持按照优先级出队)
- DelayQueue(延时队列)
- 可有界
双端阻塞队列
单端非阻塞队列
- ConcurrentLinkedQueue
双端非阻塞队列
- ConcurrentLinkedDeque
- AtomicBoolean
- AtomicInteger
- AtomicLong
getAndIncrement() // 原子化 i++getAndDecrement() // 原子化的 i--incrementAndGet() // 原子化的 ++idecrementAndGet() // 原子化的 --i// 当前值 +=delta,返回 += 前的值getAndAdd(delta) // 当前值 +=delta,返回 += 后的值addAndGet(delta)//CAS 操作,返回是否成功compareAndSet(expect, update)// 以下四个方法// 新值可以通过传入 func 函数来计算getAndUpdate(func)updateAndGet(func)getAndAccumulate(x,func)accumulateAndGet(x,func)- AtomicReference
- AtomicStampedReference(版本号解决ABA问题)
- AtomicMarkableReference(版本号解决ABA问题)
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
- AtomicReferenceFieldUpdater
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
- 创建对象,仅仅是在 JVM 的堆里分配一块内存而已;
- 而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了。
- 代码示例
- Java ThreadPoolExecutor
ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)- 参数意义
- corePoolSize:最小或核心线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
- maximumPoolSize: 最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
- keepAliveTime & unit:线程空闲回收时间,大小和单位,也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,就回收。
- workQueue:工作队列。
- threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
- handler: 拒绝策略。若线程池内所有线程都是忙碌,并且工作队列(有界队列)也满,线程池就会触发拒绝策略,以下为ThreadPoolExecutor提供的四种策略
- CallerRunsPolicy:提交任务的线程自己去执行该任务。
- AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException
- DiscardPolicy:直接丢弃任务,没有任何异常抛出。
- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
- jdk 1.6之后加入allowCoreThreadTimeOut(boolean value) 核心线程也可释放。
- 不建议使用Executors创建线程池(很多都是无界队列)
- 慎用默认拒绝策略RejectedExecutionException不强制处理容易忽略,建议自定义拒绝策略配合策略降级使用
- 异常处理不会通知所有需要按需捕获处理异常
ThreadPoolExecutor提供了三个submit方法
Future<?> submit(Runnable task);// 提交 Runnable 任务
这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。- Future submit(Callable task);// 提交 Callable 任务
Callable只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。- Future submit(Runnable task, T result);// 提交 Runnable 任务及结果引用
假设这个方法返回的 Future 对象是 f,f.get()=的返回值就是传给 submit() 方法的参数 result。- Future接口提供的方法
// 取消任务boolean cancel(boolean mayInterruptIfRunning);// 判断任务是否已取消 boolean isCancelled();// 判断任务是否已结束boolean isDone();// 获得任务执行结果get();// 获得任务执行结果,支持超时get(long timeout, TimeUnit unit);- 代码示例
- 构造函数类似线程池submit
FutureTask(Callable<V> callable);FutureTask(Runnable runnable, V result);About
Java 并发编程知识梳理以及常见处理模式 features and patterns
Topics
Resources
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Releases
Packages0
Uh oh!
There was an error while loading.Please reload this page.













