Microsoft Message Queuing(简称MSMQ)是微软公司实现的一种消息队列,最早出现于Windows NT 4与Windows 95,Windows Server 2016与Windows 10仍然包含这一组件。自1999年起,Microsoft Embedded平台以及Windows CE 3.0也开始支持这一组件。[1]
MSMQ是一种消息协议,允许运行在多个服务器或多个进程上的应用程序相互通信,即使它们之间并不总是保持连接。相比之下,sockets与其他网络协议要求通信双方之间的直接连接始终存在。
MSMQ从1997年开始可用[2]。
MSMQ提供可靠的消息分发。分发失败的消息会保存在队列中,待目标可达时重新发送。MSMQ还支持消息的安全机制与基于优先级的消息传递。可以创建死信队列(英语:Dead letter queue)用于调试。
MSMQ支持持久性与非持久性消息,从而可以在性能与一致性(消息是否写入磁盘)之间进行权衡。非持久性消息只能以快速消息的形式发送到非事务性队列。
MSMQ支持事务处理,允许把针对多个队列的多个操作包装为单个事务。微软分布式事务协调器 (MSDTC) 支持对MSMQ或其他资源的事务访问。
MSMQ使用下述端口:
C#例子:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;

namespace Test
{
    /// <summary>
    /// Static helper methods for creating, deleting, writing to and reading from MSMQ queues.
    /// </summary>
    public class QueueManger
    {
        /// <summary>
        /// Creates an MSMQ queue if it does not already exist and reports the outcome on the console.
        /// </summary>
        /// <param name="queuePath">Path of the queue to create.</param>
        /// <param name="transactional">
        /// <c>true</c> to create a transactional queue; <c>false</c> (the default) for a non-transactional one.
        /// </param>
        public static void Createqueue(string queuePath, bool transactional = false)
        {
            try
            {
                // Only create the queue when it is not there yet.
                if (!MessageQueue.Exists(queuePath))
                {
                    // Honour the requested transactional mode. Create returns a queue
                    // object that is not needed here, so release it right away.
                    using (MessageQueue.Create(queuePath, transactional))
                    {
                    }

                    Console.WriteLine(queuePath + "已成功创建!");
                }
                else
                {
                    Console.WriteLine(queuePath + "已经存在!");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }

        /// <summary>
        /// Deletes the given MSMQ queue if it exists and reports the outcome on the console.
        /// </summary>
        /// <param name="queuePath">Path of the queue to delete.</param>
        public static void Deletequeue(string queuePath)
        {
            try
            {
                // Only delete the queue when it actually exists.
                if (MessageQueue.Exists(queuePath))
                {
                    // Delete the queue the caller asked for, not a hard-coded path.
                    MessageQueue.Delete(queuePath);
                    Console.WriteLine(queuePath + "已删除!");
                }
                else
                {
                    Console.WriteLine(queuePath + "不存在!");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }

        /// <summary>
        /// Sends an object to a queue as an XML-formatted message.
        /// </summary>
        /// <typeparam name="T">Type of the user data.</typeparam>
        /// <param name="target">User data to place in the message body.</param>
        /// <param name="queuePath">Path of the destination queue.</param>
        /// <param name="tran">
        /// Transaction to send the message under, or <c>null</c> to send without a transaction.
        /// </param>
        /// <returns>
        /// <c>true</c> when the send call completed; <c>false</c> when an
        /// <see cref="ArgumentException"/> was caught. Other exceptions propagate to the caller.
        /// </returns>
        public static bool SendMessage<T>(T target, string queuePath, MessageQueueTransaction tran = null)
        {
            try
            {
                // Connect to the queue; both the queue and the message are released when done.
                using (MessageQueue myQueue = new MessageQueue(queuePath))
                using (System.Messaging.Message myMessage = new System.Messaging.Message())
                {
                    myMessage.Body = target;
                    myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });

                    // Send the message, inside the transaction when one was supplied.
                    if (tran == null)
                    {
                        myQueue.Send(myMessage);
                    }
                    else
                    {
                        myQueue.Send(myMessage, tran);
                    }
                }

                Console.WriteLine("消息已成功发送到" + queuePath + "队列!");
                return true;
            }
            catch (ArgumentException e)
            {
                Console.WriteLine(e.Message);
                return false;
            }
        }

        /// <summary>
        /// Receives (and removes) the next message from a queue and returns its body.
        /// </summary>
        /// <typeparam name="T">Type of the user data.</typeparam>
        /// <param name="queuePath">Path of the queue to read from.</param>
        /// <param name="tran">
        /// Transaction to receive the message under, or <c>null</c> to receive without a transaction.
        /// </param>
        /// <returns>
        /// The data stored in the message body, or <c>default(T)</c> when a
        /// <see cref="MessageQueueException"/> or <see cref="InvalidCastException"/> was caught.
        /// </returns>
        public static T ReceiveMessage<T>(string queuePath, MessageQueueTransaction tran = null)
        {
            // Connect to the queue.
            using (MessageQueue myQueue = new MessageQueue(queuePath))
            {
                myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                try
                {
                    // Take the next message off the queue.
                    using (System.Messaging.Message myMessage =
                        tran == null ? myQueue.Receive() : myQueue.Receive(tran))
                    {
                        // Extract the message content.
                        return (T)myMessage.Body;
                    }
                }
                catch (MessageQueueException e)
                {
                    Console.WriteLine(e.Message);
                }
                catch (InvalidCastException e)
                {
                    Console.WriteLine(e.Message);
                }

                return default(T);
            }
        }

        /// <summary>
        /// Reads the next message from a queue with Peek, leaving it in the queue, and returns its body.
        /// </summary>
        /// <typeparam name="T">Type of the user data.</typeparam>
        /// <param name="queuePath">Path of the queue to read from.</param>
        /// <returns>
        /// The data stored in the message body, or <c>default(T)</c> when a
        /// <see cref="MessageQueueException"/> or <see cref="InvalidCastException"/> was caught.
        /// </returns>
        public static T ReceiveMessageByPeek<T>(string queuePath)
        {
            // Connect to the queue.
            using (MessageQueue myQueue = new MessageQueue(queuePath))
            {
                myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                try
                {
                    // Look at the next message without removing it.
                    using (System.Messaging.Message myMessage = myQueue.Peek())
                    {
                        // Extract the message content.
                        return (T)myMessage.Body;
                    }
                }
                catch (MessageQueueException e)
                {
                    Console.WriteLine(e.Message);
                }
                catch (InvalidCastException e)
                {
                    Console.WriteLine(e.Message);
                }

                return default(T);
            }
        }

        /// <summary>
        /// Returns the bodies of all messages currently in a queue.
        /// </summary>
        /// <typeparam name="T">Type of the user data.</typeparam>
        /// <param name="queuePath">Path of the queue to read from.</param>
        /// <returns>
        /// A list with the body of every message, or <c>null</c> when any exception was caught.
        /// </returns>
        public static List<T> GetAllMessage<T>(string queuePath)
        {
            using (MessageQueue myQueue = new MessageQueue(queuePath))
            {
                myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                try
                {
                    Message[] msgArr = myQueue.GetAllMessages();
                    List<T> list = new List<T>(msgArr.Length);
                    foreach (Message msg in msgArr)
                    {
                        list.Add((T)msg.Body);
                    }

                    return list;
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }

                return null;
            }
        }
    }

    /// <summary>
    /// Sample payload type sent through the queue.
    /// </summary>
    public class Student
    {
        /// <summary>
        /// Age.
        /// </summary>
        public int Age { get; set; }

        /// <summary>
        /// Name.
        /// </summary>
        public string Name { get; set; }
    }

    /// <summary>
    /// Console demo: recreates the queue, sends several messages inside one
    /// transaction and then aborts the transaction.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            string queuepath = @".\private$\myQueue";

            // Non-transactional usage, for reference:
            //   QueueManger.Createqueue(queuepath);
            //   Student stu = new Student() { Name = "shaoshun", Age = 18 };
            //   QueueManger.SendMessage<Student>(stu, queuepath);
            //   Student stu = QueueManger.ReceiveMessageByPeek<Student>(queuepath);
            //   Student stu = QueueManger.ReceiveMessage<Student>(queuepath);
            //   Console.WriteLine(stu.Name);

            QueueManger.Deletequeue(queuepath);

            // The messages below are sent under a MessageQueueTransaction,
            // so the queue has to be created as a transactional queue.
            QueueManger.Createqueue(queuepath, true);

            using (MessageQueueTransaction tran = new MessageQueueTransaction())
            {
                tran.Begin();
                try
                {
                    Student stu;
                    for (int i = 0; i < 4; i++)
                    {
                        stu = new Student() { Name = "shaoshun" + i, Age = i };
                        QueueManger.SendMessage<Student>(stu, queuepath, tran);

                        // Deliberately fail after the last send so the catch block
                        // below aborts the transaction instead of committing it.
                        if (i == 3)
                        {
                            throw new Exception();
                        }
                    }

                    tran.Commit();
                }
                catch
                {
                    tran.Abort();
                }
            }

            Console.ReadKey();
        }
    }
}
C语言调用Windows API例子:
#include "windows.h"
#include "mq.h"
#include "tchar.h"
#include <stdio.h>   /* wprintf */
#include <wchar.h>   /* wcsncpy_s */

/* Size, in wide characters, of the local buffer that receives the format name. */
#define BUFLEN 256

/*
 * Creates a queue with the given path name and the label "Test Queue" by
 * calling MQCreateQueue, and copies the resulting format name to the caller.
 *
 * wszPathName             Path name of the queue to create (must not be NULL).
 * pSecurityDescriptor     Security descriptor passed through to MQCreateQueue.
 * wszOutFormatName        Caller buffer that receives the format name
 *                         (must not be NULL).
 * pdwOutFormatNameLength  In: size of wszOutFormatName in wide characters.
 *                         Out: length reported by MQCreateQueue, written only
 *                         when the format name was copied (must not be NULL).
 *
 * Returns MQ_ERROR_INVALID_PARAMETER when a required pointer is NULL,
 * otherwise the HRESULT returned by MQCreateQueue. When the queue is created
 * but the caller's buffer is too small, a message is printed and the output
 * buffer and length are left untouched.
 */
HRESULT CreateMSMQQueue(
    LPWSTR wszPathName,
    PSECURITY_DESCRIPTOR pSecurityDescriptor,
    LPWSTR wszOutFormatName,
    DWORD *pdwOutFormatNameLength)
{
    /* Maximum number of queue properties. An enum constant (rather than a
       const int) is a constant expression, so it can size arrays in C too. */
    enum { NUMBEROFPROPERTIES = 2 };

    /* Queue property structure and the arrays needed to initialize it. */
    MQQUEUEPROPS QueueProps;
    MQPROPVARIANT aQueuePropVar[NUMBEROFPROPERTIES];
    QUEUEPROPID aQueuePropId[NUMBEROFPROPERTIES];
    HRESULT aQueueStatus[NUMBEROFPROPERTIES];
    HRESULT hr = MQ_OK;

    DWORD cPropId = 0;
    WCHAR wszLabel[MQ_MAX_Q_LABEL_LEN] = L"Test Queue";
    WCHAR wszFormatNameBuffer[BUFLEN];
    DWORD dwFormatNameBufferLength = BUFLEN;

    /* Validate the input parameters. */
    if (wszPathName == NULL || wszOutFormatName == NULL || pdwOutFormatNameLength == NULL)
    {
        return MQ_ERROR_INVALID_PARAMETER;
    }

    /* Set queue properties: path name first, then label. */
    aQueuePropId[cPropId] = PROPID_Q_PATHNAME;
    aQueuePropVar[cPropId].vt = VT_LPWSTR;
    aQueuePropVar[cPropId].pwszVal = wszPathName;
    cPropId++;

    aQueuePropId[cPropId] = PROPID_Q_LABEL;
    aQueuePropVar[cPropId].vt = VT_LPWSTR;
    aQueuePropVar[cPropId].pwszVal = wszLabel;
    cPropId++;

    /* Initialize the MQQUEUEPROPS structure. */
    QueueProps.cProp = cPropId;           /* Number of properties */
    QueueProps.aPropID = aQueuePropId;    /* IDs of the queue properties */
    QueueProps.aPropVar = aQueuePropVar;  /* Values of the queue properties */
    QueueProps.aStatus = aQueueStatus;    /* Pointer to the return status */

    /* Call MQCreateQueue to create the queue. */
    hr = MQCreateQueue(
        pSecurityDescriptor,          /* Security descriptor */
        &QueueProps,                  /* Address of queue property structure */
        wszFormatNameBuffer,          /* Pointer to format name buffer */
        &dwFormatNameBufferLength);   /* In/out: format name length in Unicode
                                         characters, not bytes */

    /* Return the format name if the queue is created successfully. */
    if (hr == MQ_OK || hr == MQ_INFORMATION_PROPERTY)
    {
        if (*pdwOutFormatNameLength >= dwFormatNameBufferLength)
        {
            /* Pass the full destination size: wcsncpy_s reserves room for the
               terminator itself, so passing size-1 would drop the last
               character when the caller's buffer is exactly large enough. */
            wcsncpy_s(wszOutFormatName, *pdwOutFormatNameLength,
                      wszFormatNameBuffer, _TRUNCATE);
            *pdwOutFormatNameLength = dwFormatNameBufferLength;
        }
        else
        {
            wprintf(L"The queue was created, but its format name cannot be returned.\n");
        }
    }

    return hr;
}