本文目錄一覽:
到底什麼是消息隊列?Java中如何實現消息隊列
消息隊列,顧名思義 首先是個隊列。
隊列的操作有入隊和出隊
也就是有一個程序在產生內容然後入隊(生產者)
另一個程序讀取內容,內容出隊(消費者)
這是最最基本的概念。
java中的消息隊列
消息隊列是線程間通訊的手段:
import java.util.*
public class MsgQueue{
private Vector queue = null;
public MsgQueue(){
queue = new Vector();
}
public synchronized void send(Object o)
{
queue.addElement(o);
}
public synchronized Object recv()
{
if(queue.size()==0)
return null;
Object o = queue.firstElement();
queue.removeElementAt(0);//or queue[0] = null can also work
return o;
}
}
因為java中是locked by object的所以添加synchronized 就可以用於線程同步鎖定對象
可以作為多線程處理多任務的存放task的隊列。他的client包括封裝好的task類以及thread類
在JAVA中怎麼實現消息隊列
如下:
public static String do_post(String url, ListNameValuePair name_value_pair) throws IOException {
String body = “{}”;
DefaultHttpClient httpclient = new DefaultHttpClient();
try {
HttpPost httpost = new HttpPost(url);
httpost.setEntity(new UrlEncodedFormEntity(name_value_pair, StandardCharsets.UTF_8));
HttpResponse response = httpclient.execute(httpost);
HttpEntity entity = response.getEntity();
body = EntityUtils.toString(entity);
} finally {
httpclient.getConnectionManager().shutdown();
}
return body;
}
public static String do_get(String url) throws ClientProtocolException, IOException {
String body = “{}”;
DefaultHttpClient httpclient = new DefaultHttpClient();
try {
HttpGet httpget = new HttpGet(url);
HttpResponse response = httpclient.execute(httpget);
HttpEntity entity = response.getEntity();
body = EntityUtils.toString(entity);
} finally {
httpclient.getConnectionManager().shutdown();
}
return body;
}
如何用JAVA實現Linux上的消息隊列功能
下面來說說如何用不用消息隊列來進行進程間的通信,消息隊列與命名管道有很多相似之處。有關命名管道的更多內容可以參閱我的另一篇文章:Linux進程間通信——使用命名管道
一、什麼是消息隊列
消息隊列提供了一種從一個進程向另一個進程發送一個數據塊的方法。 每個數據塊都被認為含有一個類型,接收進程可以獨立地接收含有不同類型的數據結構。我們可以通過發送消息來避免命名管道的同步和阻塞問題。但是消息隊列與命名管道一樣,每個數據塊都有一個最大長度的限制。
Linux用宏MSGMAX和MSGMNB來限制一條消息的最大長度和一個隊列的最大長度。
二、在Linux中使用消息隊列
Linux提供了一系列消息隊列的函數接口來讓我們方便地使用它來實現進程間的通信。它的用法與其他兩個System V PIC機制,即信號量和共享內存相似。
1、msgget函數
該函數用來創建和訪問一個消息隊列。它的原型為:
int msgget(key_t, key, int msgflg);
與其他的IPC機制一樣,程序必須提供一個鍵來命名某個特定的消息隊列。msgflg是一個權限標誌,表示消息隊列的訪問權限,它與文件的訪問權限一樣。msgflg可以與IPC_CREAT做或操作,表示當key所命名的消息隊列不存在時創建一個消息隊列,如果key所命名的消息隊列存在時,IPC_CREAT標誌會被忽略,而只返回一個標識符。
它返回一個以key命名的消息隊列的標識符(非零整數),失敗時返回-1.
2、msgsnd函數
該函數用來把消息添加到消息隊列中。它的原型為:
int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);
msgid是由msgget函數返回的消息隊列標識符。
msg_ptr是一個指向準備發送消息的指針,但是消息的數據結構卻有一定的要求,指針msg_ptr所指向的消息結構一定要是以一個長整型成員變量開始的結構體,接收函數將用這個成員來確定消息的類型。所以消息結構要定義成這樣:
struct my_message{
long int message_type;
/* The data you wish to transfer*/
};
msg_sz是msg_ptr指向的消息的長度,注意是消息的長度,而不是整個結構體的長度,也就是說msg_sz是不包括長整型消息類型成員變量的長度。
msgflg用於控制當前消息隊列滿或隊列消息到達系統範圍的限制時將要發生的事情。
如果調用成功,消息數據的一分副本將被放到消息隊列中,並返回0,失敗時返回-1.
3、msgrcv函數
該函數用來從一個消息隊列獲取消息,它的原型為
int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);
msgid, msg_ptr, msg_st的作用也函數msgsnd函數的一樣。
msgtype可以實現一種簡單的接收優先級。如果msgtype為0,就獲取隊列中的第一個消息。如果它的值大於零,將獲取具有相同消息類型的第一個信息。如果它小於零,就獲取類型等於或小於msgtype的絕對值的第一個消息。
msgflg用於控制當隊列中沒有相應類型的消息可以接收時將發生的事情。
調用成功時,該函數返回放到接收緩存區中的字節數,消息被複制到由msg_ptr指向的用戶分配的緩存區中,然後刪除消息隊列中的對應消息。失敗時返回-1.
4、msgctl函數
該函數用來控制消息隊列,它與共享內存的shmctl函數相似,它的原型為:
int msgctl(int msgid, int command, struct msgid_ds *buf);
command是將要採取的動作,它可以取3個值,
IPC_STAT:把msgid_ds結構中的數據設置為消息隊列的當前關聯值,即用消息隊列的當前關聯值覆蓋msgid_ds的值。
IPC_SET:如果進程有足夠的權限,就把消息列隊的當前關聯值設置為msgid_ds結構中給出的值
IPC_RMID:刪除消息隊列
buf是指向msgid_ds結構的指針,它指向消息隊列模式和訪問權限的結構。msgid_ds結構至少包括以下成員:
struct msgid_ds
{
uid_t shm_perm.uid;
uid_t shm_perm.gid;
mode_t shm_perm.mode;
};
成功時返回0,失敗時返回-1.
三、使用消息隊列進行進程間通信
馬不停蹄,介紹完消息隊列的定義和可使用的接口之後,我們來看看它是怎麼讓進程進行通信的。由於可以讓不相關的進程進行行通信,所以我們在這裡將會編寫兩個程序,msgreceive和msgsned來表示接收和發送信息。根據正常的情況,我們允許兩個程序都可以創建消息,但只有接收者在接收完最後一個消息之後,它才把它刪除。
接收信息的程序源文件為msgreceive.c的源代碼為:
#include unistd.h
#include stdlib.h
#include stdio.h
#include string.h
#include errno.h
#include sys/msg.h
struct msg_st
{
long int msg_type;
char text[BUFSIZ];
};
int main()
{
int running = 1;
int msgid = -1;
struct msg_st data;
long int msgtype = 0; //注意1
//建立消息隊列
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if(msgid == -1)
{
fprintf(stderr, “msgget failed with error: %d\n”, errno);
exit(EXIT_FAILURE);
}
//從隊列中獲取消息,直到遇到end消息為止
while(running)
{
if(msgrcv(msgid, (void*)data, BUFSIZ, msgtype, 0) == -1)
{
fprintf(stderr, “msgrcv failed with errno: %d\n”, errno);
exit(EXIT_FAILURE);
}
printf(“You wrote: %s\n”,data.text);
//遇到end結束
if(strncmp(data.text, “end”, 3) == 0)
running = 0;
}
//刪除消息隊列
if(msgctl(msgid, IPC_RMID, 0) == -1)
{
fprintf(stderr, “msgctl(IPC_RMID) failed\n”);
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}
發送信息的程序的源文件msgsend.c的源代碼為:
#include unistd.h
#include stdlib.h
#include stdio.h
#include string.h
#include sys/msg.h
#include errno.h
#define MAX_TEXT 512
struct msg_st
{
long int msg_type;
char text[MAX_TEXT];
};
int main()
{
int running = 1;
struct msg_st data;
char buffer[BUFSIZ];
int msgid = -1;
//建立消息隊列
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if(msgid == -1)
{
fprintf(stderr, “msgget failed with error: %d\n”, errno);
exit(EXIT_FAILURE);
}
//向消息隊列中寫消息,直到寫入end
while(running)
{
//輸入數據
printf(“Enter some text: “);
fgets(buffer, BUFSIZ, stdin);
data.msg_type = 1; //注意2
strcpy(data.text, buffer);
//向隊列發送數據
if(msgsnd(msgid, (void*)data, MAX_TEXT, 0) == -1)
{
fprintf(stderr, “msgsnd failed\n”);
exit(EXIT_FAILURE);
}
//輸入end結束輸入
if(strncmp(buffer, “end”, 3) == 0)
running = 0;
sleep(1);
}
exit(EXIT_SUCCESS);
}
轉載僅供參考,版權屬於原作者。祝你愉快,滿意請採納哦
Java開發中消息隊列和rpc框架都是做什麼的?
一,消息隊列服務一般用於設計多系統之間的信息傳輸,一般這種傳輸不需要對方對數據做出回應。它最常見的方式是構建異步的生產者-消費者模式。我們在系統開發中,有些業務並不需要及時返回結果,我們可以把這些操作放到隊列中,然後另起一個消費者去處理它。比如日誌,數據庫異步更新。
二,rpc一般是用於服務器與服務器進程之間通信,這種通信有請求和應答。它是建立在底層的socket通信之上的。封裝為rpc之後,更加方便建立通信。就像在同一個進程中調用對方的方法一樣。它本地的方法名一般和請求到達的服務器的方法名一一對應。這樣可以更好的把模塊劃分。所以它是應對分布式而生的。比如一個網站,一開始可能所有的服務在一個進程中,但是隨着業務的增長,一個進程處理不過來,這時就需要把業務拆分成多個,分部到不同的機器上去。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/236593.html