Introduce DataBase,Asp.net,JavaScript,Xml,Html,Css,Sql,Php,ASP.NET Controls,AJAX,Tools,HTML,CSS,JavaScript,Open Source Project,WPF,.Net Framework,Linq
Top Recommended Hosting

Simple Example demonstrating MSMQ Message Routing

by the3factory 4/8/2008 6:03:00 AM

Introduction

Disconnected Enterprise Applications are difficult to design, especially when data is critical. There are several methodologies available but not all of them fit well. Some of the possible implementations may include Web Services, Message Queuing, Enterprise Service Bus (ESB) architecture and Custom Synchronization Strategy.

While working on one of our projects we needed to find the optimum solution for Distributed Architecture based application. The project required information exchange over several offices spread across multiple geographical areas . So in this article I am explaining the methodology we adopted to make it simpler and get the work done.

Problem

The setup involved multiple integration points with various offices, franchisees, third party independent contractors and their customers who all needed to access data at various levels of security. So we needed a reliable mechanism for sharing the details across geographical limits.

One of the well known approaches to integrate applications with disconnected nature is to use a messaging architecture. That's exactly what we chose. However using such architecture has its own pros and cons. Well that is a subject of another article.

Anyway, for our application, we wanted to provide reliable messaging as well as work in a highly disconnected infrastructure. We found message queues to provide us the answer, however we still needed to solve a few annoyances like how do we forward a message from one queue to another? What if we want to forward based on content of the message.

So we decided, hey why not hack your way to a quick utility that does this for us? Further, why not make it configurable and threaded?

So essentially we wanted to do the following:

    1. Route messages from one Message Queue to another Message Queue.
    2. Route message from one Message Queue to multiple Message Queues.
    3. Routing on the basis of Text inside the Messages.

      First utility must send all the messages from one public queue to another public queue directly and second utility to send the messages from a Message Queue to multiple Message Queues and the third one – a Content Based Router which can distribute the messages from one queue to another on the basis of what is inside the Messages.

      We also wanted to check the activities of application so we need to log it's working. We also wanted to make the code adaptable. We wanted to be able to change the path of the message queues from outside the application using Extensible Markup Language (XML) Configuration files. A quick hack was to use in-built Serialization of .NET framework.

      Oh, did I forgot to mention we wanted this to run on our new quad core server. To achieve parallelism and use benefits of threaded applications we needed to make our application threaded as well as Thread Safe so that messages can be transfered from multiple queues simultaneously without interfering with another queues in processing.

      Now, you may ask, why go through so much trouble when a simple code can do the trick. Well, we also needed to monitor the message queue activities to know what's going on. The operations performed need also to be shown on a Graphical User Interface (GUI) which will be discussed in upcoming article. The implementation of these forms the basis for employing a monitoring utility that we will write on top of this in next article.

      Solution

      First of all, we needed a configuration class to hold the values that we can pass to the forwarder. Here's a simple design:

      class diagram of MessageForwarderConfiguration

      We started creating Forwarder utility which simply forwards the messages from one message queue to another without looking inside the messages. For this we created a class that received the path of message queues from XMLSerialized file in the constructor.

      StreamReader strRd = new StreamReader(xmlfile);
              MessageForwarderConfiguration queue =(MessageForwarderConfiguration)MessageForwarderConfiguration.Serializer.Deserialize(strRd);
              MessageForwardingWorker fwdWorker = new MessageForwardingWorker(queue);
              fwdWorker.RunWorkerAsync();
              

      All the processing is done on a separate thread so that the application can manage interaction between multiple queues at the same time. To achieve this we inherited BackgroundWorker class (System.ComponentModel.BackgroungWorker) so that we can perform our operations on multiple threads.

      using System.ComponentModel;
              public class MessageForwardingWorker : BackgroundWorker
              {
              ....
              }
              

      Then OnDoWork() method of BackgroungWorker class is overloaded in MessageForwardingWorker class to create respective objects of Message Queue Class - one for receiving the messages and another for sending the messages.

      // constructor 
              public MessageForwardingWorker(MessageForwarderConfiguration objForwardQueue)
              {
              queueInfo_ = objForwardQueue;
              }
              // overloaded method of BackgroundWorker
              protected override void OnDoWork(System.ComponentModel.DoWorkEventArgs e)
              {
              while (!this.CancellationPending)
              {
              MessageQueue srcqueue = new MessageQueue(queueInfo_.SourceQueue);
              MessageQueue destqueue = new MessageQueue(queueInfo_.DestinationQueue);
              ...
              }
              }
              

      Then we determined the existence of source and destination queues along with their read, write access.

      if (MessageQueue.Exists(queueInfo_.SourceQueue))
              {
              if (MessageQueue.Exists(queueInfo_.DestinationQueue))
              {
              if (srcqueue.CanRead)
              {
              if (destqueue.CanWrite)
              {
              // do the actual work here
              ....
              }
              }
              }
              }
              

      We start receiving the messages from Source Queue dynamically using GetEnumerator2() method which provides a dynamic link to the queue which we are referencing. After picking the message from the source queue the message is sent to the destination queue.

      MessageEnumerator msgEnum = srcqueue.GetMessageEnumerator2();
              while (msgEnum.MoveNext())
              {
              // iterating on each message
              msgToSend = msgEnum.Current;
              id = msgToSend.Id;
              destqueue.Send(msgToSend);
              // removing the message after sending to destination queue
              srcqueue.ReceiveById(id);
              }
              

      Note :- To get Quality Of Service (QOS) we receive the message and after sending the message to destination queue we remove the message from source queue. This provides us the security that message is successfully transferred else if we would have removed the message from queue before actually sending it to destination queue then it could have resulted in Consistency problems.

      We also wanted to make sure that transactional destination queues were handled correctly.

      while (msgEnum.MoveNext())
              {
              // iterating on messages and using transactions
              msgToSend = msgEnum.Current;
              id = msgToSend.Id;
              if (isTransactional) destqueue.Send(msgToSend, mqtr);
              else destqueue.Send(msgToSend);
              srcqueue.ReceiveById(id);
              }
              if (isTransactional) mqtr.Commit();
              

      Further, how will we know what's going on? Well, we logged all operations in text file using LOG4NET – An Apache Project (http://logging.apache.org/log4net/).

      private log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
              public void doLog(string addToLog)
              {
              ...
              log.Info(addToLog);
              }
              

      When writing threaded applications, one thing you must keep in mind. Make the threads efficient, not hogging the system completely. In our implementation, when a forwarder transferred all of it's messages then it will wait for a predefined time interval specified in XML Configuration file and when the thread comes out of wait period it again starts performing the same operations.

      Thread.Sleep(queueInfo_.SleepTime);
              

      The system can be enhanced to allow forwarding to multiple destinations by simply implementing a list of destination queues or as an aggregator that combines messages from multiple source queues to a single destination.

      Enhancing Message Forwarder to Content based Message Router

      The second utility is just a refinement of this Message Forwarder utility which sends the message to more than one Destination Queue by looking inside the message. Technically we can implement the Router using just this utility.

      The next step is to implement an upgraded version of Forwarder utility facilitating content based message routing.We wanted our application to be able to distribute the messages among various queues on the basis of the content inside the message.

      A reliable and efficient method is needed for matching the messages inside the Source Queues so that they can be sent to appropriate destination Queue. We used the label property associated with each message to identify the messages when they are actually stored in Source Queues. Thus, we only have to match the label of each message inspite of matching the whole message body and without descrambling the message. For matching labels we preferred to use Regular Expressions (also known as RegEx) due to their powerful matching capabilities.

      class diagram of MessageRouterConfiguration

      We modified the configuration class for our needs as follows:

      using System.ComponentModel;
              public class MessageRoutingWorker : BackgroundWorker
              {
              MessageRouterConfiguration queueInfo_;
              Dictionary<Regex, List<MessageQueue>> dictRegexMsgQList;
              MessageQueue srcQueue;
              ...
              }
              

      To achieve parallelism in the application, messages would be read from more than one Source queue so it will also be a Threaded as well as Thread Safe application where each thread is performing independently.

      For matching the Label of messages, we created a dictionary of Regex and List of Message Queues where for each matched regex there is a list of Message Queues on which the message needs to be sent.

      Dictionary<Regex, List<MessageQueue>> dictRegexMsgQList;
              

      As before, we initialize our class with the XML configuration and initialize our dictionary with the matching pattern and path of Source and Destination Queues corrosponding to it.

      Collapse
      // Constructor
              public MessageRoutingWorker(MessageRouterConfiguration objRouteQueue)
              {
              // Set the internal configuration object
              queueInfo_ = objRouteQueue;
              // Create the source queue
              srcQueue = new MessageQueue(queueInfo_.SourceQueue);
              // Now call the helper to create destination and the relevant regex
              doDictionaryInitializer();
              }
              private void doDictionaryInitializer()
              {
              dictRegexMsgQList = new Dictionary<Regex, List<MessageQueue>>();
              // We create a simple pair structure for holding two values.
              foreach (Pair pair in queueInfo_.DestQueueList)
              {
              // Create an expression we want to match and associated destination
              // queues in a list.
              Regex regexpr = new Regex(pair.MatchExpression);
              List<MessageQueue> listMQ = new List<MessageQueue>();
              foreach (string str in pair.DestQueueList)
              {
              listMQ.Add(new MessageQueue(str));
              }
              // Now add these to our internal dictionary object
              dictRegexMsgQList.Add(regexpr, listMQ);
              }
              }
              

      At this point we had the following members in each thread:

      1. Path of Source Queue
      2. Sets of Match Expression and Destination Queues where the message needs to be sent

      The constructor initialized all the message queues and regular expressions as defined in configuration. At this point we fire the threads to run and do their work.

      StreamReader strRd = new StreamReader(xmlfile);
              MessageRouterConfiguration queue = MessageRouterConfiguration)MessageRouterConfiguration.Serializer.Deserialize(strRd);
              MessageRoutingWorker routingWorker = new MessageRoutingWorker(queue);
              fwdWorker.RunWorkerAsync();
              

      In each thread the existence of Source queues along with their read permissions are checked and their status is logged in the Text file using LOG4NET – An Apache Project (http://logging.apache.org/log4net/ ).

      // preparing the dictionary
              private void doDictionaryInitializer()
              {
              dictRegexMsgQList = new Dictionary<Regex, List<MessageQueue>>();
              // We create a simple pair structure for holding two values.
              foreach (Pair pair in queueInfo_.DestQueueList)
              {
              Regex regexpr = new Regex(pair.MatchExpression);
              List<MessageQueue> listMQ = new List<MessageQueue>();
              foreach (string str in pair.DestQueueList)
              {
              listMQ.Add(new MessageQueue(str));
              }
              dictRegexMsgQList.Add(regexpr, listMQ);
              }
              }
              

      To actually route the messages, we create an instance of MessageEnumerator class using GetMessageEnumerator2() method to enumerate on the messages in the queue. Now traverse over each message and match each regex available in the dictionary with the label of the message. When a match is found, send the message to the corresponding list of Message Queues. Before actually sending the message to specified Message Queues the availability and write permission on these Queues are checked and logged in text file.

      MessageEnumerator msgEnum = srcQueue.GetMessageEnumerator2();
              Message msgToSend = null;
              while (msgEnum.MoveNext())
              {
              // iterating on messages
              msgToSend = msgEnum.Current;
              string id = msgToSend.Id;
              foreach (Regex regex in dictRegexMsgQList.Keys)
              {
              // matching the regex
              if (regex.IsMatch(msgToSend.Label))
              {
              List<MessageQueue> destinationList = dictRegexMsgQList[regex];
              if (doCheckAndSend(msgToSend, destinationList))
              // if message is sent then remove it from queue
              srcQueue.ReceiveById(id);
              }
              }
              }
              

      The log is simply implemented as a helper function as follows:

      private log4net.ILog log =log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
              public void doLog(string addToLog)
              {
              ...
              log.Info(addToLog);
              }
              

      The method doCheckAndSend checks the existence and write permissions of Destination Queue and if all is well then sends the message (present in msgToSend) to the List of Destination Queues.

      Collapse
      private bool doCheckAndSend(Message msg, List<MessageQueue> listDestQueue)
              {
              bool sendSucceded = false;
              // checking properties of destination queues
              foreach (MessageQueue destQueue in listDestQueue)
              {
              // logging the activities of application
              doLog("Checking Existence Of Destination Queue [" + destQueue.Path + "]");
              if (MessageQueue.Exists(destQueue.Path))
              if (destQueue.CanWrite)
              ...
              else ...
              else ...
              }
              foreach (MessageQueue destQueue in listDestQueue)
              {
              bool queueTransactional = destQueue.Transactional;
              MessageQueueTransaction mqTr = new MessageQueueTransaction();
              if (queueTransactional)  ...
              else ...
              if (queueTransactional) mqTr.Begin();
              if (queueTransactional) destQueue.Send(msg, mqTr);
              else destQueue.Send(msg);
              if (queueTransactional) mqTr.Commit();
              sendSucceded = true;
              }
              return sendSucceded;
              }
              

      Note :- To get Quality Of Service (QOS) we receive the message and after sending the message to destination queue we remove the message from source queue. This provides us the security that message is successfully transferred else if we would have removed the message from queue before actually sending it to destination queue then it could have resulted in Consistency problems.

      When a thread transferred all of it's messages then it will wait for a predefined time specified in XML Configuration file. And when the Thread comes out of wait period it again starts performing routing operations.

      Thread.Sleep(queueInfo_.SleepTime);
              

      Conclusion

      With theses utilities, we now have the ability to transfer the messages from any queue (source) to any queue (local or remote destination). We can aggregate as well as distribute messages based on it's contents. We have the ability to deploy any number of message queues in the current as well as future scenarios. This also provides us with the flexibility to re-route messages in the event of server issues or if we need to do load balancing for high volume queues.

      Most importantly we can now stay connected in disconnected architecture.

      Related posts

      Sign up for PayPal and start accepting credit card payments instantly.


      Powered by BlogEngine.NET 1.2.0.0