不能不学的JUC工具类: Semaphore详解

前言

在工作中我们经常需要考虑对资源的使用,避免资源被过度使用或者资源没有被利用到而造成的问题,那我们该如何去限制访问某些资源的线程数目,从而对完成资源的保护。

1. 限制多线程同时操作的方式

concurrent包为我们提供了多种防止多线程同时操作一个资源的方法

volatile
原子类
Synchronized和Lock
Semaphore

2. Semaphore是什么?

Semaphore被翻译为计数信号量,通常使用进行并发线程数量的限制,保证多个线程能够合理的使用资源。用大白话理解就是理解为红路灯。
官方的翻译:计数信号量。
从概念上讲,信号量维护一组许可证。 如有必要,每个acquire块都将阻塞直到获得许可为止,然后再获取它。 每个release添加一个许可证,从而有可能释放阻塞的获取者。 但是,没有使用实际的许可对象。 Semaphore只是保持可用数量的计数并采取相应措施。

3. 应用场景

公共资源有限的地方,我们就需要考虑限制的问题,防止过度的操作,带来的不良影响
例如:数据库连接

4. 如何使用Semaphore

没有进行控制的代码

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    IntStream.range(0,5).forEach(i -> executorService.submit(() ->{ try {
            System.out.println(Thread.currentThread().getName() + "gogogo");
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "正在操作");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "释放操作");

    }));

    executorService.shutdown();
}

输出结果

pool-1-thread-1gogogo
pool-1-thread-3gogogo
pool-1-thread-4gogogo
pool-1-thread-2gogogo
pool-1-thread-5gogogo
pool-1-thread-1正在操作
pool-1-thread-5正在操作
pool-1-thread-5释放操作
pool-1-thread-2正在操作
pool-1-thread-2释放操作
pool-1-thread-3正在操作
pool-1-thread-3释放操作
pool-1-thread-4正在操作
pool-1-thread-4释放操作
pool-1-thread-1释放操作

进行改造使用我们的并发工具Semaphore

private static Semaphore semaphore = new Semaphore(1,false); public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    IntStream.range(0,5).forEach(i -> executorService.submit(() ->{ try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + "gogogo");
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "正在操作");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        semaphore.release();
        System.out.println(Thread.currentThread().getName() + "释放操作");

    }));

    executorService.shutdown();
}

输出结果

pool-1-thread-1gogogo
pool-1-thread-1正在操作
pool-1-thread-1释放操作
pool-1-thread-2gogogo
pool-1-thread-2正在操作
pool-1-thread-2释放操作
pool-1-thread-3gogogo
pool-1-thread-3正在操作
pool-1-thread-3释放操作
pool-1-thread-4gogogo
pool-1-thread-4正在操作
pool-1-thread-4释放操作
pool-1-thread-5gogogo
pool-1-thread-5正在操作
pool-1-thread-5释放操作

我们在Semaphore的初始化参数中,设置了允许并发线程数量为1,表示只允许1个线程通过,当线程拿到许可证的时候进行执行,线程完成之后进行许可证的归还,给下一个进来的线程使用,直到任务结束。

5. Semaphore的主要方法和核心参数

核心参数


构造方法

代码如下:
默认是非公平锁,只要传入并发线程数量

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
} public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

核心方法

//从此信号量获取许可,先休眠,直到获得可用线程或者被中断 void acquire() throws InterruptedException //中断继续 void acquireUninterruptibly() //从此信号量获取设定线程数许可,先休眠,直到获得可用线程或者被中断 void acquire(int permits) //尝试获取许可,如果能够获取成功则立即返回true,否则,则返回false boolean tryAcquire() //和上面一样,设置了等待最长时间 boolean tryAcquire(long timeout, TimeUnit unit) //释放许可 void release() //返回当前可用的许可证数 int availablePermits() //等待许可证数 int getQueueLength() //返回正在等待线程的合集 Collection<Thread> getQueuedThreads() 

6. Semaphore的原理

实现的核心还是AQS的共享模式

Sync extends AbstractQueuedSynchronizer

acquire()

//以共享模式获取,如果中断则中止. public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //1. 通过首先检查中断状态,中断返回异常 if (Thread.interrupted()) throw new InterruptedException(); // 2. 以共享模式获取,获取到了锁,接下去,执行,没有就排队 if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
} //非公平模式共享锁获取 protected int tryAcquireShared(int acquires) { for (;;) { //判断当前节点在同步队列中是否有前驱节点的判断,获取不到返回-1 if (hasQueuedPredecessors()) return -1; //Semaphore用AQS的state变量的值代表可用许可数  int available = getState(); int remaining = available - acquires; //如果剩余许可数小于0或者CAS将当前可用许可数设置为剩余许可数成功,则返回成功许可数 if (remaining < 0 ||
            compareAndSetState(available, remaining)) return remaining;
    }
} private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //加入等待队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //自旋过程中的退出条件是是当前节点的前驱节点是头结点并且tryAcquireShared(arg) //返回值大于等于0即能成功获得同步状态 for (;;) { //获取前驱节点 final Node p = node.predecessor(); if (p == head) { //争夺锁 int r = tryAcquireShared(arg); if (r >= 0) { //先把 head 给占了,然后唤醒队列中其他的线程 setHeadAndPropagate(node, r);
                    p.next = null; // help GC failed = false; return;
                }
            } if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) throw new InterruptedException();
        }
    } finally { //失败的话,取消状态清除该节点 if (failed)
            cancelAcquire(node);
    }
} //设置head的值,完成初始化工作 private Node enq(final Node node) { for (;;) {
        Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t; if (compareAndSetTail(t, node)) {
                t.next = node; return t;
            }
        }
    }
}

7. Semaphore需要注意的问题

release()不能随便调用,调用一次就增加一次

permits the initial number of permits available. This value may be negative, in which case releases must occur before any acquires will be granted.

public class SemaphoreTest { private static Semaphore semaphore = new Semaphore(1,false); public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        IntStream.range(0,5).forEach(i -> executorService.submit(() ->{ try {
                System.out.println("after:" + semaphore.availablePermits()); //semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "gogogo");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "正在操作");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
                System.out.println("before:" + semaphore.availablePermits());
                System.out.println(Thread.currentThread().getName() + "释放操作");
}
            semaphore.release();
            System.out.println("before:" + semaphore.availablePermits());
            System.out.println(Thread.currentThread().getName() + "释放操作");

        }));

        executorService.shutdown();
    }
}
after:1 pool-1-thread-1gogogo
pool-1-thread-1正在操作
before:2 pool-1-thread-1释放操作
after:2 pool-1-thread-1gogogo
pool-1-thread-1正在操作
before:3 pool-1-thread-1释放操作
after:3 pool-1-thread-1gogogo
pool-1-thread-1正在操作
before:4 pool-1-thread-1释放操作

信号量泄露,指的是申请了但是没有释放,这会导致进入临界区的线程数量就会越来越少,随着时间的推移,最后许可证数量不够用,会导致线程卡死。

建议:在操作的时候,我们尽可能在finally中进行 semaphore.release() 的操作。

总结

以上就是关于信号量的全部内容,总体看来,用法比较简单,在实际的工作中需要对线程进行控制的场景,我们可以将他作为一个方案。

#Java##程序员#
全部评论

相关推荐

10-31 13:04
南华大学 Java
嵌入式的小白:很多面试,面试前不会去打扰cto的,但一般cto不会在这些小事上刷人,只能说这个cto比较操心,啥重要不重要,紧急不紧急的,估计都会过问,平淡看待吧
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务