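Contents:
- 1. Is Java MQ message listening implemented with threads under the hood?
- 2. How to extract file messages received from MQ in Java
- 3. Differences between MQ Java Client mode and MQ Java Binding mode
- 4. How to check whether a queue is empty with MQ in Java
- 5. How to call IBM MQ from Java, or communication problems
- 6. If MQ is called inside a Java transaction and the transaction rolls back, is the message withdrawn?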
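Is Java MQ message listening implemented with threads under the hood?
Not in the sense of threads that your application manages. From the application's point of view, listening is a register-and-notify mechanism, similar to the Observer design pattern: you register a listener and it is called when something happens. The callback still has to run on some thread, which a messaging client typically manages internally. As an illustration, the following simple example listens for data changes and performs an action whenever the data changes.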
1. First, define an interface that multiple listeners can implement:
public interface IDataListen {
    void update(Object event, Object msg);
}
2. Implement a listener:
public class DataListen implements IDataListen {
    @Override
    public void update(Object event, Object msg) {
        // React to the change; this listener only prints a message.
        System.out.println("Data has changed");
    }
}
3. The subject being listened to:
import java.util.ArrayList;
import java.util.List;

public class DataManager {
    private final List<IDataListen> listenList = new ArrayList<>();

    // Notify every registered listener, passing the event and message through.
    public void notifyListen(Object event, Object msg) {
        for (IDataListen dataListen : listenList) {
            dataListen.update(event, msg);
        }
    }

    public void addListen(IDataListen dataListen) {
        listenList.add(dataListen);
    }

    public void updateData(Object msg) {
        this.notifyListen(null, msg);
    }

    public static void main(String[] args) {
        DataManager dataManager = new DataManager();
        IDataListen dataListen1 = new DataListen();
        IDataListen dataListen2 = new DataListen();
        dataManager.addListen(dataListen1);
        dataManager.addListen(dataListen2);
        dataManager.updateData("aaa");
    }
}
The main method shows the listeners in use. In this way the updateData behavior of DataManager can be observed: whenever the data changes, every registered listener is notified immediately.
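A register-and-notify API does not remove threads from the picture: a callback always runs on some thread, and with a messaging client that thread usually belongs to the client library rather than to the application. The sketch below illustrates this with plain JDK classes. `ListenerDispatchDemo` and `MessageCallback` are hypothetical names invented for this illustration, not part of any MQ API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical listener interface, analogous to IDataListen above. */
interface MessageCallback {
    void onMessage(String msg);
}

/** Hypothetical stand-in for a messaging client that owns its delivery thread. */
class ListenerDispatchDemo {
    private final List<MessageCallback> callbacks = new CopyOnWriteArrayList<>();
    // One thread, owned by the "client", delivers messages to all callbacks.
    private final ExecutorService dispatcher =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "dispatcher"));

    void register(MessageCallback cb) {
        callbacks.add(cb);
    }

    // Hands the message to the dispatcher thread and returns immediately.
    void publish(String msg) {
        dispatcher.execute(() -> {
            for (MessageCallback cb : callbacks) {
                cb.onMessage(msg);
            }
        });
    }

    // Stops accepting work and waits for queued deliveries to finish.
    void shutdown() {
        dispatcher.shutdown();
        try {
            dispatcher.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Registers one callback, publishes one message, and returns the name of the thread that ran the callback. */
    static String deliveryThreadName() {
        ListenerDispatchDemo client = new ListenerDispatchDemo();
        AtomicReference<String> seen = new AtomicReference<>();
        client.register(msg -> seen.set(Thread.currentThread().getName()));
        client.publish("aaa");
        client.shutdown();
        return seen.get();
    }

    public static void main(String[] args) {
        // prints "callback ran on thread: dispatcher"
        System.out.println("callback ran on thread: " + deliveryThreadName());
    }
}
```

The application code only registers a callback, yet the callback runs on the dispatcher thread rather than on the thread that called `publish`.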
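How to extract file messages received from MQ in Java
Receiving and sending with WebSphere MQ
Add the MQ jar to the classpath.
Class overview:
SendMSG: the message-sending class.
main(): the entry point.
sendMSG(): the method that sends a message.
Code: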
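package test;

import java.io.IOException;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

public class SendMSG {
    public static void main(String[] args) {
        sendMSG();
    }

    public static void sendMSG() {
        MQEnvironment.hostname = "192.168.10.201";
        // The channel must be a server-connection channel
        MQEnvironment.channel = "tongdao";
        MQEnvironment.CCSID = 1381;
        // Listener port of the queue manager
        MQEnvironment.port = 10618;
        try {
            // Connect to the queue manager; "test" is the queue manager name
            MQQueueManager qMgr = new MQQueueManager("test");
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;
            // Open the queue; "wanghui" is the name of a local queue
            MQQueue queue = qMgr.accessQueue("wanghui", openOptions, null, null, null);
            System.out.println("Connected and opened the queue");
            MQMessage message = new MQMessage();
            message.format = MQC.MQFMT_STRING;
            message.characterSet = 1381;
            message.writeString("王輝");
            message.expiry = -1;   // the message never expires
            queue.put(message);    // put the message on the queue
            queue.close();         // close the queue
            qMgr.disconnect();     // disconnect from the queue manager
        } catch (MQException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}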
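ReceiveMSG: the message-receiving class.
main(): the entry point.
receiveMSG(): the method that receives a message.
import java.io.IOException;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

public class ReceiveMSG {
    public static void main(String[] args) {
        receiveMSG();
    }

    public static void receiveMSG() {
        MQEnvironment.hostname = "192.168.10.201";
        // The channel must be a server-connection channel
        MQEnvironment.channel = "tongdao";
        MQEnvironment.CCSID = 1381;
        MQEnvironment.port = 10618;
        try {
            // Connect to the queue manager; "test" is the queue manager name
            MQQueueManager qMgr = new MQQueueManager("test");
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;
            // Open the queue; "wanghui" is the name of a local queue
            MQQueue queue = qMgr.accessQueue("wanghui", openOptions, null, null, null);
            System.out.println("Connected and opened the queue");
            MQMessage message = new MQMessage();
            message.format = MQC.MQFMT_STRING;
            message.characterSet = 1381;
            // Get a message from the queue using the default get options
            MQGetMessageOptions gmo = new MQGetMessageOptions();
            queue.get(message, gmo);
            String chars = message.readLine();
            System.out.println(chars);
            queue.close();       // close the queue
            qMgr.disconnect();   // disconnect from the queue manager
        } catch (MQException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}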
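The receiving class above reads the message as text. When the message carries a file, the body is better treated as raw bytes; with the MQ classes for Java the bytes can be read with `byte[] body = new byte[message.getMessageLength()]; message.readFully(body);`. What remains is to write them to disk, which the following minimal sketch shows using only JDK classes. `PayloadSaver` is a hypothetical helper name, and the sketch assumes the sender put the file content as the unmodified message body.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/** Hypothetical helper that stores a received message body as a file. */
class PayloadSaver {
    /** Writes the raw message body to the target file and returns the file size in bytes. */
    static long save(byte[] payload, Path target) {
        try {
            Files.write(target, payload);
            return Files.size(target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Saves the payload to a temporary file, reads it back, and reports whether the bytes match. */
    static boolean roundTrip(byte[] payload) {
        try {
            Path target = Files.createTempFile("mq-payload-", ".bin");
            try {
                save(payload, target);
                return Arrays.equals(payload, Files.readAllBytes(target));
            } finally {
                Files.deleteIfExists(target);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the bytes that would be read from the received message.
        byte[] payload = {0x50, 0x4B, 0x03, 0x04};
        System.out.println("round trip ok: " + roundTrip(payload));
    }
}
```

Reading bytes rather than calling `readLine()` avoids any character-set conversion, which would corrupt binary content.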
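Differences between MQ Java Client mode and MQ Java Binding mode
MQ Java Binding mode uses JNI (Java Native Interface) to call the queue manager directly, much as an MQ server application does.
Binding mode is the fastest way for an MQSeries Java program to connect to a queue manager, but it requires the MQ Java application and the MQ Server to be on the same machine. Because it avoids the overhead of establishing a network connection, choose Binding mode when connection cost has a large impact on performance.
MQ Java Client mode connects through a server-connection channel defined on the server, and the server must have a listener running. Use Client mode when the Java client program and the server are on different machines.
For a client connection, set the fields of the MQEnvironment class,
such as MQEnvironment.hostname.
The following is a client-connection example: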
// ===========================================================================
//
// Licensed Materials - Property of IBM
//
// 5639-C34
//
// (c) Copyright IBM Corp. 1995,1999
//
// ===========================================================================
// WebSphere MQ classes for Java sample applet
//
// This sample runs as an applet using the appletviewer and HTML file,
// using the command :-
// appletviewer MQSample.html
// Output is to the command line, NOT the applet viewer window.
//
// Note. If you receive WebSphere MQ error 2 reason 2059 and you are sure your
// WebSphere MQ and TCP/IP setup is correct,
// you should click on the "Applet" selection in the Applet viewer window
// select properties, and change "Network access" to unrestricted.
import com.ibm.mq.*;            // Include the WebSphere MQ classes for Java package

public class MQSample extends java.applet.Applet
{
    private String hostname = "your_hostname";   // define the name of your
                                                 // host to connect to
    private String channel  = "server_channel";  // define name of channel
                                                 // for client to use
                                                 // Note. assumes WebSphere MQ Server
                                                 // is listening on the default
                                                 // TCP/IP port of 1414
    private String qManager = "your_Q_manager";  // define name of queue
                                                 // manager object to
                                                 // connect to.
    private MQQueueManager qMgr;                 // define a queue manager object

    // When the class is called, this initialization is done first.
    public void init()
    {
        // Set up WebSphere MQ environment
        MQEnvironment.hostname = hostname;       // Could have put the
                                                 // hostname & channel
        MQEnvironment.channel  = channel;        // string directly here!
        MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,   // Set TCP/IP or server
                                     MQC.TRANSPORT_MQSERIES);  // Connection
    } // end of init

    public void start()
    {
        try {
            // Create a connection to the queue manager
            qMgr = new MQQueueManager(qManager);
            // Set up the options on the queue we wish to open...
            // Note. All WebSphere MQ Options are prefixed with MQC in Java.
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF |
                              MQC.MQOO_OUTPUT;
            // Now specify the queue that we wish to open, and the open options...
            MQQueue system_default_local_queue =
                    qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",
                                     openOptions);
            // Define a simple WebSphere MQ message, and write some text in UTF format..
            MQMessage hello_world = new MQMessage();
            hello_world.writeUTF("Hello World!");
            // specify the message options...
            MQPutMessageOptions pmo = new MQPutMessageOptions();  // accept the defaults,
                                                                  // same as
                                                                  // MQPMO_DEFAULT
                                                                  // constant
            // put the message on the queue
            system_default_local_queue.put(hello_world, pmo);
            // get the message back again...
            // First define WebSphere MQ message buffer to receive the message into..
            MQMessage retrievedMessage = new MQMessage();
            retrievedMessage.messageId = hello_world.messageId;
            // Set the get message options..
            MQGetMessageOptions gmo = new MQGetMessageOptions();  // accept the defaults
                                                                  // same as
                                                                  // MQGMO_DEFAULT
            // get the message off the queue..
            system_default_local_queue.get(retrievedMessage, gmo);
            // And prove we have the message by displaying the UTF message text
            String msgText = retrievedMessage.readUTF();
            System.out.println("The message is: " + msgText);
            // Close the queue
            system_default_local_queue.close();
            // Disconnect from the queue manager
            qMgr.disconnect();
        }
        // If an error has occurred in the above, try to identify what went wrong.
        // Was it WebSphere MQ error?
        catch (MQException ex)
        {
            System.out.println("WebSphere MQ error occurred : Completion code " +
                               ex.completionCode +
                               " Reason code " + ex.reasonCode);
        }
        // Was it a Java buffer space error?
        catch (java.io.IOException ex)
        {
            System.out.println("An error occurred whilst writing to the message buffer: " + ex);
        }
    } // end of start
} // end of sample
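How to check whether a queue is empty with MQ in Java
MQException
This class contains the definitions of the WebSphere MQ completion-code and reason-code constants. Constants that begin with MQCC_ are WebSphere MQ completion codes, and constants that begin with MQRC_ are WebSphere MQ reason codes. An MQException is thrown whenever a WebSphere MQ error occurs.
MQGetMessageOptions
This class contains the options that control the behavior of the MQQueue.get() method.
MQManagedObject
This class is the superclass of the MQQueueManager, MQQueue and MQProcess classes. It provides the ability to inquire and set the attributes of these resources.
-------- Solution --------
Try a get once: if it fails with reason code 2033 (MQRC_NO_MSG_AVAILABLE), no message matches your criteria.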
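The "try once and look for 2033" check can be sketched as follows. Note that an ordinary get removes the message it finds, so a pure emptiness check is better done as a browse (opening the queue with MQC.MQOO_BROWSE and getting with MQC.MQGMO_BROWSE_FIRST), or by calling MQQueue.getCurrentDepth() on a queue opened with MQC.MQOO_INQUIRE. To stay runnable without the MQ jar, the sketch uses two hypothetical stand-ins: `ReasonCodeException` plays the role of MQException (which exposes a public `reasonCode` field) and is unchecked here only to keep the example short, and `BrowseAttempt` abstracts the actual browse call.

```java
/** Hypothetical stand-in for com.ibm.mq.MQException and its public reasonCode field. */
class ReasonCodeException extends RuntimeException {
    public final int reasonCode;

    ReasonCodeException(int reasonCode) {
        super("reason code " + reasonCode);
        this.reasonCode = reasonCode;
    }
}

/** Hypothetical abstraction of "try to browse one message from the queue". */
interface BrowseAttempt {
    void browseOne();
}

class EmptyQueueCheck {
    // Reason code returned when no message satisfies the get request.
    static final int MQRC_NO_MSG_AVAILABLE = 2033;

    /** Returns true if the attempt fails with 2033, false if a message was found. */
    static boolean isEmpty(BrowseAttempt attempt) {
        try {
            attempt.browseOne();
            return false;
        } catch (ReasonCodeException e) {
            if (e.reasonCode == MQRC_NO_MSG_AVAILABLE) {
                return true;
            }
            throw e; // any other reason code is a genuine error, not "empty"
        }
    }

    public static void main(String[] args) {
        // Simulated empty queue: the browse fails with 2033.
        System.out.println(isEmpty(() -> { throw new ReasonCodeException(2033); })); // prints "true"
        // Simulated non-empty queue: the browse succeeds.
        System.out.println(isEmpty(() -> { }));                                      // prints "false"
    }
}
```

Treating only 2033 as "empty" matters: other reason codes, such as an authorization failure, should surface as errors rather than be mistaken for an empty queue.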
Alternatively, use PCF to query the queue's attributes:
/**
 * Fragment of a larger class: queueManager, checkStatus2(), getInputQueue(),
 * logger and QueueInfo are defined elsewhere in that class and are not shown here.
 * The PCF types (PCFMessageAgent, PCFParameter, MQCFH, MQCFST, MQCFIL) come from
 * the IBM MQ PCF classes.
 *
 * @return attributes of the currently connected queue, including its current depth.
 * @throws Exception if not connected or if the PCF command fails.
 */
public QueueInfo queryQueueInfo() throws Exception {
    if (!checkStatus2(this.queueManager)) {
        throw new IllegalStateException("Not Connected to queue manager.");
    }
    PCFMessageAgent agent = null;
    try {
        agent = new PCFMessageAgent(this.queueManager);
        // Attributes to inquire: name, current depth, open counts, types, inhibit flags.
        int[] attrs = {
            CMQC.MQCA_Q_NAME, CMQC.MQIA_CURRENT_Q_DEPTH,
            CMQC.MQIA_OPEN_INPUT_COUNT, CMQC.MQIA_OPEN_OUTPUT_COUNT,
            CMQC.MQIA_Q_TYPE, CMQC.MQIA_DEFINITION_TYPE, CMQC.MQIA_INHIBIT_GET,
            CMQC.MQIA_INHIBIT_PUT };
        PCFParameter[] parameters = {
            new MQCFST(CMQC.MQCA_Q_NAME, getInputQueue().getText().trim()),
            new MQCFIL(CMQCFC.MQIACF_Q_ATTRS, attrs) };
        // logger.log("Querying current depth of current queue.");
        MQMessage[] responses = agent.send(CMQCFC.MQCMD_INQUIRE_Q, parameters);
        QueueInfo info = new QueueInfo();
        for (int i = 0; i < responses.length; i++) {
            MQCFH cfh = new MQCFH(responses[i]);
            // Check the PCF header (MQCFH) in the response message
            if (cfh.reason == 0) {
                for (int j = 0; j < cfh.parameterCount; j++) {
                    // Extract what we want from the returned attributes
                    PCFParameter p = PCFParameter.nextParameter(responses[i]);
                    switch (p.getParameter()) {
                    case CMQC.MQCA_Q_NAME:
                        info.name = (String) p.getValue();
                        break;
                    case CMQC.MQIA_CURRENT_Q_DEPTH:
                        info.depth = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_OPEN_INPUT_COUNT:
                        info.inputCount = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_OPEN_OUTPUT_COUNT:
                        info.outputCount = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_Q_TYPE:
                        info.type = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_DEFINITION_TYPE:
                        info.definitionType = ((Integer) p.getValue()).intValue();
                        break;
                    case CMQC.MQIA_INHIBIT_PUT:
                        info.putNotAllowed = ((Integer) p.getValue()).intValue() == 1;
                        break;
                    case CMQC.MQIA_INHIBIT_GET:
                        info.getNotAllowed = ((Integer) p.getValue()).intValue() == 1;
                        break;
                    default:
                        break;
                    }
                }
                return info;
            } else {
                System.out.println("PCF error:\n" + cfh);
                // Walk through the returned parameters describing the error
                for (int j = 0; j < cfh.parameterCount; j++) {
                    System.out.println(PCFParameter.nextParameter(responses[i]));
                }
                throw new Exception("PCF Error [reason :" + cfh.reason + "]");
            }
        }
        return null;
    } finally {
        // Always release the PCF agent, even when the inquiry fails.
        if (agent != null) {
            try {
                agent.disconnect();
            } catch (Exception e) {
                logger.log(e);
            }
        }
    }
}
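How to call IBM MQ from Java, or communication problems
WebSphere MQ is used to transfer messages and works across platforms.
1. Install WebSphere MQ and start it.
2. In WebSphere MQ, create a queue manager (for example MQSI_SAMPLE_QM).
3. Create a queue of type Local (for example lq).
4. Create a channel of type Server Connection (for example BridgeChannel).
The Java code is as follows: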
package test.mq;

import com.ibm.mq.*;

/*
 * A Java class that successfully accesses MQ
 */
public class FirstMqTest {
    public static void main(String[] args) {
        FirstMqTest first = new FirstMqTest();
        first.test();
    }

    public void test() {
        String qManager = "MQSI_SAMPLE_QM"; // queue manager name
        String qName = "lq";                // queue name
        try {
            // Configure connection parameters
            MQEnvironment.hostname = "172.16.17.123"; // MQ server name or IP
            // MQEnvironment.port = 1414;             // listener port
            MQEnvironment.channel = "BridgeChannel";  // server-connection channel
            MQEnvironment.CCSID = 1381;
            // Create a connection to the queue manager
            System.out.println("Connecting to queue manager: " + qManager);
            MQQueueManager qMgr = new MQQueueManager(qManager);
            // Set up the options on the queue we wish to open
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
            // Now specify the queue that we wish to open and the open options
            System.out.println("Accessing queue: " + qName);
            MQQueue queue = qMgr.accessQueue(qName, openOptions);
            // Define a simple WebSphere MQ message ...
            MQMessage msg = new MQMessage();
            // ... and write some text in UTF format
            msg.writeUTF("Hello, World!");
            // Specify the default put message options
            MQPutMessageOptions pmo = new MQPutMessageOptions();
            // Put the messages to the queue
            System.out.println("Sending a message...");
            // Test the order in which MQ delivers messages: put five numbered
            // messages, each in a fresh MQMessage so the text does not accumulate.
            for (int j = 0; j < 5; j++) {
                MQMessage testMsg = new MQMessage();
                testMsg.writeUTF("test11111111111" + j);
                queue.put(testMsg, pmo);
            }
            queue.put(msg, pmo);
            // Now get a message back again. First define a WebSphere MQ message
            // to receive the data
            MQMessage rcvMessage = new MQMessage();
            // Specify default get message options
            MQGetMessageOptions gmo = new MQGetMessageOptions();
            // Get the first available message off the queue
            System.out.println("...and getting the message back again");
            queue.get(rcvMessage, gmo);
            // And display the message text...
            String msgText = rcvMessage.readUTF();
            System.out.println("The message is: " + msgText);
            // Close the queue
            System.out.println("Closing the queue");
            queue.close();
            // Disconnect from the queue manager
            System.out.println("Disconnecting from the Queue Manager");
            qMgr.disconnect();
            System.out.println("Done!");
        } catch (MQException ex) {
            System.out.println("A WebSphere MQ error occurred : Completion Code "
                    + ex.completionCode + " Reason Code " + ex.reasonCode);
        } catch (java.io.IOException ex) {
            System.out.println("An IOException occurred whilst writing to the message buffer: "
                    + ex);
        }
    }
}
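The samples above write text with writeUTF and read it back with readUTF. MQMessage implements java.io.DataOutput, so, assuming writeUTF follows that interface's contract as its documentation describes, the body is not plain text: it starts with a 2-byte length followed by the string in modified UTF-8. The JDK-only sketch below shows that layout with DataOutputStream; `WriteUtfLayout` is a hypothetical name used for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Shows the byte layout produced by the java.io.DataOutput writeUTF contract. */
class WriteUtfLayout {
    /** Encodes the text exactly as DataOutput.writeUTF does and returns the bytes. */
    static byte[] encode(String text) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            new DataOutputStream(buffer).writeUTF(text);
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the unsigned 2-byte big-endian length prefix from the encoded bytes. */
    static int lengthPrefix(byte[] encoded) {
        return ((encoded[0] & 0xFF) << 8) | (encoded[1] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] encoded = encode("Hello, World!");
        // 13 characters of ASCII text plus the 2-byte prefix:
        // prints "15 bytes, length prefix = 13"
        System.out.println(encoded.length + " bytes, length prefix = " + lengthPrefix(encoded));
    }
}
```

This is why writeUTF and readUTF should be used as a pair. A receiver that is not written in Java, or that reads the body as a plain string, would see the two prefix bytes at the start; writeString, as used in the SendMSG class earlier, avoids the prefix.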
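If MQ is called inside a Java transaction and the transaction rolls back, is the message withdrawn?
Yes, provided the MQ call is part of that transaction. A message that is put under syncpoint (for example with MQPMO_SYNCPOINT in the MQ classes for Java, or through a transacted JMS session) only becomes visible to consumers when the unit of work is committed, and it is discarded when the unit of work is backed out. A message put outside syncpoint is available immediately and is not recalled when a surrounding database transaction rolls back. In Java transaction handling, several database operations only succeed or fail together when they are grouped in one transaction; otherwise each operation is its own transaction, and the same applies to the MQ call.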
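Whether a rollback withdraws a message depends on whether the put belonged to the unit of work. The following in-memory simulation illustrates the semantics only; it is not the IBM MQ API, and `SyncpointQueueDemo` with all its method names is hypothetical. In the MQ classes for Java, the corresponding real calls are a put with MQPMO_SYNCPOINT followed by MQQueueManager.commit() or MQQueueManager.backout().

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory simulation of syncpoint semantics; not the IBM MQ API. */
class SyncpointQueueDemo {
    private final List<String> visible = new ArrayList<>(); // what consumers can see
    private final List<String> pending = new ArrayList<>(); // puts in the open unit of work

    // A put under syncpoint is held back until the unit of work commits.
    void putUnderSyncpoint(String msg) {
        pending.add(msg);
    }

    // A put outside syncpoint is visible immediately and cannot be recalled.
    void putOutsideSyncpoint(String msg) {
        visible.add(msg);
    }

    // Commit makes every pending message visible.
    void commit() {
        visible.addAll(pending);
        pending.clear();
    }

    // Backout (rollback) discards every pending message.
    void backout() {
        pending.clear();
    }

    List<String> visibleMessages() {
        return new ArrayList<>(visible);
    }

    /** Runs one rolled-back and one committed unit of work and returns what consumers see. */
    static List<String> demo() {
        SyncpointQueueDemo queue = new SyncpointQueueDemo();
        queue.putUnderSyncpoint("order-1");   // part of the unit of work
        queue.putOutsideSyncpoint("audit-1"); // not part of it
        queue.backout();                      // order-1 is withdrawn, audit-1 stays
        queue.putUnderSyncpoint("order-2");
        queue.commit();                       // order-2 becomes visible
        return queue.visibleMessages();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[audit-1, order-2]"
    }
}
```

The message put outside syncpoint survives the backout, which is the case where a rolled-back transaction leaves a message behind.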
Original article by 小藍. If you repost it, please credit the source: https://www.506064.com/zh-hant/n/291711.html