Friday, December 3, 2010

MSMQ – Sending Messages to Remote Queues

The situation is simple – you want to send a message to a remote MSMQ. To do so you might write some code that looks something like:
1.using (var queue = new MessageQueue(@"FormatName:Direct=TCP:192.168.2.157\Private$\TheQueue"))
2.{  
3.var message = new Message(shipment);
4.message.Formatter = new BinaryMessageFormatter();
5.queue.Send(message, MessageQueueTransactionType.Single);   
6.}

This code works fine... most of the time. The problem is that, for remote queues, MSMQ sends messages in a "fire and forget" type of mode by default, i.e. if the message gets there, great! If not, oh damn well. This was not apparent to me at first. In workflow situations where you cannot let things "slip through the cracks" it is critical that you receive acknowledgement of delivery. For example, consider the following pseudo-code:

1.using (var transaction = new TransactionScope())
2.{
3.// Perform numerous database operations here
4. 
5.// Send message to remote queue here
6. 
7.transaction.Complete();
8.}

What if the send operation to the remote queue does not succeed? Well, in a workflow with state tracking, we absolutely must rollback all of the database operations and leave the system in the exact same state that existed before we attempted the send operation. The only way we can do this is to know whether the message arrived in the remote queue. Thankfully MSMQ provides a fairly easy way to accomplish this through the use of an Administrative Queue. The purpose of this queue is to provide a "callback" location for the target queue to send an acknowledgement message. This message is uniquely identified by its Message ID property. The high-level steps for guaranteed delivery are:

1) Create a message and define an Administrative Queue 
2) Send the message 
3) Wait a little 
4) Check the Administrative Queue for the message's unique ID
The code to do this is:
01.using (var queue = new MessageQueue(@"FormatName:Direct=TCP:192.168.2.157\Private$\TheQueue"))
02.{
03.var message = new Message(shipment);
04.message.Formatter = new BinaryMessageFormatter();
05.message.AdministrationQueue = new MessageQueue(@"FormatName:Direct=TCP:192.168.2.148\Private$\AdminQueue");
06.message.AcknowledgeType = AcknowledgeTypes.PositiveArrival;
07.queue.Send(message, MessageQueueTransactionType.Single);
08. 
09.Thread.Sleep(100);
10. 
11.bool acknowledged = ReceiveAcknowledgment(message.Id, @".\Private$\AdminQueue");
12. 
13.if (!acknowledged)
14.{
15.throw new InvalidOperationException("Acknowledgement was not received");
16.}
17.}
Yes, the above code uses a Thread.Sleep call. I realize that this is somewhat of an anti-pattern but it is called for here as you have no way of gauging network latency.  The best you can really do is supply a sleep time that is relative to your queue architecture, i.e. if your queues are on an Intranet then you can probably get away with 100 ms (or less).  If you’re using queues over the Internet then you will probably need a longer wait time.  This is the price you pay for guaranteed delivery acknowledgement – a small one in my opinion.
The "ReceiveAcknowledgment" helper method is defined as follows:
01.private static bool ReceiveAcknowledgment(string messageId, string queuePath)
02.{
03.var queue = new MessageQueue(queuePath);
04.queue.MessageReadPropertyFilter.CorrelationId = true;
05.queue.MessageReadPropertyFilter.Acknowledgment = true;
06. 
07.while (queue.PeekByCorrelationId(messageId) != null)
08.{
09.Message message = queue.ReceiveByCorrelationId(messageId);
10.return true;
11.}
12. 
13.return false;

No comments:

Post a Comment