java并发基础
本系列文章主要讲解Java并发相关的内容,包括同步、锁、信号量、阻塞队列、线程池等,整体思维导图如下:
本文主要讲解Java内存模型、同步块、重入锁ReentrantLock和读写锁ReentrantReadWriteLock的应用及源码实现。
同步和锁
我们之所以要用多线程并发,是为了更好地利用服务器资源,使程序的响应速度更好。然而,如果多个线程访问了相同的资源,如同一内存区的变量、数组和对象,以及外部数据库或者文件等,可能导致程序的运行结果和我们预期的不一致。
假设你家庭成员都用同一个银行账户,每天都有收入或者支出,我们来看如下的代码:
package com.molyeo.java.concurrent;
/**
Created by zhangkh on 2018/8/24.
*/
public class SynchronizedDemo {
public static void main(String[] args){Account account=new Account(); account.setBalance(100000); System.out.printf("Account : Initial Balance: %f",account.getBalance()); Spender spender=new Spender(account); Thread spenderThread=new Thread(spender); spenderThread.start(); Earner earner=new Earner(account); Thread earnerThread=new Thread(earner); earnerThread.start(); try { spenderThread.join(); earnerThread.join(); System.out.printf("Account : Final Balance: %f",account.getBalance()); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
SynchronizedDemo类先创建一个账户,然后将账户实例传递给Spender线程(每个月花钱的败家子)和Earner线程(挣钱养家的家庭成员),然后启动这两个线程。
其中Account的代码如下:
public class Account{
private double balance;public double getBalance() {
return balance;
}
public void setBalance(double balance) {
this.balance = balance;
}
public void addAmount(double amount) {
try{ Thread.sleep(10); }catch (Exception e){ e.printStackTrace(); } balance=balance+amount;
}
public void subtractAmount(double amount){try{ Thread.sleep(10); }catch (Exception e){ e.printStackTrace(); } balance=balance-amount;
}
}
Account拥有一个成员变量balance,用于记录账户余额,对外提供一个增加余额addAmount()和减少余额subtractAmount()的方法。
Spender和Earner代码如下:
class Spender implements Runnable {
private Account account;
public Spender(Account account) {this.account=account;
}
@Override
public void run() {for (int i=0; i<30; i++){ account.subtractAmount(1000); }
}
}
class Earner implements Runnable{
private Account account;
public Earner(Account account) {
this.account=account;
}
@Override
public void run() {
for (int i=0; i<30; i++){
account.addAmount(1000);
}
}
}
Spender线程每个月花钱30次,每次花1000人民币;Earner线程挣钱30次,每次挣1000人民币。
SynchronizedDemo程序中先设置账户余额为10万元,可以认为是家庭存款,按照预期的结果,这个月败家子花了3万,养家的爸妈一起挣了3万,这个月后结算,家里存款还是10万。虽然爸妈这个月白忙了,但是败家子还是先别内疚了,先看看程序的输出。
第一次运行输出结果如下(由于有一定的随机性,结果不一定完全相同):
Account : Initial Balance: 100000.000000
Account : Final Balance: 99000.000000
第二次运行输出结果如下(由于有一定的随机性,结果不一定完全相同):
Account : Initial Balance: 100000.000000
Account : Final Balance: 112000.000000
程序到底怎么了呢,好好地每次运行结果还不一致了?要深入分析这个问题,我们要先了解下Java的内存模型。
Java内存模型
Java内存模型规范了Java虚拟机和计算机内存的协同工作机制,简单的说就是说明了如何和何时可以看到由其他线程修改后的共享变量的值,以及在必须时如何同步地访问共享变量。
Java内存模型逻辑视图
Java内存模型把Java虚拟机划分为线程栈和堆。
每个运行在Java虚拟机中的线程拥有自己的线程栈,包含线程调用的方法当前执行点相关的信息。同时一个线程也仅仅能访问自己的线程栈,线程创建的本地变量仅自己可见,其他线程不可见。
原始类型(boolean, byte, short, char, int, long, float, double)的本地变量都存放在线程栈上,而Java程序中创建的对象都存在在堆上,不管是自定义的自定义对象,还是原始类型的包装对象(如Byte, Integer, Long等)。下面这张图演示了调用栈和本地变量存放在线程栈上,对象存放在堆上。
变量到底是存放在线程栈上还是堆上,有如下几点说明:
本地变量如果是原始类型,则始终存在线程栈上
本地变量如果是对象的引用,则这个引用存在现在线程栈上,而对象存在堆上
一个对象的方法中的本地变量存在线程栈上,即使这个对象存放在堆上
对象的成员变量和对象本身一起存放在堆上,不管成员变量是原始类型还是引用类型
静态成员变量也是存放在堆上
存放在堆上的对象可以被所有持有对这个对象引用的线程访问。当一个线程可以访问一个对象时,它也可以访问这个对象的成员变量。如果两个线程同时调用同一个对象上的同一个方法,它们将会都访问这个对象的成员变量,但是每一个线程都拥有这个本地变量的私有拷贝。
硬件内存架构
现代硬件内存模型与Java内存模型有一些不同。理解内存模型架构以及Java内存模型如何与它协同工作也是非常重要的。
现代计算机硬件架构简单图示如下:
现代计算机通常由两个或者多个CPU组成,拥有寄存器,缓存,和主存,其中访问速度寄存器最高,缓存次之,再者就是主存。
通常情况下,当一个CPU需要读取主存时,它会将主存的部分读到CPU缓存中。它甚至可能将缓存中的部分内容读到它的内部寄存器中,然后在寄存器中执行操作。当CPU需要将结果写回到主存中去时,它会将内部寄存器的值刷新到缓存中,然后在某个时间点将值刷新回主存。
需要注意的是,当CPU需要在缓存层存放一些东西的时候,存放在缓存中的内容通常会被刷新回主存。CPU缓存可以在某一时刻将数据局部写到它的内存中,和在某一时刻局部刷新它的内存。它不会再某一时刻读/写整个缓存,通常是一个或者多个缓存行被读取或者写入。
硬件内存架构没有区分线程栈和堆。对于硬件,所有的线程栈和堆都分布在主内中。部分线程栈和堆可能有时候会出现在CPU缓存中和CPU内部的寄存器中。如下图所示:
当对象和变量被存放在计算机中各种不同的内存区域中时,就有可能会出现一些问题,主要包括两个方面:
共享变量修改的可见性
多线程读写共享变量时出现竞态条件
我们以上面的SynchronizedDemo例子,从硬件内存架构来分析这两个问题。
可见性
Account的成员变量balance被初始化为100000后,保存在主存中。Spender线程首次将这个共享对象读取到缓存后,调用subtractAmount()方法减少了balance的值。如果此时缓存数据没有被刷新到主存,则更改后的值对其他线程是不可见的。
而此时Earner线程也将共享对象拷贝到缓存中,注意此时的共享对象的值还是100000,然后调用addAmount()方法增加balance的值。这样两个线程多次更改后,导致最后balance的结果是不确定的,进而出现每次运行的结果可能不太一致。
为了解决共享变量的可见性问题,可以使用Java中的volatile关键字,其可以保证直接从主存中读取一个变量,如果这个变量被修改后,总是会被写到主存中去。
看到这里后,我们可以尝试对Account的成员变量balance使用volatile修饰,然后多次运行上述程序,看结果是否保持一致。
Account修改内容如下:
private volatile double balance;
为了避免多次运行,我们在SynchronizedDemo的主程序中,直接循环10次:
public class SynchronizedDemo {
public static void main(String[] args){
for(int i=0;i<10;i++){
Account account = new Account();
account.setBalance(100000);
System.out.printf(“Account : Initial Balance: %s”,account.getBalance());
Spender spender=new Spender(account);
Thread spenderThread=new Thread(spender);
spenderThread.start();
Earner earner=new Earner(account);
Thread earnerThread=new Thread(earner);
earnerThread.start();
try {
spenderThread.join();
earnerThread.join();
System.out.printf(“Account : Final Balance: %s”,account.getBalance());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“——————————————————“);
}
}
}
让我们来见证奇迹的发生,看看程序的输出结果:
Account : Initial Balance: 100000.0
Account : Final Balance: 105000.0
Account : Initial Balance: 100000.0
Account : Final Balance: 99000.0
Account : Initial Balance: 100000.0
Account : Final Balance: 99000.0
Account : Initial Balance: 100000.0
Account : Final Balance: 100000.0
Account : Initial Balance: 100000.0
Account : Final Balance: 99000.0
Account : Initial Balance: 100000.0
Account : Final Balance: 98000.0
Account : Initial Balance: 100000.0
Account : Final Balance: 96000.0
Account : Initial Balance: 100000.0
Account : Final Balance: 94000.0
Account : Initial Balance: 100000.0
Account : Final Balance: 97000.0
Account : Initial Balance: 100000.0
Account : Final Balance: 98000.0
这是怎么回事呢,为什么每次运行的结果不完全一致呢,因为这里还涉及到操作原子性的问题。
原子性
所谓原子性,即一个操作或者多个操作要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。
我们来看Account中用于修改余额balance的两个方法:
public void addAmount(double amount) {
try{
Thread.sleep(10);
}catch (Exception e){
e.printStackTrace();
}
balance=balance+amount;
}
public void subtractAmount(double amount){
try{
Thread.sleep(10);
}catch (Exception e){
e.printStackTrace();
}
balance=balance-amount;
}
不管是addAmount还是subtractAmount方法对balance的操作都不具备原子性,其包括读取变量的原始值,进行加法或者减法的操作,然后写入工作内存。
假设Account初始值为100000,Earner线程先启动,读取了balance的原始值,还没有对balance进行修改前,这是线程被阻塞了。
然后Spender线程后启动,也读取了balance的值,由于该变量还没有被修改,故Spender线程的缓存还是有效的,然后进行减少amount(amount=1000)的操作,操作后balance的值为99000,将其写入工作内存,最后写入到主存。
然后Earner线程被激活了,虽然此时Spender线程对balance进行了修改,并使Earner线程中的缓存行无效,但由于Earner线程先前已经得到balance的值了,故balance的值还是先前的100000,继续进行相关的加法操作,操作后的最后balance值为101000,最后将结果写入到主存。此处涉及到Java语言的指令集架构,数据计算时基于栈的,有兴趣的可以去深入研究。
在Java中,只有基本数据类型的变量的读取和赋值是原子性操作的,即这些操作是不可中断的,要么执行,要么不执行。
如下对long型的balance赋值是原子性的:
balance=100000
但对balance进行算术运算则不是原子性的,因为其包括读取balance的值、加1000、写入新的值等三个操作。
balance=balance+1000
如果我们要实现更大范围操作的原子性,则需要通过synchronized和Lock来实现。同步块或者锁可以保证同一个时刻仅有一个线程可以进入代码的临界区,同时代码块中所有被访问的变量将会从主存中读入,退出同步块时,不管这个变量本身是否被声明为volatile,所有被更新的变量会被刷新到主存。
有了这些理论知识后,我们再看上面的示例如何修改,才能是最终的运行结果正确且完全一致。
我们只需要将要在多线程中访问的方法addAmount和subtractAmount声明为synchronized即可。由于synchronized能够保证唯一访问和修改后的数据刷新到主存,则成员变量balance是否被声明为volatile都无所谓了。我们看修改后的Account:
class Account {
private double balance;
public double getBalance() {
return balance;
}
public void setBalance(double balance) {
this.balance = balance;
}
public synchronized void addAmount(double amount) {
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
balance = balance + amount;
}
public synchronized void subtractAmount(double amount) {
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
balance = balance - amount;
}
}
然后不管我们运行多少次,都可以看到最后balance的值都是100000。
同步块
Java中同步块主要用来标记方法或者代码块时同步的,同步在同一个对象上的同步块同一时刻只能被一个进程进入并执行相关操作,其他线程被阻塞,直到执行该同步块的线程退出。
Java中主要要四种不同的同步块:
实例方法
静态方法
实例方法中的同步块
静态方法中的同步块
###实例方法同步
我们上面Account类中的两个方法就是实例方法上的同步(去掉线程睡眠的代码):
public synchronized void addAmount(double amount) {
balance = balance + amount;
}
public synchronized void subtractAmount(double amount) {
balance = balance - amount;
}
Java实例方法同步是同步在拥有该方法的实例上的,只有一个线程能够在实例方法同步块中运行。如果有多个实例存在,则一个线程一次可以在一个实例同步块中执行操作,但是不同的线程针对不同的实例则是可以同时执行的。
###实例方法中的同步
有时候不需要同步整个方法,而是方法中的一部分。我们将上面的addAmount()方法改写如下:
public void addAmount(double amount) {
synchronized (this){
balance = balance + amount;
}
}
public synchronized void subtractAmount(double amount) {
balance = balance - amount;
}
在上例中,使用了关键字this,即为调用addAmount方法的实例本身。在同步构造器中用括号括起来的对象叫做监视器对象。上述代码使用监视器对象同步,同步实例方法使用调用方法本身的实例作为监视器对象。
如果实例方法同步和实例方法中的同步,是针对同一个实例的话,则同一时刻只有一个线程在这两个同步块中的任意一个方法内执行。
静态方法同步
静态方法的同步是同步在该方法所在的类对象上的,因为JVM中一个类只能对应一个类对象,故同一时刻只允许一个线程执行同一个类中的静态同步方法。
静态方法中的同步块
静态方法中的同步块和静态方法作用类似,都是同步在类对象上。
下面的两个静态方法的同步块是不允许同时被线程访问的。
public synchronized static void addAmount(double amount) {
balance = balance + amount;
}
public static void subtractAmount(double amount) {
synchronized (Account.class){
balance = balance - amount;
}
}
线程安全
上面的章节中,我们深入探讨了线程同步以及同步块的多种形式,但是不是所有被多线程访问方法都需要同步呢?这就需要我们了解代码是到底是不是线程安全的。
所谓线程安全,是只允许被多个线程同时执行的代码,其不包含竞态条件。而多个线程同时更新共享资源时会引发竞态条件,因而需要知道Java线程执行时共享了什么资源。
局部变量
局部变量存储在线程栈中,即局部变量永远不会被多个线程共享,基础类型的局部变量是线程安全的。如下面的代码,即使多个线程调用,其结果总是返回固定值1。
public long fixedNumer(){
long result=0;
result++;
return result;
}
局部的对象引用
局部的对象引用和局部变量不一样,虽然引用自身存在线程栈上,但是引用所值得对象存在共享堆中。
如果这个方法中创建的对象不能逃逸出该方法,即实例不能被别的线程获取到,也不会被非局部变量引用到,则其是线程安全的,否则不是线程安全的。
在下面的代码中,我们在makeAccountAndInit方法中新建Account对象,并赋值给account引用。由于account没有被方法makeAccountAndInit返回,也没有被其他的方法返回,故这里的引用account是线程安全的。
public void makeAccountAndInit(){
Account account=new Account();
initBalance(account);
}
public void initBalance(Account account){
account.setBalance(100000);
}
但是如果account作为结果被返回,则其可以被其他线程获取到,则不是线程安全的。
成员变量
成员变量存储在堆上,如果多个线程同时更新同一个对象的同一个成员,则代码不是线程安全的。就如同我们最开始的SynchronizedDemo例子。
至于如何判断我们的代码是否是线程安全的,可以根据线程控制逃逸规则。
线程控制逃逸规则
如果一个资源的创建、使用、销毁都是在同一个线程内完成的,且永远不会脱离该线程的控制,则该资源的使用就是线程安全的。
资源可以是对象,数组,文件,数据库连接,套接字等等。Java中你无需主动销毁对象,所以“销毁”指不再有引用指向对象。
此外即使对象本身是线程安全的,但对象中如果包含其他资源,也会导致应用不是线程安全的。
如2个线程都创建了各自的数据库连接,每个连接自身是线程安全的,但是如果连接到同一个数据库,并且更新同一行记录,如update table set num=num+1 where id=10000;。可能最后的结果并没有增加2,而是只增加1`。
因而在实际编程时,一定要注意区分线程控制的对象是资源本身,还是只是资源的引用。
锁
Java中锁Lock和synchronized同步块一样,是一种线程同步机制,但提供了更灵活的结构,更细粒度的控制。
###Lock和synchronized的区别
Lock实现类提供了细粒度的控制。synchronized方法或者语句提供了对每个对象相关的隐式监视器锁的访问,但是强制锁的获取和释放均要出现在一个块结构中。而Lock实现类允许锁在不同的作用范围内获取和释放,并允许以任何的顺序获取和释放多个锁。
Lock实现类提供了synchronized方法不包含的功能。如获取锁尝试、获取可中断的锁尝试、获得可超时锁的尝试
//仅在调用时锁为空闲状态才获取该锁。
boolean tryLock();
//如果当前线程未被中断,则获取锁。
void lockInterruptibly() throws InterruptedException;
//如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
Lock实现类提供了更丰富的语义。如保证排序、非重入用法或死锁检测等。
Lock使用示例
为了演示Lock的使用,我们使用Lock改写之前利用synchronized的SynchronizedDemo,新的程序代码如下:
package com.molyeo.java.concurrent;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
- Created by zhangkh on 2018/8/27.
*/
public class ReentrantLockDemo {
public static void main(String[] args) {
}for (int i = 0; i < 10; i++) { AccountWithLock account = new AccountWithLock(); account.setBalance(100000); System.out.printf("Account : Initial Balance: %s", account.getBalance()); Spender spender = new Spender(account); Thread spenderThread = new Thread(spender); spenderThread.start(); Earner earner = new Earner(account); Thread earnerThread = new Thread(earner); earnerThread.start(); try { spenderThread.join(); earnerThread.join(); System.out.printf("Account : Final Balance: %s", account.getBalance()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------------------------------------------------------"); }
}
class AccountWithLock extends Account {
private final Lock lock = new ReentrantLock();
private double balance;
public double getBalance() {
return balance;
}
public void setBalance(double balance) {
this.balance = balance;
}
public void addAmount(double amount) {
try {
lock.lock();
balance = balance + amount;
} finally {
lock.unlock();
}
}
public void subtractAmount(double amount) {
try {
lock.lock();
balance = balance - amount;
} finally {
lock.unlock();
}
}
}
只需要改写Account类,将addAmount()方法和subtractAmount()中采用获取锁,然后操作balance变量,最后释放锁即可。
由于setBalance()方法并没有在多个线程中使用,故此处没有改写,实际应用中需要注意。
可以看到程序中使用的是Lock的实现类ReentrantLock,即可重入锁。
Lock的类图
Java中主要有Lock和ReadWriteLock(读写锁)这两个接口,其中Lock的实现有可重入锁ReentrantLock、以及ReentrantReadWriteLock(可重入读写锁)的两个子类ReadLock和WriteLock.
而接口ReaderWriteLock实现只有ReentrantReadWriteLock(可重入读写锁)。类图如下:
可重入锁
在上面的示例中我们在类AccountWithLock中使用可重入锁ReentrantLock实现了类似同步块的功能。
可以说可重入锁是java.util.concurrent包的基石,众多Java并发工具类将其作为成员变量,以解决多线程的资源共享问题。
源码实现
通过查看ReentrantLock的源码,会知道其有三个子类。一个抽象子类Sync,以及Sync的的两个具体类NonfairSync、FairSync。而Sync是又继承了AbstractQueuedSynchronizer。整体类图如下:
其中AbstractQueuedSynchronizer简称AQS,为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。
AQS和CAS
AbstractQueuedSynchronizer(简称AQS)有一个成员变量state,
private volatile int state;
private transient volatile Node tail;
private transient volatile Node head;
由其子类去完成更改此状态的方法。
为了将AQS这个类作为同步器的基础,其子类要重新定义如下几个方法。
tryAcquire(int)
tryRelease(int)
tryAcquireShared(int)
tryReleaseShared(int)
isHeldExclusively()
而这5个方法主要是调用AQS的如下三个方法以原子方式去获取或者更新state值。
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
可以看到,从头到尾都没有synchronized关键字,没有使用java中任何锁的机制,而代码能保证多线程时具有线程安全性,所依靠的是Unsafe中的compareAndSwapInt方法。我们最开始提到过,在java中加减法操作不是原子性的,而compareAndSwapInt又是如何保证原子性的?
我们看一下Unsafe中这个方法的定义:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
var2为变量的值
var4为预期的值
var5为更新的值
可以看到该方法是用native修饰的,即不是通过Java实现的,其借助C语言调用底层的CPU特殊指令实现的,即CAS(CompareAndSwap)指令。
CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
指令的操作过程如下:首先,CPU 会将内存中将要被更改的数据与期望的值做比较。当这两个值相等时,说明没有其他线程修改该值,当前线程可以修改,CPU 才会将内存中的数值替换为新的值。否则说明其他线程已经修改过,不执行更新操作,但可以选择重新读取变量值再尝试修改或者直接放弃操作。原理图如下:
CAS虽然是很高效的解决原子操作,但也存在一些问题。
ABA问题。即值原来是A,然后变成B,又变成A,那么使用CAS检查是其值没有发生变化,但实际上值已经被修改过了。可以采用版本号解决该问题,如利用类AtomicStampedReference。
如果自旋CAS如果长时间不成功,循环时间开销大,会给CPU带来非常大的执行开销,此外程序测试相对复杂。
加锁
有CAS的基础知识后,了解了Lock原子性的实现,我们再来看ReentrantLock具体的代码。
默认情况下,创建的ReentrantLock是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁中线程获取锁的顺序和调用lock的顺序医院,FIFO;但为了保证线程规规矩矩排队,需要增加阻塞和唤醒的时间开销,故基于性能考虑,默认情况是非公平锁。
在ReentrantLockDemo的例子中,如果Spender和Earner线程同时调用subtractAmount和addAmount方法,进而同时调用lock.lock(),即调用NonfairSync中的方法:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
则两个线程同时尝试去将AbstractQueuedSynchronizer类的state修改为1。如果Spender更改成功,compareAndSetState(0, 1)返回为true,则Spender线程获得该锁,完成后state修改为1,并setExclusiveOwnerThread将线程记录为独占锁的线程,然后执行了相关业务操作。
而失败的Earner线程执行acquire(1);
AbstractQueuedSynchronizer中acquire()代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AQS的tryAcquire不能直接调用,因为是否获取锁成功是由子类决定的,直接看ReentrantLock中内部类NonfairSync的tryAcquire的实现。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//首次获得
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error(“Maximum lock count exceeded”);
setState(nextc);
return true;
}
return false;
}
如果Spender线程获得该锁(包括重入),通过CAS将state为1,返回为true。如果通过CAS抢占失败,也要去排队。执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),加入等待队列中。加入前将线程封装成Node对象,Node主要有三个成员变量:
volatile Node prev;
volatile Node next;
volatile Thread thread;
每个Node维护了线程、前后Node的指针和等待状态等参数。Node需要标记是独占的还是共享的,由传入的mode决定,ReentrantLock自然是使用独占模式Node.EXCLUSIVE。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
如果队列为空,即pred为null,则调用enq(Node node)方法。否则将node包装完成后,调用compareAndSetTail将该node添加到队列最后。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq()方法中如果队列为空,先为head创建空node,然后调用compareAndSetHead方法设置到队列头部。后续调用compareAndSetTail将node设置为尾部。外部循环执行,直到设置成功。
线程加入队列后,下一步是调用acquireQueued阻塞线程。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其中方法shouldParkAfterFailedAcquire传入当前节点和前节点,根据前节点的状态,判断线程是否需要阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
判断逻辑如下:
前节点状态是SIGNAL时,当前线程需要阻塞;
前节点状态是CANCELLED时,通过循环将当前节点之前所有取消状态的节点移出队列;
前节点状态是其他状态时,需要设置前节点为SIGNAL。
总结一下获取锁的过程:线程去竞争一个锁,可能成功也可能失败。成功就直接持有资源,不需要进入队列;失败的话进入队列阻塞,等待唤醒后再尝试竞争锁。
释放锁
释放锁的过程如下:先将state减1,再通知后面的节点获取锁。
public void unlock() {
sync.release(1);
}
release方法先调用tryRelease将state减1,直到为0时将独占线程变量设置为空。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
设置成功后,调用unparkSuccessor将头节点的下个节点唤醒:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
读写锁
基础属性
ReadWriteLock维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。
与互斥锁相比,读-写锁允许对共享数据进行更高级别的并发访问,尤其当数据的读取远大于数据的写入时。
读写锁的互斥机制:
“读-读” 不互斥
“读-写” 互斥
“写-写” 互斥
ReentrantReadWriteLock是ReentrantReadWriteLock的实现,该类具有以下属性:
获取顺序
非公平模式下,不指定读写锁的顺序,可能会推迟一个或者多个reader或者writer线程,但是吞吐量通常高于公平锁。
公平模式下,线程利用一个近似到达顺序的策略来争夺进入。当释放当前保持的锁时,可以为等待时间最长的单个 writer 线程分配写入锁,如果有一组等待时间大于所有正在等待的 writer 线程 的 reader 线程,将为该组分配写入锁。
重入
此锁允许 reader 和 writer 按照 ReentrantLock 的样式重新获取读取锁或写入锁。在写入线程保持的所有写入锁都已经释放后,才允许重入 reader 使用它们。
此外,writer 可以获取读取锁,但反过来则不成立。在其他应用程序中,当在调用或回调那些在读取锁状态下执行读取操作的方法期间保持写入锁时,重入很有用。如果 reader 试图获取写入锁,那么将永远不会获得成功。
锁降级
重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。
中断
读取锁和写入锁都支持锁获取期间的中断。
Condition
写入锁提供了一个 Condition 实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写入锁。
读取锁不支持 Condition,readLock().newCondition() 会抛出 UnsupportedOperationException。
使用示例
在我们在开始的账户余额管理例子中,针对的是一个账户进行更改,并且基本上都是修改操作。假设我们现在有不同用户的账户需要管理,同时有读取和修改操作,同时读取的线程数较多,这样我们则可以考虑用读写锁来实现。
在类WriteReadAccountMap中我们有一个成员变量balanceMap去存储不同账户的余额,同时有ReentrantReadWriteLock实例lock去负责读写时的并发控制:
class WriteReadAccountMap {
Logger logger = LoggerFactory.getLogger(WriteReadAccountMap.class);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
private Map<String, Double> balanceMap = new TreeMap<String, Double>();
public Double getBalance(String key) {
logger.info("{} get readLock", Thread.currentThread().getName());
readLock.lock();
try {
Double cacheValue = balanceMap.get(key);
if(cacheValue==null){
cacheValue=0D;
}
logger.info("{} cacheValue {}", Thread.currentThread().getName(), cacheValue);
return cacheValue;
} finally {
readLock.unlock();
logger.info("{} unlock readLock", Thread.currentThread().getName());
}
}
public Double setBalance(String key, Double value) {
writeLock.lock();
try {
return balanceMap.put(key, value);
} finally {
writeLock.unlock();
}
}
public Map getMap() {
readLock.lock();
try {
return balanceMap;
} finally {
readLock.unlock();
}
}
public void addAmount(String key, Double value) {
writeLock.lock();
logger.info("{} get writeLock", Thread.currentThread().getName());
readLock.lock();
logger.info("{} get readLock", Thread.currentThread().getName());
Double cacheValue = balanceMap.get(key);
if (cacheValue == null) {
cacheValue = 0D;
}
Double newValue = cacheValue + value;
logger.info("{} cacheValue={},add value={},newValue={}", Thread.currentThread().getName(), cacheValue, value, newValue);
readLock.unlock();
logger.info("{} unlock readLock", Thread.currentThread().getName());
balanceMap.put(key, newValue);
writeLock.unlock();
logger.info("{} unlock writeLock", Thread.currentThread().getName());
}
public void subtractAmount(String key, Double value) {
writeLock.lock();
readLock.lock();
Double cacheValue = balanceMap.get(key);
if (cacheValue == null) {
cacheValue = 0D;
}
Double newValue = cacheValue - value;
readLock.unlock();
balanceMap.put(key, newValue);
writeLock.unlock();
}
public void clear() {
writeLock.lock();
try {
balanceMap.clear();
} finally {
writeLock.unlock();
}
}
}
可以看到addAmount方法中由于我们要修改账户余额,我们先获得写锁,同时我们也有获得账户缓存余额,然后锁降级,在写锁释放前获得读锁,得到缓存值和新值相加的结果后,释放读锁,最后写入缓存Map,释放写锁。
读写线程很简单,Writer线程负责增加my账户的余额,Reader线程负责读取my账户的余额:
class Reader implements Runnable {
Logger logger = LoggerFactory.getLogger(Reader.class);
private WriteReadAccountMap accountMap;
public Reader(WriteReadAccountMap accountMap) {
this.accountMap = accountMap;
}
@Override
public void run() {
Double accountMapBalance = accountMap.getBalance("my");
}
}
class Writer implements Runnable {
Logger logger = LoggerFactory.getLogger(Writer.class);
private WriteReadAccountMap accountMap;
public Writer(WriteReadAccountMap accountMap) {
this.accountMap = accountMap;
}
@Override
public void run() {
accountMap.addAmount("my", 10.0);
}
}
主程序创建10个Writer线程和30个Reader线程,去完成数据的读写,最后在main线程中获取my账户的余额。
public class ReentrantReadWriteLockDemo {
static Logger logger = LoggerFactory.getLogger(ReentrantReadWriteLockDemo.class);
public static void main(String[] args) throws InterruptedException {
WriteReadAccountMap account = new WriteReadAccountMap();
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int i = 0; i < 10; i++) {
Writer writer = new Writer(account);
executorService.submit(writer);
}
for (int i = 0; i < 30; i++) {
Reader reader = new Reader(account);
executorService.submit(reader);
}
Thread.sleep(5000);
executorService.shutdown();
logger.info(account.getBalance("my").toString());
}
}
程序部分运行结果如下:
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-1 get writeLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-1 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-11 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-12 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-13 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-1 cacheValue=0.0,add value=10.0,newValue=10.0
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-1 unlock readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-1 unlock writeLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-2 get writeLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-1 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-2 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-2 cacheValue=10.0,add value=10.0,newValue=20.0
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-2 unlock readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-2 unlock writeLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-2 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-3 get writeLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-3 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-3 cacheValue=20.0,add value=10.0,newValue=30.0
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-3 unlock readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-3 unlock writeLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-3 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-4 get writeLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-4 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-4 cacheValue=30.0,add value=10.0,newValue=40.0
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-4 unlock readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-4 unlock writeLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-5 get writeLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-5 get readLock
18/08/29 14:21:39 INFO concurrent.WriteReadAccountMap: pool-1-thread-5 cacheValue=40.0,add value=10.0,newValue=50.0
….
18/08/29 14:21:44 INFO concurrent.ReentrantReadWriteLockDemo: 100.0
从输出日志我们也能看到账户余额一次从0累加到100,符合我们的预期结果。
源码实现
下面简单我们分析一下ReentrantReadWriteLock的源码实现。
整体结构和锁状态维护
ReentrantReadWriteLock整体结构如下:
读写锁比ReentrantLock多出了两个内部类:ReadLock和WriteLock, 用来定义读锁和写锁,然后在构造函数中,会构造一个读锁和一个写锁实例保存到成员变量 readerLock 和 writerLock。
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
在互斥锁ReentrantLock中我们利用AQS中的state字段去维护锁的状态,但是读写锁ReentrantReadWriteLock也是利用AQS,而其是如何用一个变量去维护两个内部锁的状态的呢?
这里要看一下AQS的实现类Sync的定义,Sync有如下4个成员变量和两个静态方法:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/ Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/ Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
我们知道AQS中的state是int类型的,即32位的。
在读写锁中,读锁是共享的,写锁是独占的。从上面的读写锁用其高16位表示占用读锁的线程数,低16位表示写锁的状态。
获取共享锁数量时,调用sharedCount()方法,直接将state无符号右移16位,得到高16位的值。
获取独占锁数量时,调用exclusiveCount()方法,将其和65535(upper16=0000000000000000,lower16=1111111111111111)按位与运算后,高16位全部为0,后16位即state的低16位。
还不明白运算规则的可以用下面的这个例子去琢磨
package com.molyeo.java.concurrent;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
Created by zhangkh on 2018/8/29.
*/
public class ShiftOperationsDemo {
static Logger logger = LoggerFactory.getLogger(ShiftOperationsDemo.class);static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }public static void main(String[] args){
formatPrintInt(EXCLUSIVE_MASK); int state=851969; formatPrintInt(state); int sharedCount=sharedCount(state); logger.info("sharedCount={}",sharedCount); formatPrintInt(sharedCount); int exclusiveCount=exclusiveCount(state); logger.info("exclusiveCount={}",exclusiveCount); formatPrintInt(exclusiveCount);
}
public static void formatPrintInt(int num){
logger.info("num={},upper16={},lower16={}",num, StringUtils.leftPad(Integer.toBinaryString(num), 32, "0").substring(0,16),StringUtils.leftPad(Integer.toBinaryString(num), 32, "0").substring(16) );
}
}
线程本地变量
继续看Sync代码下面的部分,可以看到4个成员变量,这几个变量的作用主要是将获取读锁的线程放入线程本地变量,方便从整个上下文根据当前线程获取持有锁的次数信息。
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
其中firstReader与firstReadHoldCount保存第一个获取读锁的线程的信息。
cachedHoldCounter用来缓存的是最后一个获取线程的HolderCount信息,该变量主要是在如果当前线程多次获取读锁时,减少从readHolds中获取HoldCounter的次数。
写锁的获取
写锁的获取通过调用WriteLock的lock()方法
public void lock() {
sync.acquire(1);
}
进而调用AbstractQueuedSynchronizer的acquire()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
}
而tryAcquire()方法是由AQS的子类,即上面提到过的ReentrantReadWriteLock的内部Sync去实现
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//1.即读锁或者写锁数量不为0
if (c != 0) {/** *2.w=0即没有写锁,而state不为0,故读锁被占用 *或者写锁被其他线程持有,返回false */ if (w == 0 || current != getExclusiveOwnerThread()) return false; //3.写锁计数将会越界,即大于65535,抛出异常,返回false if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //4.这里表示写锁不是0,并且当前线程持有锁,即重入,故修改写锁的数量即可 setState(c + acquires); return true;
}
//如果通过一次CAS去获取锁的时候失败,说明被别的线程占用,也返回false,排队去重试获取锁。
if (writerShouldBlock() ||!compareAndSetState(c, c + acquires)) return false;
setExclusiveOwnerThread(current);
return true;
}
如果tryAcquire方法返回为true,则尝试获取锁方法结束。如果该方法返回false,则进入到acquireQueue方法去排队获取写锁,排队获取写锁的过程和ReentrantLock的获取过程一样。
通过写锁的获取过程,我们知道
如果当前没有写锁或者读锁,第一个获取锁的线程会获取成功不管是获得写锁还是读锁
如果当前已经有了读锁,则获取写锁会失败,而获取读锁有可能成功也可能失败
如果当前已经有了写锁,则如果写锁的持有者为当前线程,由于锁可重入,则获取读锁或者写锁可成功;反之如果写锁的持有者不是当前线程,则失败。
写锁的释放
释放锁要做的事情是将AQS中的state值进行修改,并唤醒队列中的等待线程来获取锁。
WriteLock通过调用unlock()方法
public void unlock() {
sync.release(1);
}
Sync中的release方法如下
public final boolean release(int arg) {
if (tryRelease(arg)) {Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true;
}
return false;
}
Sync作为AQS的子类,自己要去实现tryRelease()方法
protected final boolean tryRelease(int releases) {
//如果写锁的持有者并非当前线程,抛出异常
if (!isHeldExclusively())throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//如果释放锁后,写锁数据(state高16位)为0,则free返回为true,同时将AQS的写锁持有者线程置null,更改state的值。否则返回为false。
if (free)setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
如果tryRelease方法成功释放锁,则需要调用unparkSuccessor()方法去唤醒等待队列中线程,并修改状态值。
private void unparkSuccessor(Node node) {
/*- If status is negative (i.e., possibly needing signal) try
- to clear in anticipation of signalling. It is OK if this
fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);/*
- Thread to unpark is held in successor, which is normally
- just the next node. But if cancelled or apparently null,
- traverse backwards from tail to find the actual
- non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
}if (t.waitStatus <= 0) s = t;
if (s != null)
LockSupport.unpark(s.thread);
}
可以看到仅当写锁的持有者为当前线程,并且释放锁后的写锁数据为0才认为成功释放写锁。
读锁的获取
读锁的获取过程比写锁相对复杂一些,使用读锁时,先调用ReadLock的lock()方法
public void lock() {
sync.acquireShared(1);
}
使用AQS的共享模式
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared方法的实现还是在Sync中
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//如果写锁被其他线程持有,直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//读锁线程不需要阻塞,并且获取读锁的线程数没有超过最大值,并且使用 CAS更新共享锁线程数量成功,则设置线程本地变量信息,否则调用fullTryAcquireShared()方法
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//r==0,表示当前线程为第一个获取读锁的线程,更新firstReader和firstReaderHoldCount
if (r == 0) {
//如果firstReader是当前线程,则更新firstReaderHoldCount,即读锁重入firstReader = current; firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果firstReader不是当前线程,则将该线程持有锁的次数信息放入线程本地变量中,方便整个请求上下文中使用firstReaderHoldCount++;
} else {
}HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++;
return 1;
}
return fullTryAcquireShared(current);
}
我们先看看fullTryAcquireShared方法
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
//如果当前线程不是写锁的持有者,返回-1,获取读锁失败。需要排队去申请读锁
if (exclusiveCount(c) != 0) {
//读线程需要阻塞,具体见readerShouldBlock的分析if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock.
} else if (readerShouldBlock()) {
}// Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { //读锁别其他线程占用,从readHolds中移除当前线程的持有数,返回-1,获取读锁失败。需要排队去申请读锁 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; }
if (sharedCount(c) == MAX_COUNT)
//尝试获取锁成功的话则更新readHolds内部变量throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
}if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1;
}
}
其中读锁或者是否需要阻塞,是区分公平锁和非公平锁的关键。以读锁为例,非公平锁NonfairSync的实现如下:
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
公平锁的实现如下
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
对于公平锁,只要队列中有线程在等待,那么将会返回true,也就意味着读线程需要阻塞;对于非公平锁,如果当前线程持有写锁外,还有其他线程已经排队在申请写锁,则只能结束本次申请读锁的请求,转而去排队。这样处理,主要是避免写锁无限期的饥饿(avoid indefinite writer starvation)。
一旦不阻塞,那么读线程将会有机会获得读锁。
完成尝试获取锁步骤 tryAcquireShared 方法后,我们再次回到 acquireShared,如果返回1则读锁获取成功。如果返回-1,则需要排队申请,具体请看 doAcquireShared(arg)
private void doAcquireShared(int arg) {
//根据当前线程构建Node,并添加到队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
}//获取节点的前置节点 final Node p = node.predecessor(); //如果前置节点是head时,再次尝试获取锁(因为head节点的初始化在第一次产生锁争用时初始化,刚开始初始化的head节点不代表线程,故可以再次尝试获取锁)。如果获取失败则进入到shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法中,线程阻塞,等待被唤醒。 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //如果锁获取成功,则设置head节点并传播 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
} finally {
if (failed)
}cancelAcquire(node);
}
所谓传播,只要获取成功到读锁,那就要传播到下一个节点。如果一下个节点继续是读锁的申请,只要成功获取,就再下一个节点,直到队列尾部或为写锁的申请,则停止传播。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); //如果读锁获取成功,或头部节点为空,或头节点取消,或刚获取读锁的线程的下一个节点为空,或在节点的下个节点也在申请读锁,则在CLH队列中传播下去唤醒线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
}(h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared();
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
}int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS
if (h == head) // loop if head changed
}break;
}
读锁的释放
读锁的释放比较简单,调用ReadLock的unlock方法,进而调用Sync的releaseShared方法
public void unlock() {
sync.releaseShared(1);
}
releaseShared方法主要是释放减少读锁state的值,直到完全释放后,进行doReleaseShared()操作。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared()方法如下
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//主要是将当前线程所持有的锁的数量信息得到(从firstReader或cachedHoldCounter,或readHolds中获取 ),然后将数量减少1,如果持有数为1,则直接将该线程变量从readHolds ThreadLocal变量中移除,避免垃圾堆积。
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
elsefirstReader = null;
} else {firstReaderHoldCount--;
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
int count = rh.count;rh = readHolds.get();
if (count <= 1) {
}readHolds.remove(); if (count <= 0) throw unmatchedUnlockException();
–rh.count;
}
//在无限循环中将共享锁的数量逐步减少,直到该读锁state为0(低16位)。释放完成后去执行doReleaseShared方法。
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
}// Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0;
}
至此,ReentrantReadWriteLock锁的分析到此结束。
本文主要以一个账户余额管理的示例,引入锁的概念及相关实现,主要包括Java内存模型、同步块、重入锁ReentrantLock和读写锁ReentrantReadWriteLock的应用及源码实现。
本文参考
Java 7 Concurrency Cookbook
concurrency-modle-seven-week
java-concurrency
java-util-concurrent
java se 8 apidoc