java消息隊列,java消息隊列mq的實現原理

本文目錄一覽:

到底什麼是消息隊列?Java中如何實現消息隊列

消息隊列,顧名思義 首先是個隊列。

隊列的操作有入隊和出隊

也就是有一個程序在產生內容然後入隊(生產者)

另一個程序讀取內容,內容出隊(消費者)

這是最最基本的概念。

java中的消息隊列

消息隊列是線程間通訊的手段:

import java.util.*

public class MsgQueue{

private Vector queue = null;

public MsgQueue(){

queue = new Vector();

}

public synchronized void send(Object o)

{

queue.addElement(o);

}

public synchronized Object recv()

{

if(queue.size()==0)

return null;

Object o = queue.firstElement();

queue.removeElementAt(0);//or queue[0] = null can also work

return o;

}

}

因為java中是locked by object的所以添加synchronized 就可以用於線程同步鎖定對象

可以作為多線程處理多任務的存放task的隊列。他的client包括封裝好的task類以及thread類

在JAVA中怎麼實現消息隊列

如下:

public static String do_post(String url, ListNameValuePair name_value_pair) throws IOException {

String body = “{}”;

DefaultHttpClient httpclient = new DefaultHttpClient();

try {

HttpPost httpost = new HttpPost(url);

httpost.setEntity(new UrlEncodedFormEntity(name_value_pair, StandardCharsets.UTF_8));

HttpResponse response = httpclient.execute(httpost);

HttpEntity entity = response.getEntity();

body = EntityUtils.toString(entity);

} finally {

httpclient.getConnectionManager().shutdown();

}

return body;

}

public static String do_get(String url) throws ClientProtocolException, IOException {

String body = “{}”;

DefaultHttpClient httpclient = new DefaultHttpClient();

try {

HttpGet httpget = new HttpGet(url);

HttpResponse response = httpclient.execute(httpget);

HttpEntity entity = response.getEntity();

body = EntityUtils.toString(entity);

} finally {

httpclient.getConnectionManager().shutdown();

}

return body;

}

如何用JAVA實現Linux上的消息隊列功能

下面來說說如何用不用消息隊列來進行進程間的通信,消息隊列與命名管道有很多相似之處。有關命名管道的更多內容可以參閱我的另一篇文章:Linux進程間通信——使用命名管道

一、什麼是消息隊列

消息隊列提供了一種從一個進程向另一個進程發送一個數據塊的方法。 每個數據塊都被認為含有一個類型,接收進程可以獨立地接收含有不同類型的數據結構。我們可以通過發送消息來避免命名管道的同步和阻塞問題。但是消息隊列與命名管道一樣,每個數據塊都有一個最大長度的限制。

Linux用宏MSGMAX和MSGMNB來限制一條消息的最大長度和一個隊列的最大長度。

二、在Linux中使用消息隊列

Linux提供了一系列消息隊列的函數介面來讓我們方便地使用它來實現進程間的通信。它的用法與其他兩個System V PIC機制,即信號量和共享內存相似。

1、msgget函數

該函數用來創建和訪問一個消息隊列。它的原型為:

int msgget(key_t, key, int msgflg);

與其他的IPC機制一樣,程序必須提供一個鍵來命名某個特定的消息隊列。msgflg是一個許可權標誌,表示消息隊列的訪問許可權,它與文件的訪問許可權一樣。msgflg可以與IPC_CREAT做或操作,表示當key所命名的消息隊列不存在時創建一個消息隊列,如果key所命名的消息隊列存在時,IPC_CREAT標誌會被忽略,而只返回一個標識符。

它返回一個以key命名的消息隊列的標識符(非零整數),失敗時返回-1.

2、msgsnd函數

該函數用來把消息添加到消息隊列中。它的原型為:

int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);

msgid是由msgget函數返回的消息隊列標識符。

msg_ptr是一個指向準備發送消息的指針,但是消息的數據結構卻有一定的要求,指針msg_ptr所指向的消息結構一定要是以一個長整型成員變數開始的結構體,接收函數將用這個成員來確定消息的類型。所以消息結構要定義成這樣:

struct my_message{

long int message_type;

/* The data you wish to transfer*/

};

msg_sz是msg_ptr指向的消息的長度,注意是消息的長度,而不是整個結構體的長度,也就是說msg_sz是不包括長整型消息類型成員變數的長度。

msgflg用於控制當前消息隊列滿或隊列消息到達系統範圍的限制時將要發生的事情。

如果調用成功,消息數據的一分副本將被放到消息隊列中,並返回0,失敗時返回-1.

3、msgrcv函數

該函數用來從一個消息隊列獲取消息,它的原型為

int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);

msgid, msg_ptr, msg_st的作用也函數msgsnd函數的一樣。

msgtype可以實現一種簡單的接收優先順序。如果msgtype為0,就獲取隊列中的第一個消息。如果它的值大於零,將獲取具有相同消息類型的第一個信息。如果它小於零,就獲取類型等於或小於msgtype的絕對值的第一個消息。

msgflg用於控制當隊列中沒有相應類型的消息可以接收時將發生的事情。

調用成功時,該函數返回放到接收緩存區中的位元組數,消息被複制到由msg_ptr指向的用戶分配的緩存區中,然後刪除消息隊列中的對應消息。失敗時返回-1.

4、msgctl函數

該函數用來控制消息隊列,它與共享內存的shmctl函數相似,它的原型為:

int msgctl(int msgid, int command, struct msgid_ds *buf);

command是將要採取的動作,它可以取3個值,

IPC_STAT:把msgid_ds結構中的數據設置為消息隊列的當前關聯值,即用消息隊列的當前關聯值覆蓋msgid_ds的值。

IPC_SET:如果進程有足夠的許可權,就把消息列隊的當前關聯值設置為msgid_ds結構中給出的值

IPC_RMID:刪除消息隊列

buf是指向msgid_ds結構的指針,它指向消息隊列模式和訪問許可權的結構。msgid_ds結構至少包括以下成員:

struct msgid_ds

{

uid_t shm_perm.uid;

uid_t shm_perm.gid;

mode_t shm_perm.mode;

};

成功時返回0,失敗時返回-1.

三、使用消息隊列進行進程間通信

馬不停蹄,介紹完消息隊列的定義和可使用的介面之後,我們來看看它是怎麼讓進程進行通信的。由於可以讓不相關的進程進行行通信,所以我們在這裡將會編寫兩個程序,msgreceive和msgsned來表示接收和發送信息。根據正常的情況,我們允許兩個程序都可以創建消息,但只有接收者在接收完最後一個消息之後,它才把它刪除。

接收信息的程序源文件為msgreceive.c的源代碼為:

#include unistd.h

#include stdlib.h

#include stdio.h

#include string.h

#include errno.h

#include sys/msg.h

struct msg_st

{

long int msg_type;

char text[BUFSIZ];

};

int main()

{

int running = 1;

int msgid = -1;

struct msg_st data;

long int msgtype = 0; //注意1

//建立消息隊列

msgid = msgget((key_t)1234, 0666 | IPC_CREAT);

if(msgid == -1)

{

fprintf(stderr, “msgget failed with error: %d\n”, errno);

exit(EXIT_FAILURE);

}

//從隊列中獲取消息,直到遇到end消息為止

while(running)

{

if(msgrcv(msgid, (void*)data, BUFSIZ, msgtype, 0) == -1)

{

fprintf(stderr, “msgrcv failed with errno: %d\n”, errno);

exit(EXIT_FAILURE);

}

printf(“You wrote: %s\n”,data.text);

//遇到end結束

if(strncmp(data.text, “end”, 3) == 0)

running = 0;

}

//刪除消息隊列

if(msgctl(msgid, IPC_RMID, 0) == -1)

{

fprintf(stderr, “msgctl(IPC_RMID) failed\n”);

exit(EXIT_FAILURE);

}

exit(EXIT_SUCCESS);

}

發送信息的程序的源文件msgsend.c的源代碼為:

#include unistd.h

#include stdlib.h

#include stdio.h

#include string.h

#include sys/msg.h

#include errno.h

#define MAX_TEXT 512

struct msg_st

{

long int msg_type;

char text[MAX_TEXT];

};

int main()

{

int running = 1;

struct msg_st data;

char buffer[BUFSIZ];

int msgid = -1;

//建立消息隊列

msgid = msgget((key_t)1234, 0666 | IPC_CREAT);

if(msgid == -1)

{

fprintf(stderr, “msgget failed with error: %d\n”, errno);

exit(EXIT_FAILURE);

}

//向消息隊列中寫消息,直到寫入end

while(running)

{

//輸入數據

printf(“Enter some text: “);

fgets(buffer, BUFSIZ, stdin);

data.msg_type = 1; //注意2

strcpy(data.text, buffer);

//向隊列發送數據

if(msgsnd(msgid, (void*)data, MAX_TEXT, 0) == -1)

{

fprintf(stderr, “msgsnd failed\n”);

exit(EXIT_FAILURE);

}

//輸入end結束輸入

if(strncmp(buffer, “end”, 3) == 0)

running = 0;

sleep(1);

}

exit(EXIT_SUCCESS);

}

轉載僅供參考,版權屬於原作者。祝你愉快,滿意請採納哦

Java開發中消息隊列和rpc框架都是做什麼的?

一,消息隊列服務一般用於設計多系統之間的信息傳輸,一般這種傳輸不需要對方對數據做出回應。它最常見的方式是構建非同步的生產者-消費者模式。我們在系統開發中,有些業務並不需要及時返回結果,我們可以把這些操作放到隊列中,然後另起一個消費者去處理它。比如日誌,資料庫非同步更新。

二,rpc一般是用於伺服器與伺服器進程之間通信,這種通信有請求和應答。它是建立在底層的socket通信之上的。封裝為rpc之後,更加方便建立通信。就像在同一個進程中調用對方的方法一樣。它本地的方法名一般和請求到達的伺服器的方法名一一對應。這樣可以更好的把模塊劃分。所以它是應對分散式而生的。比如一個網站,一開始可能所有的服務在一個進程中,但是隨著業務的增長,一個進程處理不過來,這時就需要把業務拆分成多個,分部到不同的機器上去。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/236593.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-12 12:00
下一篇 2024-12-12 12:00

相關推薦

  • java client.getacsresponse 編譯報錯解決方法

    java client.getacsresponse 編譯報錯是Java編程過程中常見的錯誤,常見的原因是代碼的語法錯誤、類庫依賴問題和編譯環境的配置問題。下面將從多個方面進行分析…

    編程 2025-04-29
  • Java JsonPath 效率優化指南

    本篇文章將深入探討Java JsonPath的效率問題,並提供一些優化方案。 一、JsonPath 簡介 JsonPath是一個可用於從JSON數據中獲取信息的庫。它提供了一種DS…

    編程 2025-04-29
  • Java Bean載入過程

    Java Bean載入過程涉及到類載入器、反射機制和Java虛擬機的執行過程。在本文中,將從這三個方面詳細闡述Java Bean載入的過程。 一、類載入器 類載入器是Java虛擬機…

    編程 2025-04-29
  • Java騰訊雲音視頻對接

    本文旨在從多個方面詳細闡述Java騰訊雲音視頻對接,提供完整的代碼示例。 一、騰訊雲音視頻介紹 騰訊雲音視頻服務(Cloud Tencent Real-Time Communica…

    編程 2025-04-29
  • Java Milvus SearchParam withoutFields用法介紹

    本文將詳細介紹Java Milvus SearchParam withoutFields的相關知識和用法。 一、什麼是Java Milvus SearchParam without…

    編程 2025-04-29
  • Java 8中某一周的周一

    Java 8是Java語言中的一個版本,於2014年3月18日發布。本文將從多個方面對Java 8中某一周的周一進行詳細的闡述。 一、數組處理 Java 8新特性之一是Stream…

    編程 2025-04-29
  • Java判斷字元串是否存在多個

    本文將從以下幾個方面詳細闡述如何使用Java判斷一個字元串中是否存在多個指定字元: 一、字元串遍歷 字元串是Java編程中非常重要的一種數據類型。要判斷字元串中是否存在多個指定字元…

    編程 2025-04-29
  • VSCode為什麼無法運行Java

    解答:VSCode無法運行Java是因為默認情況下,VSCode並沒有集成Java運行環境,需要手動添加Java運行環境或安裝相關插件才能實現Java代碼的編寫、調試和運行。 一、…

    編程 2025-04-29
  • Java任務下發回滾系統的設計與實現

    本文將介紹一個Java任務下發回滾系統的設計與實現。該系統可以用於執行複雜的任務,包括可回滾的任務,及時恢復任務失敗前的狀態。系統使用Java語言進行開發,可以支持多種類型的任務。…

    編程 2025-04-29
  • Harris角點檢測演算法原理與實現

    本文將從多個方面對Harris角點檢測演算法進行詳細的闡述,包括演算法原理、實現步驟、代碼實現等。 一、Harris角點檢測演算法原理 Harris角點檢測演算法是一種經典的計算機視覺演算法…

    編程 2025-04-29

發表回復

登錄後才能評論