001 /****************************************************************************** 002 * Copyright (C) MActor Developers. All rights reserved. * 003 * ---------------------------------------------------------------------------* 004 * This file is part of MActor. * 005 * * 006 * MActor is free software; you can redistribute it and/or modify * 007 * it under the terms of the GNU General Public License as published by * 008 * the Free Software Foundation; either version 2 of the License, or * 009 * (at your option) any later version. * 010 * * 011 * MActor is distributed in the hope that it will be useful, * 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 014 * GNU General Public License for more details. * 015 * * 016 * You should have received a copy of the GNU General Public License * 017 * along with MActor; if not, write to the Free Software * 018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * 019 ******************************************************************************/ 020 package org.mactor.brokers.jms; 021 022 import java.util.HashMap; 023 import java.util.Hashtable; 024 import java.util.Map; 025 026 import javax.jms.Destination; 027 import javax.jms.JMSException; 028 import javax.jms.MessageListener; 029 import javax.jms.Queue; 030 import javax.jms.QueueConnection; 031 import javax.jms.QueueConnectionFactory; 032 import javax.jms.QueueReceiver; 033 import javax.jms.QueueSender; 034 import javax.jms.QueueSession; 035 import javax.jms.Session; 036 import javax.jms.TemporaryQueue; 037 import javax.jms.TextMessage; 038 import javax.naming.Context; 039 import javax.naming.InitialContext; 040 import javax.naming.NamingException; 041 042 import org.mactor.brokers.AbstractMessageBroker; 043 import org.mactor.brokers.Message; 044 import org.mactor.framework.MactorException; 045 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig; 046 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig; 047 048 /** 049 * A message broker for JMS (it should work with any JMS implementation - just 050 * add the client JMS libraries from the particular JMS product to the MActor 051 * class path) 052 * 053 * @author Lars Ivar Almli 054 */ 055 public class JmsMessageBroker extends AbstractMessageBroker { 056 public JmsMessageBroker(MessageBrokerConfig config) { 057 super(config); 058 } 059 protected void onFirstSubscribe(String channel) throws MactorException { 060 createSubscriber(config.getRequieredChannelConfig(channel)); 061 } 062 public void publish(String channel, Message message) throws MactorException { 063 getSender(config.getRequieredChannelConfig(channel)).send(message); 064 } 065 public Message publishWithResponse(String channel, Message message) throws MactorException { 066 return getSender(config.getRequieredChannelConfig(channel)).send(message); 067 } 068 Map<String, Subscriber> subscribers = new HashMap<String, Subscriber>(); 069 public void createSubscriber(ChannelConfig channelConfig) throws MactorException { 070 subscribers.put(channelConfig.getName(), new Subscriber(channelConfig)); 071 } 072 @Override 073 protected void onLastSubscribe(String channel) throws MactorException { 074 Subscriber s = subscribers.remove(channel); 075 if (s != null) 076 s.close(); 077 } 078 Map<String, Sender> senders = new HashMap<String, Sender>(); 079 private synchronized Sender getSender(ChannelConfig channelConfig) throws MactorException { 080 Sender s = senders.get(channelConfig.getName()); 081 if (s == null) { 082 s = new Sender(channelConfig); 083 senders.put(channelConfig.getName(), s); 084 } 085 return s; 086 } 087 private class Sender { 088 private QueueConnection qcon; 089 private QueueSession qsession; 090 private QueueSender qsender; 091 private boolean respReq; 092 public Sender(ChannelConfig channelConfig) throws MactorException { 093 try { 094 respReq = channelConfig.isRequiresResponse(); 095 String url = channelConfig.getRequieredValue("url"); 096 String connectionFactory = channelConfig.getRequieredValue("connection_factory"); 097 String contextFactory = channelConfig.getRequieredValue("initial_context_factory"); 098 String queueName = channelConfig.getRequieredValue("queue"); 099 String userName = channelConfig.getValue("username"); 100 String password = channelConfig.getValue("password"); 101 InitialContext ctx = getInitialContext(url, contextFactory); 102 QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(connectionFactory); 103 if (userName != null && userName.length() > 0) 104 this.qcon = qconFactory.createQueueConnection(userName, password); 105 else 106 this.qcon = qconFactory.createQueueConnection(); 107 this.qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 108 Queue queue = (Queue) ctx.lookup(queueName); 109 this.qsender = qsession.createSender(queue); 110 qcon.start(); 111 } catch (Exception e) { 112 throw new MactorException("Failed to create sender for channel '" + channelConfig.getName() + "'. Error:" + e.getMessage(), e); 113 } 114 } 115 public Message send(Message message) throws MactorException { 116 try { 117 TextMessage m = qsession.createTextMessage(); 118 m.setText(message.getContent()); 119 TemporaryQueue replyQueue = null; 120 if (respReq) { 121 replyQueue = qsession.createTemporaryQueue(); 122 m.setJMSReplyTo(replyQueue); 123 } 124 qsender.send(m); 125 if (respReq) { 126 try { 127 javax.jms.Message resp = qsession.createReceiver(replyQueue).receive(5 * 60 * 1000);// 5 128 if (resp != null) { 129 String msgText = null; 130 if (resp instanceof TextMessage) { 131 msgText = ((TextMessage) resp).getText(); 132 } else { 133 msgText = resp.toString(); 134 } 135 if (msgText != null) 136 return Message.createMessage(msgText); 137 } 138 } finally { 139 replyQueue.delete(); 140 } 141 } 142 return null; 143 } catch (JMSException je) { 144 throw new MactorException(je); 145 } 146 } 147 public void close() { 148 try { 149 qsender.close(); 150 qsession.close(); 151 qcon.close(); 152 } catch (JMSException e) { 153 log.warn("Failed to release queue sender", e); 154 } 155 } 156 } 157 private static InitialContext getInitialContext(String url, String jndiFactory) throws NamingException { 158 Hashtable<String, String> env = new Hashtable<String, String>(); 159 env.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory); 160 env.put(Context.PROVIDER_URL, url); 161 return new InitialContext(env); 162 } 163 private class Subscriber { 164 private QueueConnection qcon; 165 private QueueSession qsession; 166 private QueueReceiver qreceiver; 167 private String channel; 168 public Subscriber(ChannelConfig channelConfig) throws MactorException { 169 this.channel = channelConfig.getName(); 170 try { 171 String url = channelConfig.getRequieredValue("url"); 172 String connectionFactory = channelConfig.getRequieredValue("connection_factory"); 173 String contextFactory = channelConfig.getRequieredValue("initial_context_factory"); 174 String queueName = channelConfig.getRequieredValue("queue"); 175 String userName = channelConfig.getValue("username"); 176 String password = channelConfig.getValue("password"); 177 InitialContext ctx = getInitialContext(url, contextFactory); 178 QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(connectionFactory); 179 if (userName != null && userName.length() > 0) 180 this.qcon = qconFactory.createQueueConnection(userName, password); 181 else 182 this.qcon = qconFactory.createQueueConnection(); 183 this.qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 184 Queue queue = (Queue) ctx.lookup(queueName); 185 this.qreceiver = qsession.createReceiver(queue); 186 this.qreceiver.setMessageListener(new MessageListener() { 187 public void onMessage(javax.jms.Message message) { 188 try { 189 String msgText = null; 190 if (message instanceof TextMessage) { 191 msgText = ((TextMessage) message).getText(); 192 } else { 193 msgText = message.toString(); 194 } 195 Message resp = raiseOnMessage(Subscriber.this.channel, Message.createMessage(msgText), false); 196 if (resp != null) { 197 Destination replyTo = message.getJMSReplyTo(); 198 if (replyTo == null) { 199 log.warn("Unable to deliver the reply message. The message received on channel '" + Subscriber.this.channel + "' does not contain a 'replyTo' destination"); 200 } else { 201 String corrId = message.getJMSCorrelationID(); 202 TextMessage replyMessage = qsession.createTextMessage(); 203 if (corrId != null) 204 replyMessage.setJMSCorrelationID(corrId); 205 replyMessage.setText(resp.getContent()); 206 qsession.createSender((Queue) replyTo).send(replyMessage); 207 } 208 } 209 } catch (Exception e) { 210 log.warn("Exception after receiving jms message on channel '" + Subscriber.this.channel + "'. Message:" + message, e); 211 } 212 } 213 }); 214 qcon.start(); 215 } catch (Exception e) { 216 e.printStackTrace(); 217 throw new MactorException("Failed to create receiver for channel '" + channelConfig.getName() + "'. Error:" + e.getMessage(), e); 218 } 219 } 220 public void close() { 221 try { 222 qreceiver.close(); 223 qsession.close(); 224 qcon.close(); 225 } catch (JMSException e) { 226 log.warn("Failed to release queue receiver", e); 227 } 228 } 229 } 230 public void terminate() { 231 super.terminate(); 232 for (Sender s : senders.values()) 233 s.close(); 234 senders.clear(); 235 for (Subscriber s : subscribers.values()) 236 s.close(); 237 subscribers.clear(); 238 } 239 }