首页>>后端>>java->ReentrantLock 源码分析

ReentrantLock 源码分析

时间:2023-12-06 本站 点击:0

ReentrantLock 特征

特点:

可重入

公平/非公平

可中断

支持条件等待

可设置锁超时

常用 API

java.util.concurrent.locks.ReentrantLock#lock 获取锁 java.util.concurrent.locks.ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit) 支持设置获取锁的超时时间 java.util.concurrent.locks.ReentrantLock#lockInterruptibly 获取锁后允许可中断 java.util.concurrent.locks.ReentrantLock#newCondition 创建条件等待队列 java.util.concurrent.locks.ReentrantLock#unlock 解锁

使用例子:

public class ReentrantLockTest {    static ReentrantLock lock = new ReentrantLock(true);    static class ClientThread extends Thread {        @Override        public void run() {            System.out.println(Thread.currentThread() + "开始尝试获取锁");            lock.lock();            try {                System.out.println(Thread.currentThread() + "成功获取锁");                TimeUnit.SECONDS.sleep(5);            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                lock.unlock();                System.out.println(Thread.currentThread() + "完成释放锁");            }        }    }    public static void main(String[] args) throws InterruptedException {        ClientThread t1 = new ClientThread();        ClientThread t2 = new ClientThread();        ClientThread t3 = new ClientThread();        t1.start();        t2.start();        t3.start();        TimeUnit.SECONDS.sleep(10);    }}

源码分析

获取锁

如果我使用下面的代码进行获取锁:

ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();

ReentrantLock 默认调用的就是非公平锁 调用栈: java.util.concurrent.locks.ReentrantLock#lock

java.util.concurrent.locks.ReentrantLock.NonfairSync#lock

final void lock() {  // 直接尝试加锁  if (compareAndSetState(0, 1))      setExclusiveOwnerThread(Thread.currentThread());  else      // 如果获取锁失败进入 AQS acquire 逻辑      acquire(1);}

如果 compareAndSetState(0, 1)能够直接执行成功,那么将直接结束方法的执行。 如果失败,那么就会调用acquire 方法如下:

public final void acquire(int arg) {  // tryAcquire(arg) 尝试获取锁  // acquireQueued 获取锁失败进行等待队列  if (!tryAcquire(arg) &&      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))      selfInterrupt();}

我们先看  tryAcquire方法: java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire

java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

他会直接调用到 nonfairTryAcquire非公平锁的加锁逻辑 里面有两个逻辑:

如果当前状态无锁,直接尝试加锁,加锁成功返回 true

如果当前时锁重入,那么直接修改 AQS 锁状态共享变量值 state 等于 c + acquires, 加锁成功返回 ture

如果都不满足,那么返回加锁失败返回 false

// 非公平锁的逻辑// 如何理解插队, 这里的插队是当前队列中被唤醒的线程, 和当前加入的线程都可以被执行// 如果当前加入线程比队列中唤醒的线程先获取到锁, 就是插队现象final boolean nonfairTryAcquire(int acquires) {  final Thread current = Thread.currentThread();  int c = getState();  // 无锁状态, 尝试竞争  if (c == 0) {      if (compareAndSetState(0, acquires)) { //是否获取到锁          setExclusiveOwnerThread(current);          return true;      }  }  // 当前线程持有锁, state 计数 +1  else if (current == getExclusiveOwnerThread()) { //判断是否是重入      int nextc = c + acquires;      if (nextc < 0) // overflow          throw new Error("Maximum lock count exceeded");      setState(nextc);      return true;  }  return false;}

如果 **tryAcquire** 调用完成后是获取锁成功 **acquire**方法执行结束,最后代表 **lock** 方法执行结束。

获取锁失败进入同步队列

如果获取锁失败,那么就会执行 acquire代码后面段 if 逻辑的执行 acquireQueued(addWaiter(Node.**_EXCLUSIVE_**), arg) 这里其实可以分为两个方法来看

addWaiter(Node.**_EXCLUSIVE_**)

acquireQueued(xxx, arg)

按照执行顺序,我们先看 addWaiter(Node.**_EXCLUSIVE_**) 这里主要是入队的逻辑。 addWaiter:  java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter

private Node addWaiter(Node mode) {    // 将当前线程转换为 AQS Node 节点    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    // 尝试直接加入到尾节点    Node pred = tail;    if (pred != null) {        // 当前节点的前驱节点指向尾节点        node.prev = pred;        // cas 修改 tail 节点,如果成功返回 node         if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    // 如果失败,调用 enq    enq(node);    return node;}

enq是将当前节点插入队列,必要的时候会进行初始化

//将节点插入队列,必要时进行初始化。private Node enq(final Node node) {    for (;;) {        Node t = tail;        // 如果没有尾节点,那么需要进行初始化        if (t == null) { // Must initialize            if (compareAndSetHead(new Node()))                tail = head;        }         // 如果有尾节点/其实就是有头节点/已经被初始化,通过 CAS 入队        else {            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}

前面我们看看完了,当前获取锁的线程当获取锁失败的时候,成功进入 AQS 队列,接下来我们继续看  acquireQueued又做了什么呢?

如果是队列头节点,会再次尝试获取锁

如果修改 java.util.concurrent.locks.AbstractQueuedSynchronizer.Node状态位

final boolean acquireQueued(final Node node, int arg) {  boolean failed = true;  try {      // 是否中断      boolean interrupted = false;      for (;;) {          // 获取 node 的前驱节点          final Node p = node.predecessor();          // 如果是头节点,再次尝试获取锁          if (p == head && tryAcquire(arg)) {              // 将 node 设置为 头节点              setHead(node);              p.next = null; // help GC              failed = false;              return interrupted;          }          // 判断是否需要进行阻塞当前线程          if (shouldParkAfterFailedAcquire(p, node) &&              // 阻塞线程              parkAndCheckInterrupt())              interrupted = true;      }  } finally {      // 是否失败      if (failed)          // 如果失败,取消获取锁          cancelAcquire(node);  }}

上面我们可以看到,for (;;)中有两个判断

如果是头节点,就调用tryAcquire尝试获取锁 (之前我们已经分析过 tryAcquire 了,我们主要看后面个 if )

如果不是就进入 shouldParkAfterFailedAcquire 方法

在调用 acquireQueued这个过程中可能调用多次 shouldParkAfterFailedAcquire 方法。shouldParkAfterFailedAcquire 会执行一下几个操作。

可以用来修改当前节点的状态,

和对链表上无效的节点出队

/**

当获取锁失败后, 检查更新新节点状态如果是需要阻塞返回, true

一个前继节点 waitStatus = 0, 第一次将继续设置为 SIGNAL, 告诉当前线程准备进入阻塞, 此时依旧获取不到, 当前线程进入阻塞

@param pred 前继节点

@param node 当前节点

@return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前继节点的状态, 第一次进入的话, 一定是 0 if (ws == Node.SIGNAL) return true; if (ws > 0) { do { // 出队, 剔除无效的节点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 第一次进来, pred.waitStatus = 0 执行这个分支 // 将前继节点的状态修改为 SIGNAL, 表示 pred.next 节点需要被唤醒(此时准备进入阻塞, 但是还未被阻塞, 再次获取锁失败之后才会被阻塞) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

当 Node 被修改 `Node.SIGNAL`状态后,第一个 if 返回 true , 我们再次回到 `acquireQueued` 方法,就会执行 `parkAndCheckInterrupt` 方法,就是将当前的线程 park 然后返回当前线程的中断状态。```javaprivate final boolean parkAndCheckInterrupt() { // 阻塞线程 LockSupport.park(this); // 返回线程中断状态 return Thread.interrupted();}

注意:这里线程 park 过后,其实获取锁就结束了前半段的操作,完成同步队列的入队,并且进入等待。我们就需要等待解锁唤醒。

释放锁

释放锁的代码如下:

ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();0

释放锁做了什么呢?

释放当前锁的状态

在 AQS 队列中去唤醒排队的头节点

调用栈如下: java.util.concurrent.locks.ReentrantLock#unlock

java.util.concurrent.locks.AbstractQueuedSynchronizer#release

我们可以从 release方法开始

ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();1

释放锁,主要是调用 tryRelease, 首先就是考虑之前的重入问题,直接对 state 进行 -1 ,然后如果 c == 0表示当前线程不再持有锁,我们就可以修改 ownerThread == null . 这个时候,最后修改 state 为新值。

ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();2

释放锁成功后,再次回到 release方法,会再次判断,如果 AQS 队列不为空,那么就进行排队线程唤醒。 主要是调用 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor

ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();3

其实这里最关键的就是 LockSupport.unpark(s.thread); 这里就会回到 acquireQueued,执行唤醒后强锁的逻辑,依然在 acquireQueued里面。

释放锁后唤醒等待节点

当前节点被唤醒逻辑,首先会在 shouldParkAfterFailedAcquire 方法中出队,然后尝试加锁如果加锁成功就返回 true.

ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();4

再次竞争锁,主要是在acquireQueued方法中调用 tryAcquire方法进行获取锁。如果获取锁失败,就又再次获取锁,如果获取锁成功返回。

测试和实践

支持锁中断

如果通过 _lock_.lockInterruptibly(); 方式加锁,如果当前线程出现中断过后,会抛出 java.lang.InterruptedException线程中断异常,所以 ReentrantLock支持可中断。 相关源码:

/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */private final boolean parkAndCheckInterrupt() {    // LockSupport.park 会清除中断信号    LockSupport.park(this);    return Thread.interrupted();}// private void doAcquireInterruptibly(int arg)    throws InterruptedException {    final Node node = addWaiter(Node.EXCLUSIVE);    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return;            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                // 抛出中断异常                throw new InterruptedException();        }    } finally {        if (failed)            cancelAcquire(node);    }}

实验代码:

public class ReentrantLockTest {    static ReentrantLock lock = new ReentrantLock(true);    static class ClientThread implements Runnable {        @SneakyThrows        @Override        public void run() {            System.out.println(Thread.currentThread() + "开始尝试获取锁");            lock.lockInterruptibly();            try {                System.out.println(Thread.currentThread() + "成功获取锁");                TimeUnit.SECONDS.sleep(5);            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();0                System.out.println(Thread.currentThread() + "完成释放锁");            }        }    }    public static void main(String[] args) throws InterruptedException {        Thread t1 = new Thread(new ClientThread(), "t1");        Thread t2 = new Thread(new ClientThread(), "t2");        Thread t3 = new Thread(new ClientThread(), "t3");        t1.start();        t2.start();        // 锁中断        //lock.lockInterruptibly();        TimeUnit.SECONDS.sleep(1);        t3.start();        TimeUnit.SECONDS.sleep(1);        t3.interrupt();        TimeUnit.SECONDS.sleep(10);    }}

输出结果:

ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();7

获取锁设置超时

lock.tryLock(2, TimeUnit.SECONDS)可以支持设置获取锁的超时时间,可以有效的避免线程饥饿问题 测试代码:

public class ReentrantLockTryTest {    static ReentrantLock lock = new ReentrantLock(true);    static class ClientThread implements Runnable {        @Override        public void run() {            System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t开始尝试获取锁");            try {                if (lock.tryLock(2, TimeUnit.SECONDS)) {                    System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t获取锁成功");                    TimeUnit.SECONDS.sleep(5);                } else {                    System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t获取锁失败");                }            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                if (lock.isHeldByCurrentThread() && lock.isLocked()) {                    ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();0                    System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t完成释放锁");                }            }        }    }    public static void main(String[] args) throws InterruptedException {        Thread t1 = new Thread(new ClientThread(), "t1");        Thread t2 = new Thread(new ClientThread(), "t2");        Thread t3 = new Thread(new ClientThread(), "t3");        t1.start();        t2.start();        t3.start();        //t1.interrupt();        TimeUnit.SECONDS.sleep(20);    }}

输出结果

ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();9

条件等待队列使用

Condition 是在 java 1.5 中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用 Condition,阻塞队列实际上是使用了 Condition 来模拟线程间协作。 Condition 是个接口,基本的方法就是 await() 和 signal() 方法; Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition() 调用 Condition 的 await() 和 signal() 方法,都必须在 lock 保护之内,就是说必须在 lock.lock() 和 lock.unlock 之间才可以使用:

Conditon中的await()对应Object的wait();

Condition中的signal()对应Object的notify();

Condition中的signalAll()对应Object的notifyAll()。

测试场景: 下面一个场景,需要ABC3个线程,A线程打印1次,然后是B线程打印2次,再是C线程打印3次,线程交替打印。 ABC线程需要交替执行,我们需要控制,线程的执行先后顺序 我们可以使用多条件Condition来控制,每一个线程拥有一个condition对象,调用各种的await方法,可以使线程等待,然后让别的线程调用这个condition对象的signal方法,唤醒线程。 代码如下:

public class ReentrantLockConditionTest {    private int data = 1;    private Lock lock = new ReentrantLock();    Condition condition1 = lock.newCondition();    Condition condition2 = lock.newCondition();    Condition condition3 = lock.newCondition();    public void printA() {        lock.lock();        try {            while (data != 1) {                condition1.await();            }            // 打印5次            for (int i = 0; i < 5; i++) {                System.out.println(Thread.currentThread().getName() + " ->" + data);            }            data = 2;            // 通知B线程            condition2.signal();        } catch (Exception e) {            e.printStackTrace();        } finally {            ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();0        }    }    public void printB() {        lock.lock();        try {            while (data != 2) {                condition2.await();            }            // 打印10次            for (int i = 0; i < 10; i++) {                System.out.println(Thread.currentThread().getName() + " ->" + data);            }            data = 3;            // 通知C            condition3.signal();        } catch (Exception e) {            e.printStackTrace();        } finally {            ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();0        }    }    public void printC() {        lock.lock();        try {            while (data != 3) {                condition3.await();            }            // 打印15次            for (int i = 0; i < 15; i++) {                System.out.println(Thread.currentThread().getName() + " ->" + data);            }            data = 1;            // 通知A            condition1.signal();        } catch (Exception e) {            e.printStackTrace();        } finally {            ReentrantLock lock = new ReentrantLock();lock.lock();lock.unlock();0        }    }    public static void main(String[] args) throws InterruptedException {        ReentrantLockConditionTest conditionTest = new ReentrantLockConditionTest();        // A,B,C 交替执行        new Thread(conditionTest::printA, "A").start();        new Thread(conditionTest::printB, "B").start();        new Thread(conditionTest::printC, "C").start();    }}

输出结果如下:

final void lock() {  // 直接尝试加锁  if (compareAndSetState(0, 1))      setExclusiveOwnerThread(Thread.currentThread());  else      // 如果获取锁失败进入 AQS acquire 逻辑      acquire(1);}1
原文:https://juejin.cn/post/7101956094137729031


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/15946.html