从InterruptedException深入理解

从InterruptedException深入理解AtomicReference的方方面面






前言

在近期的测试中遇到了一个java.lang.InterruptedException的异常,StackTrace如下:

java.lang.InterruptedException
    at java.base/java.lang.Object.wait(Native Method)
    at java.base/java.lang.Object.wait(Object.java:328)
    at bt.torrent.messaging.MetadataConsumer.waitForTorrent(MetadataConsumer.java:265)
    at bt.processor.magnet.FetchMetadataStage.doExecute(FetchMetadataStage.java:92)
    at bt.processor.magnet.FetchMetadataStage.doExecute(FetchMetadataStage.java:42)
    at bt.processor.TerminateOnErrorProcessingStage.doExecute(TerminateOnErrorProcessingStage.java:38)
    at bt.processor.RoutingProcessingStage.execute(RoutingProcessingStage.java:39)
    at bt.processor.ChainProcessor.doExecute(ChainProcessor.java:112)
    at bt.processor.ChainProcessor.executeStage(ChainProcessor.java:96)
    at bt.processor.ChainProcessor.executeStage(ChainProcessor.java:98)
    at bt.processor.ChainProcessor.lambda$process$0(ChainProcessor.java:81)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1736)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
debug定位到抛出异常的源码如下:

其中torrent.wait();的torrent的定义为private final AtomicReference<Torrent> torrent;。因此,本文将结合此次问题的排查过程,深入理解AtomicReference的方方面面。在文章最后也会将遇到的问题排查过程进行一个简短的分析和总结。

希望能够帮助大家更好的理解AtomicReference。

原子性

介绍AtomicReference之前,首先需要理解的一个关键概念:原子性。让我们先来看一个简单的例子:

当程序更新一个变量时,如果多线程同时更新这个变量,可能得到期望之外的值,比如变量i=1,A线程更新i+1,B线程也更新i+1,经过两个线程操作之后可能i并不等于3,而是等于2。

因为A和B线程在更新变量i的时候拿到的i都是1,这就是线程不安全的更新操作,一般情况下我们会使用synchronized来解决这个问题,synchronized会保证多线程不会同时更新变量i。

从JDK1.5开始Java提供了java.util.concurrent.atomic包,在这个包中为原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方法。本文中着重介绍的AtomicReference就是其中一个典型的代表。

AtomicReference

首先看下 JDK 中对AtomicReference的定义:

An object reference that may be updated atomically. See the java.util.concurrent.atomic package specification for description of the properties of atomic variables.

JavaDoc:docs.oracle.com/javase/8/do…

AtomicReference的作用是对对象进行原子操作。它提供了一种读和写都是原子性的对象引用变量。

同时,原子性意味着多个线程试图改变同一个 AtomicReference(例如比较和交换操作)将不会使得 AtomicReference 处于不一致的状态。

atomic 包

提到AtomicReference就一定会提到原子包:java.util.concurrent.atomic,在这里 JDK 提供了众多的针对原子操作的类。包括:

  • Number

    • AtomicInteger
      • 提供对int类型的原子操作,是对基本类型封装的一个代表;
    • AtomicBoolean
      • 提供对boolean类型的原子操作
    • AtomicLong
      • 提供对long类型的原子操作
    • DoubleAdder
      • 一个或多个变量一起保持最初为零的总和为double 。
      • 当跨线程争用更新(方法add(double) )时,变量集可以动态增长以减少争用。
      • 方法sum() (或等效地, doubleValue() )返回保持总和的变量的当前总和。
      • 线程内或线程之间的累积顺序无法保证。
      • 因此,如果需要数值稳定性,则该类可能不适用,尤其是在组合基本上不同数量级的值时。
    • LongAdder
      • 一个或多个变量,它们共同维持最初的零long总和。
      • 当跨线程争用更新(方法add(long) )时,变量集可以动态增长以减少争用。
      • 方法sum() (或等效地, longValue() )返回保持总和的变量的当前总和。
      • 当多个线程更新用于收集统计信息但不用于细粒度同步控制的目的的公共和时,此类通常优于AtomicLong 。
      • 在低更新争用下,这两个类具有相似的特征。 但在高争用的情况下,这一类的预期吞吐量明显更高,但代价是空间消耗更高。
    • Striped64
    • DoubleAccumulator
      • 使用提供的函数一起维护运行的double值的一个或多个变量。
      • 当更新(方法accumulate(double) )跨线程争用时,变量集可以动态增长以减少争用。
      • 方法get() (或等效地, doubleValue() )返回维护更新的变量的当前值。
      • 当多个线程更新公共值时,此类通常优于备选方案,该公共值用于诸如经常更新但读取频率较低的摘要统计信息之类的目的。
      • 提供的累加器功能应该是无副作用的,因为当尝试的更新由于线程之间的争用而失败时可以重新应用它。
      • 对于可预测的结果,累加器函数应该在使用上下文中所需的浮点容差内是可交换的和关联的。
      • 该函数应用现有值(或标识)作为一个参数,给定更新作为另一个参数。 例如,要保持运行的最大值,您可以提供Double::max以及Double.NEGATIVE_INFINITY作为标识。
      • 线程内或线程之间的累积顺序无法保证。 因此,如果需要数值稳定性,则该类可能不适用,尤其是在组合基本上不同数量级的值时。
    • LongAccumulator
      • 使用提供的函数一起维护运行的long值的一个或多个变量。 当跨线程争用更新(方法accumulate(long) )时,变量集可以动态增长以减少争用。 方法get() (或等效地, longValue() )返回维护更新的变量的当前值。
      • 当多个线程更新用于收集统计信息但不用于细粒度同步控制的目的的公共值时,此类通常优于AtomicLong 。 在低更新争用下,这两个类具有相似的特征。 但在高争用的情况下,这一类的预期吞吐量明显更高,但代价是空间消耗更高。
      • 线程内或线程之间的累积顺序无法保证且不能依赖,因此该类仅适用于累积顺序无关紧要的函数。 提供的累加器功能应该是无副作用的,因为当尝试的更新由于线程之间的争用而失败时可以重新应用它。 对于可预测的结果,累加器函数应该是关联的和可交换的。 该函数应用现有值(或标识)作为一个参数,给定更新作为另一个参数。 例如,要保持运行的最大值,您可以提供Long::max以及Long.MIN_VALUE作为标识。
      • 类LongAdder提供了此类功能的类比,用于维护计数和总和的常见特殊情况。 电话new LongAdder()相当于new LongAccumulator((x, y) -> x + y, 0L) 。
    • AtomicIntegerFieldUpdater
      • 原子地更新一个对象的int类型的field,这个int类型的field必须是被volatile修饰的;
      • 基于反射的实用程序,可以对指定类的指定volatile int字段进行原子更新。 此类设计用于原子数据结构,其中同一节点的多个字段独立地受原子更新的影响。
      • 请注意, compareAndSet方法的保证比其他原子类弱。 因为此类无法确保该字段的所有使用都适用于原子访问的目的,所以它只能保证在同一更新程序上对compareAndSet和set其他调用的原子性。
    • AtomicLongFieldUpdater
      • 原子地更新一个对象的long类型的field,这个long类型的field必须是被volatile修饰的;
      • 基于反射的实用程序,可以对指定类的指定volatile long字段进行原子更新。 此类设计用于原子数据结构,其中同一节点的多个字段独立地受原子更新的影响。
      • 请注意, compareAndSet方法的保证比其他原子类弱。 由于此类无法确保该字段的所有使用都适用于原子访问,因此只能在同一更新程序上对compareAndSet和set其他调用保证原子性。
  • Array

    • AtomicIntegerArray
      • 提供对int数组的原子操作,可以原子地更新数组里的某个index上的值;
    • AtomicLongArray
      • 提供对long数组的原子操作,可以原子地更新数组里的某个index上的值;
  • Reference

    • AtomicReference
      • 原子更新引用类型,提供对引用类型的原子操作
      • 但是并不是说可以原子地操作引用的对象里的字段,可以将引用原子地指向两一个对象;
    • AtomicReferenceArray
      • 原子地更新一个reference类型的数组。
    • AtomicReferenceFieldUpdater
      • 原子更新引用类型里的字段
    • AtomicMarkableReference
      • 原子更新带有标记位的引用类型。
      • 需要传入一个reference和一个boolean类型的标志位,可以原子地更新reference和标志位;
    • AtomicStampedReference
      • 与AtomicMarkableReference类似,不过把标志位换成了一个int值,原子的更新reference和int值;

继承自Number的原子类,如AtomicInteger,只能更新一个变量,如果要原子更新多个变量,就需要使用Reference等原子更新的引用类型提供的类来完成了。

这里只是简单介绍些每个类的作用,关于java.util.concurrent.atomic包并不是本文的重点,有兴趣的话,可以通过翻阅JDK源码进一步的了解。

源码

package java.util.concurrent.atomic; import java.util.function.UnaryOperator; import java.util.function.BinaryOperator; import sun.misc.Unsafe; public class AtomicReference<V> implements java.io.Serializable { private static final long serialVersionUID = -184****965231344442L; // 空歌白石:基于Unsafe实现原子操作 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try {
            valueOffset = unsafe.objectFieldOffset(AtomicReference.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    } // 空歌白石:基于volatile实现原子操作 private volatile V value; public AtomicReference(V initialValue) {
        value = initialValue;
    } public AtomicReference() {
    } // 空歌白石:基于volatile实现原子操作 public final V get() { return value;
    } public final void set(V newValue) {
        value = newValue;
    } // 空歌白石:基于Unsafe实现原子操作 public final void lazySet(V newValue) {
        unsafe.putOrderedObject(this, valueOffset, newValue);
    } // 空歌白石:基于Unsafe实现原子操作 public final boolean compareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    } // 空歌白石:基于Unsafe实现原子操作 public final boolean weakCompareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    } // 空歌白石:基于Unsafe实现原子操作 @SuppressWarnings("unchecked") public final V getAndSet(V newValue) { return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
    } public final V getAndUpdate(UnaryOperator<V> updateFunction) {
        V prev, next; do {
            prev = get();
            next = updateFunction.apply(prev);
        } while (!compareAndSet(prev, next)); return prev;
    } public final V updateAndGet(UnaryOperator<V> updateFunction) {
        V prev, next; do {
            prev = get();
            next = updateFunction.apply(prev);
        } while (!compareAndSet(prev, next)); return next;
    } public final V getAndAccumulate(V x, BinaryOperator<V> accumulatorFunction) {
        V prev, next; do {
            prev = get();
            next = accumulatorFunction.apply(prev, x);
        } while (!compareAndSet(prev, next)); return prev;
    } public final V accumulateAndGet(V x, BinaryOperator<V> accumulatorFunction) {
        V prev, next; do {
            prev = get();
            next = accumulatorFunction.apply(prev, x);
        } while (!compareAndSet(prev, next)); return next;
    } public String toString() { return String.valueOf(get());
    }
}

原子操作

通过AtomicReference的源码可以看出,AtomicReference是基于volatile和sun.misc.Unsafe来实现对于引用的原子操作的。

volatile

一旦一个共享变量(类的成员变量、类的静态成员变量)被 volatile 修饰之后,那么就具备了两层语义:

  1. 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
  2. 禁止进行指令重排序。

sun.misc.Unsafe

通过上文可以看出一点,即java.util.concurrent.atomic包中的类基本上都是使用sun.misc.Unsafe实现的包装类,换句话说,java.util.concurrent.atomic是对sun.misc.Unsafe的封装。那么sun.misc.Unsafe就是何方神圣呢?

简单讲一下这个类。Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。不过尽管如此,JVM还是开了一个后门,JDK中有一个类Unsafe,它提供了硬件级别的原子操作。

如果对优秀开源项目有过深入分析研究的同学应该对sun.misc.Unsafe并不陌生,为了追求极致性能,绝大部分的高性能框架或项目基本上都离不开sun.misc.Unsafe类,Unsafe并不是说这个类中的方法不安全,而是说想要使用好sun.misc.Unsafe并不容易,需要对sun.misc.Unsafe和底层原理有深刻的理解,因此并不推荐在一般项目中直接使用,这也从一个侧面说明了为何Unsafe类并不是在java包下,而是在sun包下。

这个类尽管里面的方法都是public的,但是并没有办法使用它们,JDK API文档也没有提供任何关于这个类的方法的解释。总而言之,对于Unsafe类的使用都是受限制的,只有授信的代码才能获得该类的实例,当然JDK库里面的类是可以随意使用的。

Unsafe提供了硬件级别的操作,比如说获取某个属性在内存中的位置,比如说修改对象的字段值,即使它是私有的。不过Java本身就是为了屏蔽底层的差异,对于一般的开发而言也很少会有这样的需求。正因为Unsafe的硬件级别的操作,使得Unsafe的性能极高,在追求高性能的场景下使用极为广泛。

CAS

CAS,Compare and Swap即比较并交换,设计并发算法时常用到的一种技术,java.util.concurrent包完全建立在CAS之上,没有CAS也就没有此包,在上文的AtmoicReference中也可以看到众多的compareAndSwap方法。

此外,当前的处理器基本都支持CAS,只不过不同的厂家的实现不一样罢了。CAS有三个操作数:

  1. 内存值V
  2. 旧的预期值A
  3. 要修改的值B

当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做并返回false。

使用CAS的好处就是不需要使用传统的加锁方式保证线程安全,而是依赖于CAS的忙等算法,依赖于底层硬件实现来保证线程安全。相对于其他锁的实现没有现场切换和阻塞,也就没有了额外的开销,并且可以支持较大的并发性。(当然CAS也有一个缺点就是忙等,如果一直没有获取到将会处于死循环中。)

AtomicReference的使用

AtomicReference的初始化过程。

// 空歌白石:在获取和验证元数据后立即设置原子集 private final AtomicReference<Torrent> torrent; public MetadataConsumer(IMetadataService metadataService,
                        TorrentId torrentId,
                        Config config,
                        EventSource eventSource) { // 空歌白石:略去无关代码 this.torrent = new AtomicReference<>(); // 空歌白石:略去无关代码 }

以下代码为实际开发中AtomicReference的代码:

private void processMetadataBlock(int pieceIndex, int totalSize, byte[] data) { // 空歌白石:略去无关代码 synchronized (torrent) {
        torrent.set(fetchedTorrent);
        states.clear();
        torrent.notifyAll();
    } // 空歌白石:略去无关代码 } @Produces public void produce(Consumer<Message> messageConsumer, MessageContext context) { // 空歌白石:如果元数据已经被获取,则停止到此。 if (torrent.get() != null) { return;
    } // 空歌白石:略去无关代码 } /**
* 空歌白石:如果Torrent还没有被获取,便阻塞调用线程,等待获取到Torrent。
*/ public Torrent waitForTorrent() { while (torrent.get() == null) { synchronized (torrent) { if (torrent.get() == null) { try {
                    torrent.wait();
                } catch (InterruptedException e) { throw new RuntimeException(e);
                }
            }
        }
    } return torrent.get();
}

new AtomicReference<>() is null

为更好的展示原理,仅仅将最核心代码编写了一个demo,如下,可以看出AtomicReference的值是null,这是又为什么呢?


原因是以上代码仅仅是将new AtomicReference<>();,实际上并没有真正将具体的reference传递给AtomicReference,因此,看到的integer的值仍为null。

我们更换一种写法,在创建AtomicReference时,随即将引用对象赋值,代码如下:

// 创建一个对象 String str = "testAtomicReference"; // 将引用传给AtomicReference对象 AtomicReference<String> atomicReference = new AtomicReference<String>(str); // 获取到AtomicReference中的引用对象 String value = atomicReference.get(); // 创建一个新的对象 String newStr = "new testAtomicReference"; // compareAndSet方***比较当前AtomicReference对象中的引用的是否是str;如果是,则会更新为newStr,如果不是,则不会更新。 boolean exchanged = atomicReference.compareAndSet(str, newStr);
复制代码

当new AtomicReference<String>(str);时,实际只是将str的引用传递给了AtomicReference,因此debug看到的仍然为null,实际在get方法是可以获取到具体的value,通过debug可以看出符合我们的预期,如下图:


java.lang.Object

在上述waitForTorrent方法中,会使用到torrent.wait()方法,wait方法是在Object类中定义的。对于Object类相信大家都并不陌生,特别是其中的equals、hashCode、toString等方法,但是如果不涉及到原子性操作或多线程开发,对于wait、notify、notifyAll等方法可能使用的并不多。

package java.lang; import jdk.internal.HotSpotIntrinsicCandidate; public class Object { private static native void registerNatives(); static {
        registerNatives();
    } @HotSpotIntrinsicCandidate public Object() {} @HotSpotIntrinsicCandidate public final native Class<?> getClass(); @HotSpotIntrinsicCandidate public native int hashCode(); public boolean equals(Object obj) { return (this == obj);
    } @HotSpotIntrinsicCandidate protected native Object clone() throws CloneNotSupportedException; public String toString() { return getClass().getName() + "@" + Integer.toHexString(hashCode());
    } @HotSpotIntrinsicCandidate public final native void notify(); @HotSpotIntrinsicCandidate public final native void notifyAll(); public final void wait() throws InterruptedException {
        wait(0L);
    } public final native void wait(long timeoutMillis) throws InterruptedException; public final void wait(long timeoutMillis, int nanos) throws InterruptedException { if (timeoutMillis < 0) { throw new IllegalArgumentException("timeoutMillis value is negative");
        } if (nanos < 0 || nanos > 999999) { throw new IllegalArgumentException( "nanosecond timeout value out of range");
        } if (nanos > 0) {
            timeoutMillis++;
        }

        wait(timeoutMillis);
    } @Deprecated(since="9") protected void finalize() throws Throwable { }
}

wait

这里特别将Object的wait方法单独介绍,通过以下源码可以看出,多个重载的wait方法最终都会调用native标注的wait(long timeoutMillis)方法,此方法可能会抛出InterruptedException中断异常。

public final void wait() throws InterruptedException {
    wait(0L);
} public final void wait(long timeoutMillis, int nanos) throws InterruptedException { if (timeoutMillis < 0) { throw new IllegalArgumentException("timeoutMillis value is negative");
    } if (nanos < 0 || nanos > 999999) { throw new IllegalArgumentException( "nanosecond timeout value out of range");
    } if (nanos > 0) {
        timeoutMillis++;
    }

    wait(timeoutMillis);
} public final native void wait(long timeoutMillis) throws InterruptedException;

那么何时会抛出异常呢?

如果任何线程在当前线程等待通知之前或期间中断了当前线程,便抛出此异常时清除当前线程的中断状态。

InterruptedException

本小节着重介绍InterruptedException,我们从InterruptedException的源码入手。

package java.lang; /**
 * Thrown when a thread is waiting, sleeping, or otherwise occupied,
 * and the thread is interrupted, either before or during the activity.
 * Occasionally a method may wish to test whether the current
 * thread has been interrupted, and if so, to immediately throw
 * this exception.  The following code can be used to achieve
 * this effect:
 * <pre>
 *  if (Thread.interrupted())  // Clears interrupted status!
 *      throw new InterruptedException();
 * </pre>
 *
 * @author Frank Yellin
 * @see java.lang.Object#wait()
 * @see java.lang.Object#wait(long)
 * @see java.lang.Object#wait(long, int)
 * @see java.lang.Thread#sleep(long)
 * @see java.lang.Thread#interrupt()
 * @see java.lang.Thread#interrupted()
 * @since 1.0
 */ public class InterruptedException extends Exception { private static final long serialVersionUID = 6700697376100628473L; /**
     * Constructs an <code>InterruptedException</code> with no detail  message.
     */ public InterruptedException() { super();
    } /**
     * Constructs an <code>InterruptedException</code> with the
     * specified detail message.
     *
     * @param s   the detail message.
     */ public InterruptedException(String s) { super(s);
    }
}

InterruptedException线程堵塞异常,加上上文中已经介绍过的Object的wait方法,这里再列举几个可能会抛出InterrptdeException异常方法:

  1. java.lang.Object.wait()及其重载方法
    • 会进入等待区等待。
  2. java.lang.Thread.sleep()及其重载方法
    • 会睡眠执行参数内所设置的时间。
  3. java.lang.Thread.join()及其重载方法
    • 会等待到指定的线程结束为止。

以上这些方法执行结束后,该线程会重新启动,因此可能会出现线程堵塞。故这些方法需要处理可能抛出的InterrptdeException异常。

Thread.interrupt()

wait()、sleep()、join()等方法都会使得当前线程进入阻塞状态,若另外的一个线程调用被阻塞线程的Thread.interrupt(),则会打断这种阻塞,抛出InterruptedException。InterruptedException就像一个signal(信号)一样通知当前线程被打断了。但是打断一个线程并不等于该线程的生命周期结束,仅仅是打断了当前线程的阻塞状态。

问题排查

进一步分析

通过上次对AtomicReference相关逻辑的理解,回到最初的问题,可以确定一定是某种原因导致中断的发生,表现就是当AtomicReference调用wait()方法后引起了InterruptedException。

经过进一步分析log,发现了新的StackTrace,如下:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@3267902e rejected from java.util.concurrent.ThreadPoolExecutor@d4ab1ed[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1714)
    at java.base/java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1931)
    at bt.net.ConnectionSource.getConnectionAsync(ConnectionSource.java:132)
    at bt.torrent.messaging.TorrentWorker.onPeerDiscovered(TorrentWorker.java:412)
    at bt.torrent.messaging.TorrentWorker.lambda$new$0(TorrentWorker.java:108)
    at bt.event.EventBus.lambda$addListener$3(EventBus.java:249)
    at bt.event.EventBus.doFireEvent(EventBus.java:183)
    at bt.event.EventBus.fireEvent(EventBus.java:171)
    at bt.event.EventBus.firePeerDiscovered(EventBus.java:62)
    at bt.peer.PeerRegistry.addPeer(PeerRegistry.java:214)
    at bt.processor.magnet.FetchMetadataStage.lambda$doExecute$0(FetchMetadataStage.java:81)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at bt.processor.magnet.FetchMetadataStage.doExecute(FetchMetadataStage.java:80)
    at bt.processor.magnet.FetchMetadataStage.doExecute(FetchMetadataStage.java:42)
    at bt.processor.TerminateOnErrorProcessingStage.doExecute(TerminateOnErrorProcessingStage.java:38)
    at bt.processor.RoutingProcessingStage.execute(RoutingProcessingStage.java:39)
    at bt.processor.ChainProcessor.doExecute(ChainProcessor.java:112)
    at bt.processor.ChainProcessor.executeStage(ChainProcessor.java:96)
    at bt.processor.ChainProcessor.executeStage(ChainProcessor.java:98)
    at bt.processor.ChainProcessor.lambda$process$0(ChainProcessor.java:81)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1736)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

抛出异常的源码:

private <E extends BaseEvent> void addListener(Class<E> eventType, TorrentId torrentId, Consumer<E> listener) {
        Collection<Consumer<? extends BaseEvent>> listeners; if (torrentId == null) {
            listeners = this.listeners.computeIfAbsent(eventType, key -> ConcurrentHashMap.newKeySet());
        } else {
            listeners = this.listenersOnTorrent.computeIfAbsent(torrentId, key -> new ConcurrentHashMap<>())
                    .computeIfAbsent(eventType, key -> ConcurrentHashMap.newKeySet());
        }

        eventLock.writeLock().lock(); try {
            Consumer<E> safeListener = event -> { try {
                    listener.accept(event);
                } catch (Exception ex) {
                    LOGGER.error("Listener invocation failed", ex);
                }
            };
            listeners.add(safeListener);
        } finally {
            eventLock.writeLock().unlock();
        }
    }

Debug断点截图:


猜想

以上堆栈可以看到[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]这句描述,说明当前的使用的ExecutorService并未创建线程或者创建的线程数量为0。这里我们可以大胆猜测,由于可用线程数量都是0,AtomicReference调用wait()方法后引起了InterruptedException。

这里有进一步产生一个问题,我们并没有在实际使用中特意指定ExecutorService,而是用的组件默认的ExecutorService。那为什么为有Thread数量为0的情况呢?

通过分析源码发现,通过系统使用Google的guice这个IOC框架实现DI,其中@Inject注解注入了TorrentProcessorFactory,使用@ClientExecutor指定了具体的ExecutorService。

private ExecutorService executor;

@Inject public TorrentProcessorFactory( // 空歌白石:略去无关的参数 @ClientExecutor ExecutorService executor, // 空歌白石:略去无关的参数 ) { // 空歌白石:略去无关代码 this.executor = executor; // 空歌白石:略去无关代码 }

使用guice的Binder绑定ClientExecutor,代码如下。

binder.bind(ExecutorService.class).annotatedWith(ClientExecutor.class) .toProvider(ExecutorServiceProvider.class).in(Singleton.class);

从上述代码可以看出在ExecutorServiceProvider完成ExecutorService的创建。

package bt.service; import bt.runtime.Config; import com.google.inject.Inject; import com.google.inject.Provider; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /**
 * @since 1.0
 */ public class ExecutorServiceProvider implements Provider<ExecutorService> { private volatile ExecutorService executorService; private final Object lock; private final Config config; @Inject public ExecutorServiceProvider(Config config) { this.lock = new Object(); this.config = config;
    } @Override public ExecutorService get() { if (executorService == null) { synchronized (lock) { if (executorService == null) {
                    executorService = Executors.newCachedThreadPool(new ThreadFactory() { private AtomicInteger threadId = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(r, Objects.requireNonNull(getNamePrefix()) + "-" + threadId.getAndIncrement());
                        }
                    });
                }
            }
        } return executorService;
    } /**
     * @since 1.6
     */ protected String getNamePrefix() { return String.format("%d.bt.service.executor-thread", config.getAcceptorPort());
    }
}

由于newCachedThreadPool实现的ThreadPool的keepAliveTime为60s,猜测可能是这个时间过短引起。于是重新实现了newCachedThreadPool但是问题仍然没有解决。

/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                    threadFactory);
}

继续分析

上述思路受阻后,重新回到最原始的问题点,即java.lang.InterruptedException的异常,再次查看wait()方法的JDK注释,如下:

Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object. In other words, this method behaves exactly as if it simply performs the call wait(0).
The current thread must own this object's monitor. The thread releases ownership of this monitor and waits until another thread notifies threads waiting on this object's monitor to wake up either through a call to the notify method or the notifyAll method. The thread then waits until it can re-obtain ownership of the monitor and resumes execution. As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:
           synchronized (obj) { while (<condition does not hold>)
                   obj.wait();
               ... // Perform action appropriate to condition }

This method should only be called by a thread that is the owner of this object's monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.
Throws:
IllegalMonitorStateException – if the current thread is not the owner of the object's monitor.
InterruptedException – if any thread interrupted the current thread before or while the current thread was waiting for a notification. The interrupted status of the current thread is cleared when this exception is thrown.
See Also:
notify(), notifyAll()

如果torrent.wait();不抛出InterruptedException,那么根据wait()方法的定义可以看出,可以使用notify()或notifyAll()停止wait。因此,决定重新分析MetadataConsumer类,将相对完整的源码贴出来,同时去掉非相关的部分代码。

public class MetadataConsumer { // 空歌白石:略去无关代码 private final TorrentId torrentId; // set immediately after metadata has been fetched and verified private final AtomicReference<Torrent> torrent; // 空歌白石:略去无关代码 public MetadataConsumer(IMetadataService metadataService,
                            TorrentId torrentId,
                            Config config,
                            EventSource eventSource) { // 空歌白石:略去无关代码 this.torrent = new AtomicReference<>(); // 空歌白石:略去无关代码 } @Consumes public void consume(UtMetadata message, MessageContext context) {
        Peer peer = context.getPeer(); // being lenient herer and not checking if the peer advertised ut_metadata support StateContext stateContext = states.get(peer); switch (message.getType()) { case DATA: { int totalSize = message.getTotalSize().get(); if (totalSize >= metadataExchangeMaxSize) { throw new IllegalStateException("Declared metadata size is too large: " + totalSize + "; max allowed is " + metadataExchangeMaxSize);
                } if (stateContext != null) {
                    stateContext.requestedIndexes.remove(message.getPieceIndex()); if (stateContext.state != State.HAS_METADATA) {
                        stateContext.state = State.HAS_METADATA;
                    }
                }
                processMetadataBlock(message.getPieceIndex(), totalSize, message.getData().get());
            } break; case REJECT: {
                stateContext.rejectedTime = System.currentTimeMillis();
                stateContext.state = State.REJECTED;
            } break; default: { // ignore }
        }
    } private void processMetadataBlock(int pieceIndex, int totalSize, byte[] data) { if (metadata == null) { synchronized (this) { if (metadata == null) {
                    metadata = new ExchangedMetadata(totalSize, metadataExchangeBlockSize);
                    blocksNeedRequest = createBlockIndexQueue(metadata.getBlockCount());
                }
            }
        } if (!metadata.isBlockPresent(pieceIndex)) {
            metadata.setBlock(pieceIndex, data); if (metadata.isComplete()) { byte[] digest = metadata.getSha1Digest(); if (Arrays.equals(digest, torrentId.getBytes())) {
                    Torrent fetchedTorrent = null; try {
                        fetchedTorrent = metadataService.fromByteArray(metadata.getBytes());
                    } catch (Exception e) {
                        LOGGER.error("Processing of metadata failed: " + torrentId, e);
                        metadata = null;
                    } if (fetchedTorrent != null) { synchronized (torrent) {
                            torrent.set(fetchedTorrent);
                            states.clear();
                            torrent.notifyAll();
                        }
                    }
                } else { // 空歌白石:略去无关代码 }
            }
        }
    } @Produces public void produce(Consumer<Message> messageConsumer, MessageContext context) { // stop here if metadata has already been fetched if (torrent.get() != null) { return;
        } // 空歌白石:略去无关代码 } /**
     * @return Torrent, blocking the calling thread if it hasn't been fetched yet
     */ public Torrent waitForTorrent() { while (torrent.get() == null) { synchronized (torrent) { if (torrent.get() == null) { try {
                        torrent.wait();
                    } catch (InterruptedException e) { throw new RuntimeException(e);
                    }
                }
            }
        } return torrent.get();
    } // 空歌白石:略去无关代码 }

通过重新分析MetadataConsumer类,我们发现AtomicReference<Torrent>的整个代码中只有一个地方使用了notifyAll通知其他线程状态,截取代码如下:

Torrent fetchedTorrent = null; try {
    fetchedTorrent = metadataService.fromByteArray(metadata.getBytes());
} catch (Exception e) {
    LOGGER.error("Processing of metadata failed: " + torrentId, e);
    metadata = null;
} if (fetchedTorrent != null) { synchronized (torrent) {
        torrent.set(fetchedTorrent);
        states.clear();
        torrent.notifyAll();
    }
}

通过关注这部分的log日志,发现了如下NPE问题。

java.lang.NullPointerException
    at bt.torrent.MetadataService.fromByteArray(MetadataService.java:31)
    at bt.torrent.messaging.MetadataConsumer.processMetadataBlock(MetadataConsumer.java:157)
    at bt.torrent.messaging.MetadataConsumer.consume(MetadataConsumer.java:125)
    at bt.torrent.messaging.DefaultMessageRouter$CollectingCompilerVisitor$1.consume(DefaultMessageRouter.java:167)
    at bt.torrent.messaging.DefaultMessageRouter.lambda$doConsume$2(DefaultMessageRouter.java:129)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at bt.torrent.messaging.DefaultMessageRouter.doConsume(DefaultMessageRouter.java:126)
    at bt.torrent.messaging.DefaultMessageRouter.consume(DefaultMessageRouter.java:118)
    at bt.torrent.messaging.RoutingPeerWorker.accept(RoutingPeerWorker.java:67)
    at bt.torrent.messaging.RoutingPeerWorker.accept(RoutingPeerWorker.java:41)
    at bt.torrent.messaging.TorrentWorker$PieceAnnouncingPeerWorker.accept(TorrentWorker.java:380)
    at bt.torrent.messaging.TorrentWorker.lambda$consume$8(TorrentWorker.java:183)
    at java.base/java.util.Optional.ifPresent(Optional.java:183)
    at bt.torrent.messaging.TorrentWorker.consume(TorrentWorker.java:183)
    at bt.torrent.messaging.TorrentWorker.lambda$addPeer$6(TorrentWorker.java:177)
    at bt.MultiThreadMessageDispatcher$MessageDispatchingLoop.processConsumer(MultiThreadMessageDispatcher.java:148)
    at bt.MultiThreadMessageDispatcher$MessageDispatchingLoop.run(MultiThreadMessageDispatcher.java:108)
    at bt.MultiThreadMessageDispatcher.lambda$createAndSubmitTask$3(MultiThreadMessageDispatcher.java:292)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

进一步分析是在fromByteArray类中对于边界值的处理有个bug。经过修复后,不再出现InterruptedException了,问题得到解决。

上文中我们会提到过一个问题那么何时会抛出异常呢?,当时的回答是:如果任何线程在当前线程等待通知之前或期间中断了当前线程,便抛出此异常时清除当前线程的中断状态。正是由于NullPointerException中断了正在运行的Thread,导致AtomicReference在调用wait方法时抛出了InterruptedException异常。

总结

本文从最初的InterruptedException开始详细分析了AtomicReference相关的方方面面,最终也将实际工作中遇到的问题加以解决。

#Java##后端##计算机##编程#
全部评论
谢谢大佬分享的Interrupted Exception,学到了
点赞 回复 分享
发布于 2022-07-27 15:12

相关推荐

上周组里招人,我面了六个候选人,回来跟同事吃饭的时候聊起一个让我挺感慨的现象。前三个候选人,算法题写得都不错。第一道二分查找,五分钟之内给出解法,边界条件也处理得干净。第二道动态规划,状态转移方程写对了,空间复杂度也优化了一版。我翻他们的简历,力扣刷题量都在300以上。后三个呢,就有点参差不齐了。有的边界条件没处理好,有的直接说这道题没刷过能不能换个思路讲讲。其中有一个女生,我印象特别深——她拿到题之后没有马上写,而是先问我:“面试官,我能先跟你确认一下我对题目的理解吗?”然后她把自己的思路讲了一遍,虽然最后代码写得不是最优解,但整个沟通过程非常顺畅。这个女生的代码不是最优的,但当我问她“如果这里是线上环境,你会怎么设计’的时候,她给我讲了一套完整的方案——异常怎么处理、日志怎么打、怎么平滑发布。她对这是之前在实习的时候踩过的坑。”我在想LeetCode到底在筛选什么?我自己的经历可能有点代表性。我当年校招的时候,也是刷了三百多道题才敢去面试。那时候大家都刷,你不刷就过不了笔试关。后来工作了,前三年基本没再打开过力扣。真正干活的时候,没人让你写反转链表,也没人让你手撕红黑树。更多的是:这个接口为什么慢了、那个服务为什么OOM了、线上数据对不上了得排查一下。所以后来我当面试官,慢慢调整了自己的评判标准。算法题我还会出,但目的变了。我出算法题,不是想看你能不能背出最优解。而是想看你拿到一个陌生问题的时候,是怎么思考的。你会先理清题意吗?你会主动问边界条件吗?你想不出来的时候会怎么办?你写出来的代码,变量命名乱不乱、结构清不清楚?这些才是工作中真正用得到的能力。LeetCode是一个工具,不是目的。它帮你熟悉数据结构和常见算法思路,这没问题。但如果你刷了三百道题,却说不清楚自己的项目解决了什么问题、遇到了什么困难、你是怎么解决的,那这三百道题可能真的白刷了。所以还要不要刷LeetCode?要刷,但别只刷题。刷题的时候,多问自己几个为什么:为什么用这个数据结构?为什么这个解法比那个好?如果换个条件,解法还成立吗?把刷题当成锻炼思维的方式,而不是背答案的任务。毕竟面试官想看到的,从来不是一台背题机器,而是一个能解决问题的人。
牛客51274894...:意思是光刷力扣还不够卷
AI时代还有必要刷lee...
点赞 评论 收藏
分享
刚刷到字节跳动官方发的消息,确实被这波阵仗吓了一跳。在大家还在纠结今年行情是不是又“寒冬”的时候,字节直接甩出了史上规模最大的转正实习计划——ByteIntern。咱们直接看几个最硬的数,别被花里胡哨的宣传词绕晕了。首先是“量大”。全球招7000多人是什么概念?这几乎是把很多中型互联网公司的总人数都给招进来了。最关键的是,这次的资源分配非常精准:研发岗给了4800多个Offer,占比直接超过六成。说白了,字节今年还是要死磕技术,尤其是产品和AI领域,这对于咱们写代码的同学来说,绝对是今年最厚的一块肥肉。其次是大家最关心的“转正率”。官方直接白纸黑字写了:整体转正率超过50%。这意味着只要你进去了,不划水、正常干,每两个人里就有一个能直接拿校招Offer。对于2027届(2026年9月到2027年8月毕业)的同学来说,这不仅是实习,这简直就是通往大厂的快捷通道。不过,我也得泼盆冷水。坑位多,不代表门槛低。字节的实习面试出了名的爱考算法和工程实操,尤其是今年重点倾斜AI方向,如果你简历里有和AI相关的项目,优势还是有的。而且,转正率50%也意味着剩下那50%的人是陪跑的,进去之后的考核压力肯定不小。一句话总结:&nbsp;27届的兄弟们,别犹豫了。今年字节这是铁了心要抢提前批的人才,现在投递就是占坑。与其等到明年秋招去千军万马挤独木桥,不如现在进去先占个工位,把转正名额攥在手里。
喵_coding:别逗了 50%转正率 仔细想想 就是转正与不转正
字节7000实习来了,你...
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务