Java并发编程实战“J.U.C”:CyclicBarrier

在上篇博客(【Java并发编程实战】--“J.U.C”:Semaphore)中,LZ介绍了Semaphore,下面LZ介绍CyclicBarrier。在JDK API中是这么介绍的:

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。

CyclicBarrier分析

CyclicBarrier结构如下:

Java并发编程实战“J.U.C”:CyclicBarrier

 

从上图可以看到CyclicBarrier内部使用ReentrantLock独占锁实现的。其构造函数如下:

CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。

CyclicBarrier(int parties, Runnable barrierAction):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

在CyclicBarrier中,最重要的方法就是await(),在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。其源代码如下:

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

await内部调用dowait():

private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            //独占锁
            final ReentrantLock lock = this.lock;
            //获取独占锁
            lock.lock();
            try {
                //保存当前"Generation"
                final Generation g = generation;
                //当前generation“已损坏”,抛出BrokenBarrierException异常
                //抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrier
                if (g.broken)
                    throw new BrokenBarrierException();

                //当前线程中断,通过breakBarrier终止终止CyclicBarrier
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
               
               //计数器-1
               int index = --count;
               //如果计数器 == 0
               //表示所有线程都已经到位,触发动作(是否执行某项任务)
               if (index == 0) {  // tripped
                   boolean ranAction = false;
                   try {
                       //barrierCommand线程要执行的任务
                       final Runnable command = barrierCommand;
                       //执行的任务!=null,执行任务
                       if (command != null)
                           command.run();
                       ranAction = true;
                       //唤醒所有等待线程,并更新generation。
                       nextGeneration();
                       return 0;
                   } finally {
                       if (!ranAction)
                           breakBarrier();
                   }
               }

               //循环一直执行,直到下面三个if一个条件满足才会退出循环
               for (;;) {
                    try {
                        //如果不是超时等待,则调用await等待
                        if (!timed)
                            trip.await();
                        //调用awaitNanos等待
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        //
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            Thread.currentThread().interrupt();
                        }
                    }

                    //当前generation“已损坏”,抛出BrokenBarrierException异常
                    //抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrier
                    if (g.broken)
                        throw new BrokenBarrierException();

                    //generation已经更新,返回index
                    if (g != generation)
                        return index;

                    //“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                //释放独占锁
                lock.unlock();
            }
        }

在dowait方法中其实处理逻辑还是比较简单的:

1、首先判断该barrier是否已经断开了,如果断开则抛出BrokenBarrierException异常;

2、判断计算器index是否等于0,如果等于0,则表示所有的线程准备就绪,已经到达某个公共屏障点了,barrier可以进行后续工作了(是否执行某项任务(构造函数决定));然后调用nextGeneration方法进行更新换代工作(其中会唤醒所有等待的线程);

3、通过for循环(for(;;))使线程一直处于等待状态。直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生。

在dowait中有Generation这样一个对象。该对象是CyclicBarrier的一个成员变量:

private static class Generation {
        boolean broken = false;
    }

Generation描述着CyclicBarrier的更显换代。在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于中断状态。

对于中断,CyclicBarrier是通过breakBarrier()实现的:

private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。

在超时的判断中,CyclicBarrier根据timed的值来执行不同的wait。await、awaitNanos都是Condition中的方法。

当index = --count等于0时,标识“有parties个线程到达barrier”,临界条件到达,则执行相应的动作。执行完动作后,则调用nextGeneration进行更新换代:

private void nextGeneration() {
        //唤醒所有处于等待状态的线程
        trip.signalAll();
        //初始化计数器
        count = parties;
        //产生新的Generation对象
        generation = new Generation();
    }

示例

1、线程等待到一定条件后才会继续进行。

public class CyclicBarrierTest_1 {
    private static CyclicBarrier barrier;
    
    static class threadTest1 extends Thread{
        public void run() {
            System.out.println(Thread.currentThread().getName() + "达到...");
            try {
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "执行完成...");
        }
    }
    
    public static void main(String[] args) {
        barrier = new CyclicBarrier(5);
        for(int i = 1 ; i <= 5 ; i++){
            new threadTest1().start();
        }
    }
}

执行结果:

Thread-0达到...
Thread-1达到...
Thread-3达到...
Thread-2达到...
Thread-4达到...
Thread-4执行完成...
Thread-0执行完成...
Thread-1执行完成...
Thread-2执行完成...
Thread-3执行完成...

2、线程等待到一定条件后,执行某项任务。比如说我们等车,只有当车坐满后,汽车才会发动。

这个只需要对上面的代码进行小动作的改动即可:

public class CyclicBarrierTest_2 {
    private static CyclicBarrier barrier;
    
    static class threadTest1 extends Thread{
        public void run() {
            System.out.println(Thread.currentThread().getName() + "达到...");
            try {
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "执行完成...");
        }
    }
    
    public static void main(String[] args) {
        barrier = new CyclicBarrier(5,new Runnable() {
            
            @Override
            public void run() {
                System.out.println("执行CyclicBarrier中的任务.....");
            }
        });
        for(int i = 1 ; i <= 5 ; i++){
            new threadTest1().start();
        }
    }
}

-执行结果:

Thread-0达到...
Thread-1达到...
Thread-3达到...
Thread-4达到...
Thread-2达到...
执行CyclicBarrier中的任务.....
Thread-2执行完成...
Thread-0执行完成...
Thread-3执行完成...
Thread-1执行完成...
Thread-4执行完成...

 

参考文献:

1、Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例

更多相关文章
  • >> 点击进入 ChinaJoy2014 专题,查看更多报道 易网科技讯 7月30日消息,中国国际数码互动娱乐展览会开幕式暨产业高峰论坛今天在上海举行,国家新闻出版广电总局.数字出版司司长张毅君和上海市新闻出版局局长徐炯做出开幕致辞.张毅君表示:"本届展会以塑造世界游戏产业新格 ...
  • 链接在这
  • Highcharts翻译系列之十五:title标题和subtitle副标题 subtitle副标题   参数  描述  默认值   align  水平布局方式.可以是left,right,center.  center   floating  当子标题是浮动的时候,绘图区将不会给它留空间  fals ...
  • 前言     但凡一个略有规模的项目都需要一个持续集成环境的支撑,为什么需要持续集成环境,我们来看一个例子.假如一个项目,由A.B两位程序员来协作开发,A负责前端模块,B负责后端模块,前端依赖后端.A和B都习惯使用SVN作为代码管理工具,他们分别开始工作,一个功能完成后会提交到SVN,以便对方能够使 ...
  • 之前日本媒体曾表示,目前曝光的iPhone 6外形并非最终版,其最大的变化就是后壳三段式设计与5S类似.现在一家长期做定制iPhone的Feld & Volk公司送上了iPhone 6的新部件,没错,也是后壳.从曝光的图片看,该机的后壳真实度颇高,内部构造非常繁杂.此外,让人比较失望的是,该 ...
  • 烂泥:Windows server 2008开启远程桌面 为了以后的学习和进步,从今天开始所有的实验尽量都是在server 2008上完成.今天打算学习server 08 却发现远程无法连接,经过本人一个多小时的折腾终于搞定了,具体请看下面的截图. 第一.首先打开"服务器管理器" ...
一周排行
  • 据韩国SBS电视台网站11月17日消息,全球最大在线旅游公司Expedia日前面向25个国家和地区的8556名上班族进行的问卷调查显示,韩国上班族智能手机使用比重达94%,居全球首位.资料图参与本次调查的共有304名 ...
  • 从1998年开始,广东发展银行最初的网络安全体系就依据思科SAFE蓝图部署.SAFE主张,网络安全建设不能一蹴而就,而应该是一个动态的过程.所以在最初的部署中,思科主要协助广东发展银行解决了最突出的网络安全问题--网 ...
  •   在JavaScript开发中,被人问到:null与undefined到底有啥区别?   一时间不好回答,特别是undefined,因为这涉及到undefined的实现原理.于是,细想之后,写下本文,请各位大侠拍砖 ...
  • 宠物店老板向客人介绍架子上他的鹦鹉,鹦鹉两只脚上都有丝带,分别拉左.右脚丝带鹦鹉会说不同的招呼语,客人问,那我两条丝带一起拉,会怎样?鹦鹉叫:"笨蛋!那样我会掉下去的!"这个客人的想法就是典型的黑 ...
  • Drawable资源 Drawable资源是对图像的一个抽象,你可以通过getDrawable(int)得到并绘制到屏幕上.这里有几种不同类型的Drawable: Bitmap File 一个Bitmap图像文件(. ...
  • 本报讯 (记者段郴群) 据媒体报道,苹果本周一宣布,在过去三天的时间里,共售出500多万部iPhone 5智能手机.虽然超过了去年iPhone 4S的纪录,但仍低于分析师此前的预期目标.受此消息影响,多日股价保持在每 ...
  • 5月10日消息,据国外媒体报道,调研公司ChangeWave最新调查结果显示,超过50%的Verizon用户愿意使用苹果iPhone手机. 当前,苹果是否将推出Verizon版iPhone手机成为业内关注的焦点.有分 ...
  • 在<maven学习之一>中介绍了M2_HOME指向了maven的安装目录,如下图:   weiwan..................
  • package test.com.erayt.eds.risk.service;     import java.util.Map; import java.util.concurrent.ConcurrentHas ...
  • 开发人员常用的两个工具RevitLookup以及Revit AddinManager 在Revit 2014 SDK第一版中没有提供.许多开发人员问起这两个工具在那里.没有他们Revit二次开发很困难.在Revit ...