001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.brokers.jms;
021
022 import java.util.HashMap;
023 import java.util.Hashtable;
024 import java.util.Map;
025
026 import javax.jms.Destination;
027 import javax.jms.JMSException;
028 import javax.jms.MessageListener;
029 import javax.jms.Queue;
030 import javax.jms.QueueConnection;
031 import javax.jms.QueueConnectionFactory;
032 import javax.jms.QueueReceiver;
033 import javax.jms.QueueSender;
034 import javax.jms.QueueSession;
035 import javax.jms.Session;
036 import javax.jms.TemporaryQueue;
037 import javax.jms.TextMessage;
038 import javax.naming.Context;
039 import javax.naming.InitialContext;
040 import javax.naming.NamingException;
041
042 import org.mactor.brokers.AbstractMessageBroker;
043 import org.mactor.brokers.Message;
044 import org.mactor.framework.MactorException;
045 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
046 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
047
048 /**
049 * A message broker for JMS (it should work with any JMS implementation - just
050 * add the client JMS libraries from the particular JMS product to the MActor
051 * class path)
052 *
053 * @author Lars Ivar Almli
054 */
055 public class JmsMessageBroker extends AbstractMessageBroker {
056 public JmsMessageBroker(MessageBrokerConfig config) {
057 super(config);
058 }
059 protected void onFirstSubscribe(String channel) throws MactorException {
060 createSubscriber(config.getRequieredChannelConfig(channel));
061 }
062 public void publish(String channel, Message message) throws MactorException {
063 getSender(config.getRequieredChannelConfig(channel)).send(message);
064 }
065 public Message publishWithResponse(String channel, Message message) throws MactorException {
066 return getSender(config.getRequieredChannelConfig(channel)).send(message);
067 }
068 Map<String, Subscriber> subscribers = new HashMap<String, Subscriber>();
069 public void createSubscriber(ChannelConfig channelConfig) throws MactorException {
070 subscribers.put(channelConfig.getName(), new Subscriber(channelConfig));
071 }
072 @Override
073 protected void onLastSubscribe(String channel) throws MactorException {
074 Subscriber s = subscribers.remove(channel);
075 if (s != null)
076 s.close();
077 }
078 Map<String, Sender> senders = new HashMap<String, Sender>();
079 private synchronized Sender getSender(ChannelConfig channelConfig) throws MactorException {
080 Sender s = senders.get(channelConfig.getName());
081 if (s == null) {
082 s = new Sender(channelConfig);
083 senders.put(channelConfig.getName(), s);
084 }
085 return s;
086 }
087 private class Sender {
088 private QueueConnection qcon;
089 private QueueSession qsession;
090 private QueueSender qsender;
091 private boolean respReq;
092 public Sender(ChannelConfig channelConfig) throws MactorException {
093 try {
094 respReq = channelConfig.isRequiresResponse();
095 String url = channelConfig.getRequieredValue("url");
096 String connectionFactory = channelConfig.getRequieredValue("connection_factory");
097 String contextFactory = channelConfig.getRequieredValue("initial_context_factory");
098 String queueName = channelConfig.getRequieredValue("queue");
099 String userName = channelConfig.getValue("username");
100 String password = channelConfig.getValue("password");
101 InitialContext ctx = getInitialContext(url, contextFactory);
102 QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(connectionFactory);
103 if (userName != null && userName.length() > 0)
104 this.qcon = qconFactory.createQueueConnection(userName, password);
105 else
106 this.qcon = qconFactory.createQueueConnection();
107 this.qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
108 Queue queue = (Queue) ctx.lookup(queueName);
109 this.qsender = qsession.createSender(queue);
110 qcon.start();
111 } catch (Exception e) {
112 throw new MactorException("Failed to create sender for channel '" + channelConfig.getName() + "'. Error:" + e.getMessage(), e);
113 }
114 }
115 public Message send(Message message) throws MactorException {
116 try {
117 TextMessage m = qsession.createTextMessage();
118 m.setText(message.getContent());
119 TemporaryQueue replyQueue = null;
120 if (respReq) {
121 replyQueue = qsession.createTemporaryQueue();
122 m.setJMSReplyTo(replyQueue);
123 }
124 qsender.send(m);
125 if (respReq) {
126 try {
127 javax.jms.Message resp = qsession.createReceiver(replyQueue).receive(5 * 60 * 1000);// 5
128 if (resp != null) {
129 String msgText = null;
130 if (resp instanceof TextMessage) {
131 msgText = ((TextMessage) resp).getText();
132 } else {
133 msgText = resp.toString();
134 }
135 if (msgText != null)
136 return Message.createMessage(msgText);
137 }
138 } finally {
139 replyQueue.delete();
140 }
141 }
142 return null;
143 } catch (JMSException je) {
144 throw new MactorException(je);
145 }
146 }
147 public void close() {
148 try {
149 qsender.close();
150 qsession.close();
151 qcon.close();
152 } catch (JMSException e) {
153 log.warn("Failed to release queue sender", e);
154 }
155 }
156 }
157 private static InitialContext getInitialContext(String url, String jndiFactory) throws NamingException {
158 Hashtable<String, String> env = new Hashtable<String, String>();
159 env.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
160 env.put(Context.PROVIDER_URL, url);
161 return new InitialContext(env);
162 }
163 private class Subscriber {
164 private QueueConnection qcon;
165 private QueueSession qsession;
166 private QueueReceiver qreceiver;
167 private String channel;
168 public Subscriber(ChannelConfig channelConfig) throws MactorException {
169 this.channel = channelConfig.getName();
170 try {
171 String url = channelConfig.getRequieredValue("url");
172 String connectionFactory = channelConfig.getRequieredValue("connection_factory");
173 String contextFactory = channelConfig.getRequieredValue("initial_context_factory");
174 String queueName = channelConfig.getRequieredValue("queue");
175 String userName = channelConfig.getValue("username");
176 String password = channelConfig.getValue("password");
177 InitialContext ctx = getInitialContext(url, contextFactory);
178 QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(connectionFactory);
179 if (userName != null && userName.length() > 0)
180 this.qcon = qconFactory.createQueueConnection(userName, password);
181 else
182 this.qcon = qconFactory.createQueueConnection();
183 this.qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
184 Queue queue = (Queue) ctx.lookup(queueName);
185 this.qreceiver = qsession.createReceiver(queue);
186 this.qreceiver.setMessageListener(new MessageListener() {
187 public void onMessage(javax.jms.Message message) {
188 try {
189 String msgText = null;
190 if (message instanceof TextMessage) {
191 msgText = ((TextMessage) message).getText();
192 } else {
193 msgText = message.toString();
194 }
195 Message resp = raiseOnMessage(Subscriber.this.channel, Message.createMessage(msgText), false);
196 if (resp != null) {
197 Destination replyTo = message.getJMSReplyTo();
198 if (replyTo == null) {
199 log.warn("Unable to deliver the reply message. The message received on channel '" + Subscriber.this.channel + "' does not contain a 'replyTo' destination");
200 } else {
201 String corrId = message.getJMSCorrelationID();
202 TextMessage replyMessage = qsession.createTextMessage();
203 if (corrId != null)
204 replyMessage.setJMSCorrelationID(corrId);
205 replyMessage.setText(resp.getContent());
206 qsession.createSender((Queue) replyTo).send(replyMessage);
207 }
208 }
209 } catch (Exception e) {
210 log.warn("Exception after receiving jms message on channel '" + Subscriber.this.channel + "'. Message:" + message, e);
211 }
212 }
213 });
214 qcon.start();
215 } catch (Exception e) {
216 e.printStackTrace();
217 throw new MactorException("Failed to create receiver for channel '" + channelConfig.getName() + "'. Error:" + e.getMessage(), e);
218 }
219 }
220 public void close() {
221 try {
222 qreceiver.close();
223 qsession.close();
224 qcon.close();
225 } catch (JMSException e) {
226 log.warn("Failed to release queue receiver", e);
227 }
228 }
229 }
230 public void terminate() {
231 super.terminate();
232 for (Sender s : senders.values())
233 s.close();
234 senders.clear();
235 for (Subscriber s : subscribers.values())
236 s.close();
237 subscribers.clear();
238 }
239 }