本文目錄一覽:
JAVA怎麼編多個生產者多個消費者代碼啊
public class ProduceConsumerDemo {
public static void main(String[] args) {
// 1.創建資源
Resource resource = new Resource();
// 2.創建兩個任務
Producer producer = new Producer(resource);
Consumer consumer = new Consumer(resource);
// 3.創建線程
/*
* 多生產多消費產生的問題:重複生產、重複消費
*/
Thread thread0 = new Thread(producer);
Thread thread1 = new Thread(producer);
thread0.setName(“生產者(NO0)”);
thread1.setName(“生產者(NO1)”);
Thread thread2 = new Thread(consumer);
Thread thread3 = new Thread(consumer);
thread2.setName(“消費者(NO2)”);
thread3.setName(“消費者(NO3)”);
thread0.start();
thread1.start();
thread2.start();
thread3.start();
}
}
class Resource {
private String name;
private int count = 1;
// 定義標記
private boolean flag;
// 提供給商品賦值的方法
public synchronized void setName(String name) {// thread0, thread1在這裡運行
while (flag)// 判斷標記為true,執行wait等待,為false則生產
/*
* 這裡使用while,而不使用if的理由如下:
*
* thread0有可能第二次也搶到鎖的執行權,判斷為真,則有麵包不生產,所以接下來執行等待,此時thread0在線程池中。
* 接下來活的線程有3個(除了thread0),這三個線程都有可能獲取到執行權.
* 假設thread1獲得了執行權,判斷為真,則有麵包不生產,執行等待。此時thread1又進入到了線程池中。
* 接下來有兩個活的線程thread2和thread3。 假設thread2又搶到了執行權,所以程序轉到了消費get處……
*/
try {
this.wait();//這裡wait語句必須包含在try/catch塊中,拋出異常。
} catch (InterruptedException e) {
e.printStackTrace();
}
this.name = name + count;// 第一個麵包
count++;// 2
System.out.println(Thread.currentThread().getName() + this.name);// thread0線程生產了麵包1
// 生產完畢,將標記改成true.
flag = true;// thread0第一次生產完麵包以後,將標記改為真,表示有麵包了
// 喚醒消費者(這裡使用notifyAll而不使用notify的原因在下面)
this.notifyAll();// 第一次在這裡是空喚醒,沒有意義
}
/*
* 通過同步,解決了沒生產就消費的問題
* 生產完以後,生產者釋放了this鎖,此時,生產者和消費者同時去搶鎖,又是生產者搶到了鎖,所以就出現了一直生產的情況。
* 與“生產一個就消費一個的需求不符合” 等待喚醒機制 wait();該方法可以使線程處於凍結狀態,並將線程臨時存儲到線程池
* notify();喚醒指定線程池中的任意一個線程。 notifyAll();喚醒指定線程池中的所有線程
* 這些方法必須使用在同步函數中,因為他們用來操作同步鎖上的線程上的狀態的。
* 在使用這些方法時候,必須標識他們所屬於的鎖,標識方式就是鎖對象.wait(); 鎖對象.notify(); 鎖對象.notifyAll();
* 相同鎖的notify()可以獲取相同鎖的wait();
*/
public synchronized void getName() {// thread2,thread3在這裡運行
while (!flag)
/*
* ……接着上面的程序執行分析 thread2拿到鎖獲取執行權之後,判斷!flag為假,則不等待,直接消費麵包1,輸出一次.
* 消費完成之後將flag改為假 接下來又喚醒了thread0或者thread1生產者中的一個
* 假設又喚醒了thread0線程,現在活的線程有thread0,thread2,thread3三個線程
* 假設接下來thread2又搶到了執行權,判斷!flag為真,沒麵包了,停止消費,所以thread2執行等待.
* 此時活着的線程有thread0和thread3。
* 假設thread3得到了執行權,拿到鎖之後進來執行等待,此時活着的線程只有thread0.
* 所以thread0隻能搶到執行權之後,生產麵包2,將標記改為true告訴消費者有麵包可以消費了。
* 接下來執行notify喚醒,此時喚醒休眠中的3個線程中的任何一個都有可能。
* 如果喚醒了消費者thread2或者thread3中的任何一個,程序都是正常。如果此時喚醒thread1則不正常。
* 如果喚醒了thread1,此時活着的線程有thread0和thread1兩個線程。
* 假設thread0又獲得了執行權,判讀為真有麵包,則又一次執行等待。
* 接下來只有thread1線程有執行權(此時沒有判斷標記直接生產了,出錯了),所以又生產了麵包3。 在這個過程中,麵包2沒有被消費。
* 這就是連續生產和消費容易出現的問題。
*
* 原因:被喚醒的線程沒有判斷標記就開始執行了,導致了重複的生產和消費發生。
*
* 解決:被喚醒的線程必須判斷標記,使用while循環標記,而不使用if判斷的理由。
*
* 但是接下來會出現死鎖,原因在於:
* 上面的程序中thread0在執行notify的時候喚醒了thread1,而此時thread2和thread3兩個消費者線程都處於等待狀態
* thread1在執行while判斷語句之後判斷為真,則執行等待,此時所有的線程都處於凍結等待狀態了。
*
* 原因:本方線程在執行喚醒的時候又一次喚醒了本方線程,而本方線程循環判斷標記又繼續等待,而導致所有的線程都等待。
*
* 解決:本方線程喚醒對方線程, 可以使用notifyAll()方法
* 喚醒之後,既有本方,又有對方,但是本方線程判斷標記之後,會繼續等待,這樣就有對方線程在執行。
*/
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + this.name);
// 將標記改為false
flag = false;
// 喚醒生產者
this.notify();
}
}
// 生產者
class Producer implements Runnable {
private Resource resource;
public Producer(Resource resource) {
this.resource = resource;
}
public void run() {
while (true) {
resource.setName(“麵包”);
}
}
}
// 消費者
class Consumer implements Runnable {
private Resource resource;
public Consumer(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
while (true) {
resource.getName();
}
}
}
JAVA模擬生產者與消費者實例
使用的生產者和消費者模型具有如下特點:
(1)本實驗的多個緩衝區不是環形循環的,也不要求按順序訪問。生產者可以把產品放到目前某一個空緩衝區中。
(2)消費者只消費指定生產者的產品。
(3)在測試用例文件中指定了所有的生產和消費的需求,只有當共享緩衝區的數據滿足了所有關於它的消費需求後,此共享緩衝區才可以作為空閑空間允許新的生產者使用。
(4)本實驗在為生產者分配緩衝區時各生產者間必須互斥,此後各個生產者的具體生產活動可以並發。而消費者之間只有在對同一產品進行消費時才需要互斥,同時它們在消費過程結束時需要判斷該消費對象是否已經消費完畢並清除該產品。
Windows
用來實現同步和互斥的實體。在Windows
中,常見的同步對象有:信號量(Semaphore)、
互斥量(Mutex)、臨界段(CriticalSection)和事件(Event)等。本程序中用到了前三個。使用這些對象都分
為三個步驟,一是創建或者初始化:接着請求該同步對象,隨即進入臨界區,這一步對應於互斥量的
上鎖;最後釋放該同步對象,這對應於互斥量的解鎖。這些同步對象在一個線程中創建,在其他線程
中都可以使用,從而實現同步互斥。當然,在進程間使用這些同步對象實現同步的方法是類似的。
1.用鎖操作原語實現互斥
為解決進程互斥進人臨界區的問題,可為每類臨界區設置一把鎖,該鎖有打開和關閉兩種狀態,進程執行臨界區程序的操作按下列步驟進行:
①關鎖。先檢查鎖的狀態,如為關閉狀態,則等待其打開;如已打開了,則將其關閉,繼續執行步驟②的操作。
②執行臨界區程序。
③開鎖。將鎖打開,退出臨界區。
2.信號量及WAIT,SIGNAL操作原語
信號量的初值可以由系統根據資源情況和使用需要來確定。在初始條件下信號量的指針項可以置為0,表示隊列為空。信號量在使用過程中它的值是可變的,但只能由WAIT,SIGNAL操作來改變。設信號量為S,對S的WAIT操作記為WAIT(S),對它的SIGNAL操作記為SIGNAL(S)。
WAIT(S):順序執行以下兩個動作:
①信號量的值減1,即S=S-1;
②如果S≥0,則該進程繼續執行;
如果
S(0,則把該進程的狀態置為阻塞態,把相應的WAITCB連人該信號量隊列的末尾,並放棄處理機,進行等待(直至其它進程在S上執行SIGNAL操作,把它釋放出來為止)。
SIGNAL(S):順序執行以下兩個動作
①S值加
1,即
S=S+1;
②如果S)0,則該進程繼續運行;
如果S(0則釋放信號量隊列上的第一個PCB(既信號量指針項所指向的PCB)所對應的進程(把阻塞態改為就緒態),執行SIGNAL操作的進程繼續運行。
在具體實現時注意,WAIT,SIGNAL操作都應作為一個整體實施,不允許分割或相互穿插執行。也就是說,WAIT,SIGNAL操作各自都好像對應一條指令,需要不間斷地做下去,否則會造成混亂。
從物理概念上講,信號量S)時,S值表示可用資源的數量。執行一次WAIT操作意味着請求分配一個單位資源,因此S值減1;當S0時,表示已無可用資源,請求者必須等待別的進程釋放了該類資源,它才能運行下去。所以它要排隊。而執行一次SIGNAL操作意味着釋放一個單位資源,因此S值加1;若S(0時,表示有某些進程正在等待該資源,因而要把隊列頭上的進程喚醒,釋放資源的進程總是可以運行下去的。
—————
/**
*
生產者
*
*/
public
class
Producer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Producer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
empty.p();
mutex.p();
System.out.println(name+”
inserts
a
new
product
into
“+buf.nextEmptyIndex);
buf.nextEmptyIndex
=
(buf.nextEmptyIndex+1)%buf.size;
mutex.v();
full.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
—————
/**
*
消費者
*
*/
public
class
Customer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Customer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
full.p();
mutex.p();
System.out.println(name+”
gets
a
product
from
“+buf.nextFullIndex);
buf.nextFullIndex
=
(buf.nextFullIndex+1)%buf.size;
mutex.v();
empty.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
————————-
/**
*
緩衝區
*
*/
public
class
Buffer{
public
Buffer(int
size,int
nextEmpty,int
nextFull){
this.nextEmptyIndex
=
nextEmpty;
this.nextFullIndex
=
nextFull;
this.size
=
size;
}
public
int
size;
public
int
nextEmptyIndex;
public
int
nextFullIndex;
}
—————–
/**
*
此類用來模擬信號量
*
*/
public
class
Semaphore{
private
int
semValue;
public
Semaphore(int
semValue){
this.semValue
=
semValue;
}
public
synchronized
void
p(){
semValue–;
if(semValue0){
try
{
this.wait();
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
public
synchronized
void
v(){
semValue++;
if(semValue=0){
this.notify();
}
}
}
————————
public
class
Test
extends
Thread
{
public
static
void
main(String[]
args)
{
Buffer
bf=new
Buffer(10,0,0);
Semaphore
mutex=new
Semaphore(1);
Semaphore
full=new
Semaphore(0);
Semaphore
empty=new
Semaphore(10);
//new
Thread(new
Producer(“p001”,mutex,full,empty,bf)).start();
Producer
p=new
Producer(“p001”,mutex,full,empty,bf);
new
Thread(new
Producer(“p002”,mutex,full,empty,bf)).start();
new
Thread(new
Producer(“p003”,mutex,full,empty,bf)).start();
new
Thread(new
Producer(“p004”,mutex,full,empty,bf)).start();
new
Thread(new
Producer(“p005”,mutex,full,empty,bf)).start();
try{
sleep(3000);
}
catch(Exception
ex)
{
ex.printStackTrace();
}
new
Thread(new
Customer(“c001”,mutex,full,empty,bf)).start();
new
Thread(new
Customer(“c002”,mutex,full,empty,bf)).start();
new
Thread(new
Customer(“c003”,mutex,full,empty,bf)).start();
new
Thread(new
Customer(“c004”,mutex,full,empty,bf)).start();
new
Thread(new
Customer(“c005”,mutex,full,empty,bf)).start();
}
}
——————————————–
BlockingQueue 使用(生產者-消費者)
java.util.concurrent包中的Java BlockingQueue接口表示一個線程安全的隊列,可以放入並獲取實例。
在這篇文章中,我會告訴你如何使用這個BlockingQueue。
本文將不討論如何在Java中實現BlockingQueue。如果您對此感興趣,在我的偏理論的 Java並發教程 中有一個關於阻塞隊列的文章。
BlockingQueue通常用於使線程產生對象,而另一線程則使用該對象。這是一張闡明這一原理的圖表。
生產線程將持續生產新對象並將它們插入隊列,直到隊列達到它可以包含的上限。換句話說,這是極限。如果阻塞隊列達到其上限,則會在嘗試插入新對象時阻塞生產線程。在消耗線程將對象帶出隊列之前,它一直處於阻塞狀態。
消費線程不斷將對象從阻塞隊列中取出,並對其進行處理。如果消費線程試圖將對象從空隊列中取出,則消費線程將被阻塞,直到生成的線程將對象放入隊列。
BlockingQueue有4種不同的方法來插入、刪除和檢查隊列中的元素。每一組方法的行為都是不同的,以防被請求的操作不能立即執行。下面是這些方法的一個表:
這四種不同的行為方式意思
無法將null插入到BlockingQueue中。如果嘗試插入null,則BlockingQueue將引發NullPointerException。
也可以訪問BlockingQueue中的所有元素,而不僅僅是開始和結束處的元素。例如,假設你需要處理隊列中的一個對象,但你的應用程序決定不處理它。然後你可以調用remove(o)刪除隊列中的特定對象。但是,這並不是非常有效,所以除非你真的需要,否則不應該使用這些Collection方法
由於BlockingQueue是一個接口,因此您需要使用它的一個實現來使用它。java.util.concurrent包具有以下BlockingQueue接口(在Java 6中)的實現:
這是一個Java BlockingQueue示例。該示例使用BlockingQueue接口的ArrayBlockingQueue實現。
首先,BlockingQueueExample類在不同的線程中啟動生產者和消費者。生產者將字符串插入共享BlockingQueue中,消費者將它們取出。
這是生產者類。注意它在每個put()調用之間的使用sleep。這將導致消費者在等待隊列中的對象時阻塞。
這是消費者類。
它只是從隊列中取出對象,並將它們打印到System.out。
[原文]( )
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/279067.html