本文主要介绍了Java线程通信的原理、目的、方法,在文章最后给出了多个线程通信案例。
1 为什么要线程通信?
多个线程并发执行时,在默认情况下CPU是随机切换线程的,当我们需要多个线程来共同完成一件任务,并且我们希望他们有规律的执行,那么多线程之间需要一些协调通信,以此来帮我们达到多线程共同操作一份数据。
狭义上来说:线程通信的目标是使线程间能够互相发送信号(通知),另一方面,线程通信使线程能够等待其他线程的信号(通知),也称为线程间的等待/通知机制,或者生产消费模式!
广义上说:能够协调线程调度运行的方法都属于线程通信的应用,不应是一个线程主动通知另外一个线程,这个通知还可能是一个公共信号。
2 线程通信的方式
synchronized 线程通信
synchronized + wait + notify + notifyAll
使用lock+Condition控制线程通信
JDK1.5开始,Lock可以代替synchronized 同步方法或同步代码块,Condition替代同步监视器的功能:lock + condition + await + signal + signalAll
使用阻塞队列(BlockingQueue)控制线程通信
BlockingQueue接口主要作为线程同步的工具。当生产者试图向BlockingQueue中放入元素,如果队列已满,则线程被阻塞;当消费者试图向BlockingQueue中取出元素时,若该队列已空,则线程被阻塞。这里通过共享一个队列的信息,实现生产者和消费者。
使用管道流PipedWriter/PipedReader
使用JDK1.5提供的信号量Semaphore、CountDownLatch、CyclicBarrier等工具类
volatile
volatile能保证所修饰的变量对于多个线程可见性,即只要被修改,其它线程读到的一定是最新的值。以此来实现线程通信,但是volatile不能保证操作原子性,是一种弱的同步机制。
3 线程通信的案例
3.1 synchronized实现生产消费
下面的案例使用synchronized实现了多生产者多消费者的案例
publicclassProductionAndConsumption{publicstaticvoidmain(String[]args){Resourceresource=newResource();Threadthread1=newThread(newProducer(resource),"生产者1");Threadthread2=newThread(newProducer(resource),"生产者2");Threadthread3=newThread(newConsumer(resource),"消费者1");Threadthread4=newThread(newConsumer(resource),"消费者2");thread1.start();thread2.start();thread3.start();thread4.start();}}/***表示产品资源*/classResource{//标号privateintcount;//名字privateStringname;//标志位,false表示没有产品,trur表示生产出了产品privatebooleanflag;/***生产产品*@paramname*/voidset(Stringname){//使用同步块synchronized(this){//判断false是否为true,如果是true说明有产品了,那么生产者线程应该等待if(flag){try{System.out.println("有产品了--"+Thread.currentThread().getName()+"生产等待");this.wait();}catch(InterruptedExceptione){e.printStackTrace();}}//走到这一步,说明没有产品,可以生产this.name=name;System.out.println(Thread.currentThread().getName()+"--"+this.name+(++count)+"生产");//设置产品标志为true,表示有产品了,可以消费了flag=true;//这里唤醒所有线程,有可能还会唤醒生产者this.notifyAll();}}/***消费产品*/voidget(){synchronized(this){//判断flag是否为false,如果是fasle说明有产品了,那么消费者线程应该等待if(!flag){try{System.out.println("没产品了--"+Thread.currentThread().getName()+"消费等待");this.wait();}catch(InterruptedExceptione){e.printStackTrace();}}//走到这一步,说明有产品,可以消费System.out.println(Thread.currentThread().getName()+"--"+this.name+count+"消费");//设置产品标志为false,表示没有产品了,可以生产了flag=false;//这里唤醒所有线程,有可能还会唤醒消费者this.notifyAll();}}}/***生产者线程*/classProducerimplementsRunnable{privateResourceresource;publicProducer(Resourceresource){this.resource=resource;}@Overridepublicvoidrun(){while(true){try{Thread.sleep(500);}catch(InterruptedExceptione){e.printStackTrace();}//调用生产方法resource.set("面包");}}}/***消费者线程*/classConsumerimplementsRunnable{privateResourceresource;publicConsumer(Resourceresource){this.resource=resource;}@Overridepublicvoidrun(){while(true){try{Thread.sleep(500);}catch(InterruptedExceptione){e.printStackTrace();}//调用消费方法resource.get();}}}
3.2 Lock实现生产消费
publicclassLockPC{publicstaticvoidmain(String[]args){Resourceresource=newResource();Producerproducer=newProducer(resource);Consumerconsumer=newConsumer(resource);ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(4,4,0,TimeUnit.SECONDS,newLinkedBlockingQueue<>(),Executors.defaultThreadFactory(),newThreadPoolExecutor.AbortPolicy());threadPoolExecutor.execute(producer);threadPoolExecutor.execute(consumer);threadPoolExecutor.execute(producer);threadPoolExecutor.execute(consumer);threadPoolExecutor.shutdown();}/***产品资源*/staticclassResource{privateStringname;privateintcount;booleanflag;//获取lock锁,lock锁的获取和释放需要代码手动操作ReentrantLocklock=newReentrantLock();//从lock锁获取一个condition,用于生产者线程在此等待和唤醒Conditionproducer=lock.newCondition();//从lock锁获取一个condition,用于消费者线程在此等待和唤醒Conditionconsumer=lock.newCondition();voidset(Stringname){//获得锁lock.lock();try{while(flag){try{System.out.println("有产品了--"+Thread.currentThread().getName()+"生产等待");//该生产者线程,在producer上等待producer.await();}catch(InterruptedExceptione){e.printStackTrace();}}++count;this.name=name;System.out.println(Thread.currentThread().getName()+"生产了"+this.name++count);flag=!flag;//唤醒在consumer上等待的消费者线程,这样不会唤醒等待的生产者consumer.signalAll();}finally{//释放锁lock.unlock();}}voidget(){lock.lock();try{while(!flag){try{System.out.println("没产品了--"+Thread.currentThread().getName()+"消费等待");//该消费者线程,在consumer上等待consumer.await();}catch(InterruptedExceptione){e.printStackTrace();}}System.out.println(Thread.currentThread().getName()+"消费了"+this.name+count);flag=!flag;//唤醒在producer监视器上等待的生产者线程,这样不会唤醒等待的消费者producer.signalAll();}finally{lock.unlock();}}}/***消费者*/staticclassConsumerimplementsRunnable{privateResourceresource;publicConsumer(Resourceresource){this.resource=resource;}@Overridepublicvoidrun(){while(true){try{Thread.sleep(500);}catch(InterruptedExceptione){e.printStackTrace();}//调用消费方法resource.get();}}}/***生产者*/staticclassProducerimplementsRunnable{privateResourceresource;publicProducer(Resourceresource){this.resource=resource;}@Overridepublicvoidrun(){while(true){try{Thread.sleep(500);}catch(InterruptedExceptione){e.printStackTrace();}//调用生产方法resource.set("面包");}}}}
3.3 Lock实现生产消费和产品仓库的功能
在上一个案例中,生产者线程生产的产品必须马上被消费,在下面的案例中,生产的产品可以被累积,使用Lock实现了多生产者多消费者的案例,同时实现了商品仓库的功能,使得生产者可以连续生产,消费者可以连续消费。
publicclassLockPCWhithStorage{publicstaticvoidmain(String[]args){Resourceresource=newResource();Consumerc=newConsumer(resource);Producerp=newProducer(resource);ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(4,4,0,TimeUnit.SECONDS,newLinkedBlockingQueue<>(),Executors.defaultThreadFactory(),newThreadPoolExecutor.AbortPolicy());threadPoolExecutor.execute(p);threadPoolExecutor.execute(p);threadPoolExecutor.execute(c);threadPoolExecutor.execute(c);threadPoolExecutor.shutdown();}staticclassResource{//获得锁对象finalLocklock=newReentrantLock();//获得生产监视器finalConditionnotFull=lock.newCondition();//获得消费监视器finalConditionnotEmpty=lock.newCondition();//定义一个数组,当作仓库,用来存放商品finalObject[]items=newObject[100];/**取消了falg标志,取而代之的是用仓库的数量来判断是否应该阻塞或者唤醒对应的线程*putpur:生产者使用的下标索引;*takeptr:消费者下标索引;*count:用计数器,记录商品个数*/intputptr,takeptr,count;//生产方法publicvoidput(Objectx){//获得锁lock.lock();try{//如果商品个数等于数组的长度,商品满了将生产将等待消费者消费while(count==items.length){try{System.out.println("仓库满了--"+Thread.currentThread().getName()+"生产等待");notFull.await();}catch(InterruptedExceptione){e.printStackTrace();}}//生产索引对应的商品,放在仓库中try{Thread.sleep(100);}catch(InterruptedExceptione){e.printStackTrace();}items[putptr]=x;//如果下标索引加一等于数组长度,将索引重置为0,重新开始if(++putptr==items.length){putptr=0;}//商品数加1++count;System.out.println(Thread.currentThread().getName()+"生产了"+x+"共有"+count+"个");//唤醒消费线程notEmpty.signal();}finally{//释放锁lock.unlock();}}//消费方法publicObjecttake(){//获得锁lock.lock();try{//如果商品个数为0.消费等待while(count==0){try{System.out.println("仓库空了--"+Thread.currentThread().getName()+"消费等待");notEmpty.await();}catch(InterruptedExceptione){e.printStackTrace();}}//获得对应索引的商品,表示消费了try{Thread.sleep(100);}catch(InterruptedExceptione){e.printStackTrace();}Objectx=items[takeptr];//如果索引加一等于数组长度,表示取走了最后一个商品,消费完毕if(++takeptr==items.length)//消费索引归零,重新开始消费{takeptr=0;}//商品数减一--count;System.out.println(Thread.currentThread().getName()+"消费了"+x+"还剩"+count+"个");//唤醒生产线程notFull.signal();//返回消费的商品returnx;}finally{//释放锁lock.unlock();}}}staticclassProducerimplementsRunnable{privateResourceresource;publicProducer(Resourceresource){this.resource=resource;}@Overridepublicvoidrun(){while(true){resource.put("面包");}}}staticclassConsumerimplementsRunnable{privateResourceresource;publicConsumer(Resourceresource){this.resource=resource;}@Overridepublicvoidrun(){while(true){resource.take();}}}}
3.4 输出ABCABC
编写一个程序,开启3 个线程,这三个线程的name分别为A、B、C,每个线程将自己的名字 在屏幕上打印10 遍,要求输出的结果必须按名称顺序显示。
publicclassPrintABC{ReentrantLocklock=newReentrantLock();ConditionA=lock.newCondition();ConditionB=lock.newCondition();ConditionC=lock.newCondition();privateintflag=1;publicvoidprintA(inti){lock.lock();try{while(flag!=1){try{A.await();}catch(InterruptedExceptione){e.printStackTrace();}}System.out.println(Thread.currentThread().getName()+""+i);flag=2;B.signal();}finally{lock.unlock();}}publicvoidprintB(inti){lock.lock();try{while(flag!=2){try{B.await();}catch(InterruptedExceptione){e.printStackTrace();}}System.out.println(Thread.currentThread().getName()+""+i);flag=3;C.signal();}finally{lock.unlock();}}publicvoidprintC(inti){lock.lock();try{while(flag!=3){try{C.await();}catch(InterruptedExceptione){e.printStackTrace();}}System.out.println(Thread.currentThread().getName()+""+i);System.out.println("---------------------");flag=1;A.signal();}finally{lock.unlock();}}publicstaticvoidmain(String[]args){PrintABCtestABC=newPrintABC();ThreadA=newThread(newA(testABC),"A");ThreadB=newThread(newB(testABC),"B");ThreadC=newThread(newC(testABC),"C");A.start();B.start();C.start();}staticclassAimplementsRunnable{privatePrintABCtestABC;publicA(PrintABCtestABC){this.testABC=testABC;}@Overridepublicvoidrun(){for(inti=0;i<10;i++){testABC.printA(i+1);}}}staticclassBimplementsRunnable{privatePrintABCtestABC;publicB(PrintABCtestABC){this.testABC=testABC;}@Overridepublicvoidrun(){for(inti=0;i<10;i++){testABC.printB(i+1);}}}staticclassCimplementsRunnable{privatePrintABCtestABC;publicC(PrintABCtestABC){this.testABC=testABC;}@Overridepublicvoidrun(){for(inti=0;i<10;i++){testABC.printC(i+1);}}}}
3.5 使用高级阻塞队列实现生产消费
前面的案例都是使用的比较原始的方法,适合初学者,这里使用高级阻塞队列实现通知等待/生产消费。
publicclassBlockingQueuePC{//定义一个阻塞队列staticLinkedBlockingQueue<Object>objects=newLinkedBlockingQueue<>();publicstaticvoidmain(String[]args){Resourceresource=newResource("面包");Consumerconsumer=newConsumer();Producerproducer=newProducer(resource);ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(5,5,0,TimeUnit.SECONDS,newLinkedBlockingQueue<>(),Executors.defaultThreadFactory(),newThreadPoolExecutor.AbortPolicy());//启动多个生产者\消费者线程threadPoolExecutor.execute(producer);threadPoolExecutor.execute(consumer);threadPoolExecutor.execute(producer);threadPoolExecutor.execute(consumer);threadPoolExecutor.execute(consumer);threadPoolExecutor.shutdown();}/***消费者*/staticclassConsumerimplementsRunnable{publicObjecttake()throwsInterruptedException{returnobjects.take();}@Overridepublicvoidrun(){while(true){try{Objecttake=take();System.out.println(Thread.currentThread().getName()+"消费了"+take+",还剩"+objects.size());}catch(InterruptedExceptione){e.printStackTrace();}}}}/***生产者*/staticclassProducerimplementsRunnable{Resourceresource;publicProducer(Resourceresource){this.resource=resource;}publicvoidput(Objecto)throwsInterruptedException{objects.put(o);}@Overridepublicvoidrun(){while(true){try{put(resource);System.out.println(Thread.currentThread().getName()+"生产了"+resource+",还剩"+objects.size());}catch(InterruptedExceptione){e.printStackTrace();}}}}/***产品/资源*/staticclassResource{Stringname;publicResource(Stringname){this.name=name;}@OverridepublicStringtoString(){returnname;}}}
我们的代码非常简单,并没有使用任何同步,那么如果做到线程安全和通信的呢,实际上这些活都被阻塞队列帮我们做了,对比上一个手动实现的生产消费+仓库案例,这个是不是简单得多,效率也更高呢?这些都是属于JUC包中的内容,想要学好Java并发,JUC是所有人绕不过去的坎!
3.6 使用Semaphore信号量
Semaphore是JDK1.5出现的类,翻译成字面意思为“信号量”,属于JUC,是synchronized的加强版,可以用来控制线程的并发数量。 Semaphore可以控制同时访问共享资源的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。它通过协调各个线程,以保证合理的使用公共资源。相比synchronized和lock锁一次只能允许一个线程访问资源,功能更加强大。 案例:若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:
publicclassWorkerextendsThread{privateintnum;privateSemaphoresemaphore;publicWorker(intnum,Semaphoresemaphore){this.num=num;this.semaphore=semaphore;}publicstaticvoidmain(String[]args){//工人数intN=8;//机器数目许可=5Semaphoresemaphore=newSemaphore(5);for(inti=0;i<N;i++)newWorker(i,semaphore).start();}@Overridepublicvoidrun(){try{//获取permits个许可,若无许可能够获得,则会一直等待,直到获得许可。semaphore.acquire();System.out.println("工人"+this.num+"占用一个机器在生产...");Thread.sleep(2000);System.out.println("工人"+this.num+"释放出机器");//释放许可。注意,在释放许可之前,必须先获获得许可。semaphore.release();}catch(InterruptedExceptione){e.printStackTrace();}}}
3.7 使用管道流
对于Piped类型的流,必须先要进行绑定,即调用connect方法,如果没有绑定,那么将会抛出异常。管倒流通信类似于聊天室。用的比较少
publicclassPipeTest{publicstaticvoidmain(String[]args)throwsIOException{PipedWriterpipedWriter=newPipedWriter();PipedReaderpipedReader=newPipedReader();pipedWriter.connect(pipedReader);ThreadprintThread=newThread(newPrint(pipedReader),"PrintThread");printThread.start();intreceive=0;try{while((receive=System.in.read())!=-1){pipedWriter.write(receive);}}finally{pipedWriter.close();}}staticclassPrintimplementsRunnable{privatePipedReaderin;publicPrint(PipedReaderin){this.in=in;}@Overridepublicvoidrun(){intreceive=0;try{while((receive=in.read())!=-1){System.out.print((char)receive);}}catch(IOExceptione){}}}}
参考资料:
《实战Java高并发程序设计(第2版) 》
《Java并发编程的艺术》
《Java并发编程之美》
如果有什么不懂或者需要交流,可以留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!