java生產者消費者,java生產者消費者模式

本文目錄一覽:

JAVA怎麼編多個生產者多個消費者代碼啊

public class ProduceConsumerDemo {

public static void main(String[] args) {

// 1.創建資源

Resource resource = new Resource();

// 2.創建兩個任務

Producer producer = new Producer(resource);

Consumer consumer = new Consumer(resource);

// 3.創建線程

/*

* 多生產多消費產生的問題:重複生產、重複消費

*/

Thread thread0 = new Thread(producer);

Thread thread1 = new Thread(producer);

thread0.setName(“生產者(NO0)”);

thread1.setName(“生產者(NO1)”);

Thread thread2 = new Thread(consumer);

Thread thread3 = new Thread(consumer);

thread2.setName(“消費者(NO2)”);

thread3.setName(“消費者(NO3)”);

thread0.start();

thread1.start();

thread2.start();

thread3.start();

}

}

class Resource {

private String name;

private int count = 1;

// 定義標記

private boolean flag;

// 提供給商品賦值的方法

public synchronized void setName(String name) {// thread0, thread1在這裡運行

while (flag)// 判斷標記為true,執行wait等待,為false則生產

/*

* 這裡使用while,而不使用if的理由如下:

* thread0有可能第二次也搶到鎖的執行權,判斷為真,則有麵包不生產,所以接下來執行等待,此時thread0在線程池中。

* 接下來活的線程有3個(除了thread0),這三個線程都有可能獲取到執行權.

* 假設thread1獲得了執行權,判斷為真,則有麵包不生產,執行等待。此時thread1又進入到了線程池中。

* 接下來有兩個活的線程thread2和thread3。 假設thread2又搶到了執行權,所以程序轉到了消費get處……

*/

try {

this.wait();//這裡wait語句必須包含在try/catch塊中,拋出異常。

} catch (InterruptedException e) {

e.printStackTrace();

}

this.name = name + count;// 第一個麵包

count++;// 2

System.out.println(Thread.currentThread().getName() + this.name);// thread0線程生產了麵包1

// 生產完畢,將標記改成true.

flag = true;// thread0第一次生產完麵包以後,將標記改為真,表示有麵包了

// 喚醒消費者(這裡使用notifyAll而不使用notify的原因在下面)

this.notifyAll();// 第一次在這裡是空喚醒,沒有意義

}

/*

* 通過同步,解決了沒生產就消費的問題

* 生產完以後,生產者釋放了this鎖,此時,生產者和消費者同時去搶鎖,又是生產者搶到了鎖,所以就出現了一直生產的情況。

* 與「生產一個就消費一個的需求不符合」 等待喚醒機制 wait();該方法可以使線程處於凍結狀態,並將線程臨時存儲到線程池

* notify();喚醒指定線程池中的任意一個線程。 notifyAll();喚醒指定線程池中的所有線程

* 這些方法必須使用在同步函數中,因為他們用來操作同步鎖上的線程上的狀態的。

* 在使用這些方法時候,必須標識他們所屬於的鎖,標識方式就是鎖對象.wait(); 鎖對象.notify(); 鎖對象.notifyAll();

* 相同鎖的notify()可以獲取相同鎖的wait();

*/

public synchronized void getName() {// thread2,thread3在這裡運行

while (!flag)

/*

* ……接著上面的程序執行分析 thread2拿到鎖獲取執行權之後,判斷!flag為假,則不等待,直接消費麵包1,輸出一次.

* 消費完成之後將flag改為假 接下來又喚醒了thread0或者thread1生產者中的一個

* 假設又喚醒了thread0線程,現在活的線程有thread0,thread2,thread3三個線程

* 假設接下來thread2又搶到了執行權,判斷!flag為真,沒麵包了,停止消費,所以thread2執行等待.

* 此時活著的線程有thread0和thread3。

* 假設thread3得到了執行權,拿到鎖之後進來執行等待,此時活著的線程只有thread0.

* 所以thread0隻能搶到執行權之後,生產麵包2,將標記改為true告訴消費者有麵包可以消費了。

* 接下來執行notify喚醒,此時喚醒休眠中的3個線程中的任何一個都有可能。

* 如果喚醒了消費者thread2或者thread3中的任何一個,程序都是正常。如果此時喚醒thread1則不正常。

* 如果喚醒了thread1,此時活著的線程有thread0和thread1兩個線程。

* 假設thread0又獲得了執行權,判讀為真有麵包,則又一次執行等待。

* 接下來只有thread1線程有執行權(此時沒有判斷標記直接生產了,出錯了),所以又生產了麵包3。 在這個過程中,麵包2沒有被消費。

* 這就是連續生產和消費容易出現的問題。

* 原因:被喚醒的線程沒有判斷標記就開始執行了,導致了重複的生產和消費發生。

* 解決:被喚醒的線程必須判斷標記,使用while循環標記,而不使用if判斷的理由。

* 但是接下來會出現死鎖,原因在於:

* 上面的程序中thread0在執行notify的時候喚醒了thread1,而此時thread2和thread3兩個消費者線程都處於等待狀態

* thread1在執行while判斷語句之後判斷為真,則執行等待,此時所有的線程都處於凍結等待狀態了。

* 原因:本方線程在執行喚醒的時候又一次喚醒了本方線程,而本方線程循環判斷標記又繼續等待,而導致所有的線程都等待。

* 解決:本方線程喚醒對方線程, 可以使用notifyAll()方法

*  喚醒之後,既有本方,又有對方,但是本方線程判斷標記之後,會繼續等待,這樣就有對方線程在執行。

*/

try {

this.wait();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(Thread.currentThread().getName()  + this.name);

// 將標記改為false

flag = false;

// 喚醒生產者

this.notify();

}

}

// 生產者

class Producer implements Runnable {

private Resource resource;

public Producer(Resource resource) {

this.resource = resource;

}

public void run() {

while (true) {

resource.setName(“麵包”);

}

}

}

// 消費者

class Consumer implements Runnable {

private Resource resource;

public Consumer(Resource resource) {

this.resource = resource;

}

@Override

public void run() {

while (true) {

resource.getName();

}

}

}

JAVA模擬生產者與消費者實例

使用的生產者和消費者模型具有如下特點:

(1)本實驗的多個緩衝區不是環形循環的,也不要求按順序訪問。生產者可以把產品放到目前某一個空緩衝區中。

(2)消費者只消費指定生產者的產品。

(3)在測試用例文件中指定了所有的生產和消費的需求,只有當共享緩衝區的數據滿足了所有關於它的消費需求後,此共享緩衝區才可以作為空閑空間允許新的生產者使用。

(4)本實驗在為生產者分配緩衝區時各生產者間必須互斥,此後各個生產者的具體生產活動可以並發。而消費者之間只有在對同一產品進行消費時才需要互斥,同時它們在消費過程結束時需要判斷該消費對象是否已經消費完畢並清除該產品。

Windows

用來實現同步和互斥的實體。在Windows

中,常見的同步對象有:信號量(Semaphore)、

互斥量(Mutex)、臨界段(CriticalSection)和事件(Event)等。本程序中用到了前三個。使用這些對象都分

為三個步驟,一是創建或者初始化:接著請求該同步對象,隨即進入臨界區,這一步對應於互斥量的

上鎖;最後釋放該同步對象,這對應於互斥量的解鎖。這些同步對象在一個線程中創建,在其他線程

中都可以使用,從而實現同步互斥。當然,在進程間使用這些同步對象實現同步的方法是類似的。

1.用鎖操作原語實現互斥

為解決進程互斥進人臨界區的問題,可為每類臨界區設置一把鎖,該鎖有打開和關閉兩種狀態,進程執行臨界區程序的操作按下列步驟進行:

①關鎖。先檢查鎖的狀態,如為關閉狀態,則等待其打開;如已打開了,則將其關閉,繼續執行步驟②的操作。

②執行臨界區程序。

③開鎖。將鎖打開,退出臨界區。

2.信號量及WAIT,SIGNAL操作原語

信號量的初值可以由系統根據資源情況和使用需要來確定。在初始條件下信號量的指針項可以置為0,表示隊列為空。信號量在使用過程中它的值是可變的,但只能由WAIT,SIGNAL操作來改變。設信號量為S,對S的WAIT操作記為WAIT(S),對它的SIGNAL操作記為SIGNAL(S)。

WAIT(S):順序執行以下兩個動作:

①信號量的值減1,即S=S-1;

②如果S≥0,則該進程繼續執行;

如果

S(0,則把該進程的狀態置為阻塞態,把相應的WAITCB連人該信號量隊列的末尾,並放棄處理機,進行等待(直至其它進程在S上執行SIGNAL操作,把它釋放出來為止)。

SIGNAL(S):順序執行以下兩個動作

①S值加

1,即

S=S+1;

②如果S)0,則該進程繼續運行;

如果S(0則釋放信號量隊列上的第一個PCB(既信號量指針項所指向的PCB)所對應的進程(把阻塞態改為就緒態),執行SIGNAL操作的進程繼續運行。

在具體實現時注意,WAIT,SIGNAL操作都應作為一個整體實施,不允許分割或相互穿插執行。也就是說,WAIT,SIGNAL操作各自都好像對應一條指令,需要不間斷地做下去,否則會造成混亂。

從物理概念上講,信號量S)時,S值表示可用資源的數量。執行一次WAIT操作意味著請求分配一個單位資源,因此S值減1;當S0時,表示已無可用資源,請求者必須等待別的進程釋放了該類資源,它才能運行下去。所以它要排隊。而執行一次SIGNAL操作意味著釋放一個單位資源,因此S值加1;若S(0時,表示有某些進程正在等待該資源,因而要把隊列頭上的進程喚醒,釋放資源的進程總是可以運行下去的。

—————

/**

*

生產者

*

*/

public

class

Producer

implements

Runnable{

private

Semaphore

mutex,full,empty;

private

Buffer

buf;

String

name;

public

Producer(String

name,Semaphore

mutex,Semaphore

full,Semaphore

empty,Buffer

buf){

this.mutex

=

mutex;

this.full

=

full;

this.empty

=

empty;

this.buf

=

buf;

this.name

=

name;

}

public

void

run(){

while(true){

empty.p();

mutex.p();

System.out.println(name+”

inserts

a

new

product

into

“+buf.nextEmptyIndex);

buf.nextEmptyIndex

=

(buf.nextEmptyIndex+1)%buf.size;

mutex.v();

full.v();

try

{

Thread.sleep(1000);

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

}

}

}

—————

/**

*

消費者

*

*/

public

class

Customer

implements

Runnable{

private

Semaphore

mutex,full,empty;

private

Buffer

buf;

String

name;

public

Customer(String

name,Semaphore

mutex,Semaphore

full,Semaphore

empty,Buffer

buf){

this.mutex

=

mutex;

this.full

=

full;

this.empty

=

empty;

this.buf

=

buf;

this.name

=

name;

}

public

void

run(){

while(true){

full.p();

mutex.p();

System.out.println(name+”

gets

a

product

from

“+buf.nextFullIndex);

buf.nextFullIndex

=

(buf.nextFullIndex+1)%buf.size;

mutex.v();

empty.v();

try

{

Thread.sleep(1000);

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

}

}

}

————————-

/**

*

緩衝區

*

*/

public

class

Buffer{

public

Buffer(int

size,int

nextEmpty,int

nextFull){

this.nextEmptyIndex

=

nextEmpty;

this.nextFullIndex

=

nextFull;

this.size

=

size;

}

public

int

size;

public

int

nextEmptyIndex;

public

int

nextFullIndex;

}

—————–

/**

*

此類用來模擬信號量

*

*/

public

class

Semaphore{

private

int

semValue;

public

Semaphore(int

semValue){

this.semValue

=

semValue;

}

public

synchronized

void

p(){

semValue–;

if(semValue0){

try

{

this.wait();

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

}

}

public

synchronized

void

v(){

semValue++;

if(semValue=0){

this.notify();

}

}

}

————————

public

class

Test

extends

Thread

{

public

static

void

main(String[]

args)

{

Buffer

bf=new

Buffer(10,0,0);

Semaphore

mutex=new

Semaphore(1);

Semaphore

full=new

Semaphore(0);

Semaphore

empty=new

Semaphore(10);

//new

Thread(new

Producer(“p001”,mutex,full,empty,bf)).start();

Producer

p=new

Producer(“p001”,mutex,full,empty,bf);

new

Thread(new

Producer(“p002”,mutex,full,empty,bf)).start();

new

Thread(new

Producer(“p003”,mutex,full,empty,bf)).start();

new

Thread(new

Producer(“p004”,mutex,full,empty,bf)).start();

new

Thread(new

Producer(“p005”,mutex,full,empty,bf)).start();

try{

sleep(3000);

}

catch(Exception

ex)

{

ex.printStackTrace();

}

new

Thread(new

Customer(“c001”,mutex,full,empty,bf)).start();

new

Thread(new

Customer(“c002”,mutex,full,empty,bf)).start();

new

Thread(new

Customer(“c003”,mutex,full,empty,bf)).start();

new

Thread(new

Customer(“c004”,mutex,full,empty,bf)).start();

new

Thread(new

Customer(“c005”,mutex,full,empty,bf)).start();

}

}

——————————————–

BlockingQueue 使用(生產者-消費者)

java.util.concurrent包中的Java BlockingQueue介面表示一個線程安全的隊列,可以放入並獲取實例。

在這篇文章中,我會告訴你如何使用這個BlockingQueue。

本文將不討論如何在Java中實現BlockingQueue。如果您對此感興趣,在我的偏理論的 Java並發教程 中有一個關於阻塞隊列的文章。

BlockingQueue通常用於使線程產生對象,而另一線程則使用該對象。這是一張闡明這一原理的圖表。

生產線程將持續生產新對象並將它們插入隊列,直到隊列達到它可以包含的上限。換句話說,這是極限。如果阻塞隊列達到其上限,則會在嘗試插入新對象時阻塞生產線程。在消耗線程將對象帶出隊列之前,它一直處於阻塞狀態。

消費線程不斷將對象從阻塞隊列中取出,並對其進行處理。如果消費線程試圖將對象從空隊列中取出,則消費線程將被阻塞,直到生成的線程將對象放入隊列。

BlockingQueue有4種不同的方法來插入、刪除和檢查隊列中的元素。每一組方法的行為都是不同的,以防被請求的操作不能立即執行。下面是這些方法的一個表:

這四種不同的行為方式意思

無法將null插入到BlockingQueue中。如果嘗試插入null,則BlockingQueue將引發NullPointerException。

也可以訪問BlockingQueue中的所有元素,而不僅僅是開始和結束處的元素。例如,假設你需要處理隊列中的一個對象,但你的應用程序決定不處理它。然後你可以調用remove(o)刪除隊列中的特定對象。但是,這並不是非常有效,所以除非你真的需要,否則不應該使用這些Collection方法

由於BlockingQueue是一個介面,因此您需要使用它的一個實現來使用它。java.util.concurrent包具有以下BlockingQueue介面(在Java 6中)的實現:

這是一個Java BlockingQueue示例。該示例使用BlockingQueue介面的ArrayBlockingQueue實現。

首先,BlockingQueueExample類在不同的線程中啟動生產者和消費者。生產者將字元串插入共享BlockingQueue中,消費者將它們取出。

這是生產者類。注意它在每個put()調用之間的使用sleep。這將導致消費者在等待隊列中的對象時阻塞。

這是消費者類。

它只是從隊列中取出對象,並將它們列印到System.out。

[原文]( )

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/279067.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-20 15:02
下一篇 2024-12-20 15:02

相關推薦

  • Java JsonPath 效率優化指南

    本篇文章將深入探討Java JsonPath的效率問題,並提供一些優化方案。 一、JsonPath 簡介 JsonPath是一個可用於從JSON數據中獲取信息的庫。它提供了一種DS…

    編程 2025-04-29
  • java client.getacsresponse 編譯報錯解決方法

    java client.getacsresponse 編譯報錯是Java編程過程中常見的錯誤,常見的原因是代碼的語法錯誤、類庫依賴問題和編譯環境的配置問題。下面將從多個方面進行分析…

    編程 2025-04-29
  • Java Bean載入過程

    Java Bean載入過程涉及到類載入器、反射機制和Java虛擬機的執行過程。在本文中,將從這三個方面詳細闡述Java Bean載入的過程。 一、類載入器 類載入器是Java虛擬機…

    編程 2025-04-29
  • Java騰訊雲音視頻對接

    本文旨在從多個方面詳細闡述Java騰訊雲音視頻對接,提供完整的代碼示例。 一、騰訊雲音視頻介紹 騰訊雲音視頻服務(Cloud Tencent Real-Time Communica…

    編程 2025-04-29
  • Java Milvus SearchParam withoutFields用法介紹

    本文將詳細介紹Java Milvus SearchParam withoutFields的相關知識和用法。 一、什麼是Java Milvus SearchParam without…

    編程 2025-04-29
  • Java 8中某一周的周一

    Java 8是Java語言中的一個版本,於2014年3月18日發布。本文將從多個方面對Java 8中某一周的周一進行詳細的闡述。 一、數組處理 Java 8新特性之一是Stream…

    編程 2025-04-29
  • Java判斷字元串是否存在多個

    本文將從以下幾個方面詳細闡述如何使用Java判斷一個字元串中是否存在多個指定字元: 一、字元串遍歷 字元串是Java編程中非常重要的一種數據類型。要判斷字元串中是否存在多個指定字元…

    編程 2025-04-29
  • VSCode為什麼無法運行Java

    解答:VSCode無法運行Java是因為默認情況下,VSCode並沒有集成Java運行環境,需要手動添加Java運行環境或安裝相關插件才能實現Java代碼的編寫、調試和運行。 一、…

    編程 2025-04-29
  • Java任務下發回滾系統的設計與實現

    本文將介紹一個Java任務下發回滾系統的設計與實現。該系統可以用於執行複雜的任務,包括可回滾的任務,及時恢復任務失敗前的狀態。系統使用Java語言進行開發,可以支持多種類型的任務。…

    編程 2025-04-29
  • Java 8 Group By 會影響排序嗎?

    是的,Java 8中的Group By會對排序產生影響。本文將從多個方面探討Group By對排序的影響。 一、Group By的概述 Group By是SQL中的一種常見操作,它…

    編程 2025-04-29

發表回復

登錄後才能評論