001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.brokers.jms;
021    
022    import java.util.HashMap;
023    import java.util.Hashtable;
024    import java.util.Map;
025    
026    import javax.jms.Destination;
027    import javax.jms.JMSException;
028    import javax.jms.MessageListener;
029    import javax.jms.Queue;
030    import javax.jms.QueueConnection;
031    import javax.jms.QueueConnectionFactory;
032    import javax.jms.QueueReceiver;
033    import javax.jms.QueueSender;
034    import javax.jms.QueueSession;
035    import javax.jms.Session;
036    import javax.jms.TemporaryQueue;
037    import javax.jms.TextMessage;
038    import javax.naming.Context;
039    import javax.naming.InitialContext;
040    import javax.naming.NamingException;
041    
042    import org.mactor.brokers.AbstractMessageBroker;
043    import org.mactor.brokers.Message;
044    import org.mactor.framework.MactorException;
045    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
046    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
047    
048    /**
049     * A message broker for JMS (it should work with any JMS implementation - just
050     * add the client JMS libraries from the particular JMS product to the MActor
051     * class path)
052     * 
053     * @author Lars Ivar Almli
054     */
055    public class JmsMessageBroker extends AbstractMessageBroker {
056            public JmsMessageBroker(MessageBrokerConfig config) {
057                    super(config);
058            }
059            protected void onFirstSubscribe(String channel) throws MactorException {
060                    createSubscriber(config.getRequieredChannelConfig(channel));
061            }
062            public void publish(String channel, Message message) throws MactorException {
063                    getSender(config.getRequieredChannelConfig(channel)).send(message);
064            }
065            public Message publishWithResponse(String channel, Message message) throws MactorException {
066                    return getSender(config.getRequieredChannelConfig(channel)).send(message);
067            }
068            Map<String, Subscriber> subscribers = new HashMap<String, Subscriber>();
069            public void createSubscriber(ChannelConfig channelConfig) throws MactorException {
070                    subscribers.put(channelConfig.getName(), new Subscriber(channelConfig));
071            }
072            @Override
073            protected void onLastSubscribe(String channel) throws MactorException {
074                    Subscriber s = subscribers.remove(channel);
075                    if (s != null)
076                            s.close();
077            }
078            Map<String, Sender> senders = new HashMap<String, Sender>();
079            private synchronized Sender getSender(ChannelConfig channelConfig) throws MactorException {
080                    Sender s = senders.get(channelConfig.getName());
081                    if (s == null) {
082                            s = new Sender(channelConfig);
083                            senders.put(channelConfig.getName(), s);
084                    }
085                    return s;
086            }
087            private class Sender {
088                    private QueueConnection qcon;
089                    private QueueSession qsession;
090                    private QueueSender qsender;
091                    private boolean respReq;
092                    public Sender(ChannelConfig channelConfig) throws MactorException {
093                            try {
094                                    respReq = channelConfig.isRequiresResponse();
095                                    String url = channelConfig.getRequieredValue("url");
096                                    String connectionFactory = channelConfig.getRequieredValue("connection_factory");
097                                    String contextFactory = channelConfig.getRequieredValue("initial_context_factory");
098                                    String queueName = channelConfig.getRequieredValue("queue");
099                                    String userName = channelConfig.getValue("username");
100                                    String password = channelConfig.getValue("password");
101                                    InitialContext ctx = getInitialContext(url, contextFactory);
102                                    QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(connectionFactory);
103                                    if (userName != null && userName.length() > 0)
104                                            this.qcon = qconFactory.createQueueConnection(userName, password);
105                                    else
106                                            this.qcon = qconFactory.createQueueConnection();
107                                    this.qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
108                                    Queue queue = (Queue) ctx.lookup(queueName);
109                                    this.qsender = qsession.createSender(queue);
110                                    qcon.start();
111                            } catch (Exception e) {
112                                    throw new MactorException("Failed to create sender for channel '" + channelConfig.getName() + "'. Error:" + e.getMessage(), e);
113                            }
114                    }
115                    public Message send(Message message) throws MactorException {
116                            try {
117                                    TextMessage m = qsession.createTextMessage();
118                                    m.setText(message.getContent());
119                                    TemporaryQueue replyQueue = null;
120                                    if (respReq) {
121                                            replyQueue = qsession.createTemporaryQueue();
122                                            m.setJMSReplyTo(replyQueue);
123                                    }
124                                    qsender.send(m);
125                                    if (respReq) {
126                                            try {
127                                                    javax.jms.Message resp = qsession.createReceiver(replyQueue).receive(5 * 60 * 1000);// 5
128                                                    if (resp != null) {
129                                                            String msgText = null;
130                                                            if (resp instanceof TextMessage) {
131                                                                    msgText = ((TextMessage) resp).getText();
132                                                            } else {
133                                                                    msgText = resp.toString();
134                                                            }
135                                                            if (msgText != null)
136                                                                    return Message.createMessage(msgText);
137                                                    }
138                                            } finally {
139                                                    replyQueue.delete();
140                                            }
141                                    }
142                                    return null;
143                            } catch (JMSException je) {
144                                    throw new MactorException(je);
145                            }
146                    }
147                    public void close() {
148                            try {
149                                    qsender.close();
150                                    qsession.close();
151                                    qcon.close();
152                            } catch (JMSException e) {
153                                    log.warn("Failed to release queue sender", e);
154                            }
155                    }
156            }
157            private static InitialContext getInitialContext(String url, String jndiFactory) throws NamingException {
158                    Hashtable<String, String> env = new Hashtable<String, String>();
159                    env.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
160                    env.put(Context.PROVIDER_URL, url);
161                    return new InitialContext(env);
162            }
163            private class Subscriber {
164                    private QueueConnection qcon;
165                    private QueueSession qsession;
166                    private QueueReceiver qreceiver;
167                    private String channel;
168                    public Subscriber(ChannelConfig channelConfig) throws MactorException {
169                            this.channel = channelConfig.getName();
170                            try {
171                                    String url = channelConfig.getRequieredValue("url");
172                                    String connectionFactory = channelConfig.getRequieredValue("connection_factory");
173                                    String contextFactory = channelConfig.getRequieredValue("initial_context_factory");
174                                    String queueName = channelConfig.getRequieredValue("queue");
175                                    String userName = channelConfig.getValue("username");
176                                    String password = channelConfig.getValue("password");
177                                    InitialContext ctx = getInitialContext(url, contextFactory);
178                                    QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(connectionFactory);
179                                    if (userName != null && userName.length() > 0)
180                                            this.qcon = qconFactory.createQueueConnection(userName, password);
181                                    else
182                                            this.qcon = qconFactory.createQueueConnection();
183                                    this.qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
184                                    Queue queue = (Queue) ctx.lookup(queueName);
185                                    this.qreceiver = qsession.createReceiver(queue);
186                                    this.qreceiver.setMessageListener(new MessageListener() {
187                                            public void onMessage(javax.jms.Message message) {
188                                                    try {
189                                                            String msgText = null;
190                                                            if (message instanceof TextMessage) {
191                                                                    msgText = ((TextMessage) message).getText();
192                                                            } else {
193                                                                    msgText = message.toString();
194                                                            }
195                                                            Message resp = raiseOnMessage(Subscriber.this.channel, Message.createMessage(msgText), false);
196                                                            if (resp != null) {
197                                                                    Destination replyTo = message.getJMSReplyTo();
198                                                                    if (replyTo == null) {
199                                                                            log.warn("Unable to deliver the reply message. The message received on channel '" + Subscriber.this.channel + "' does not contain a 'replyTo' destination");
200                                                                    } else {
201                                                                            String corrId = message.getJMSCorrelationID();
202                                                                            TextMessage replyMessage = qsession.createTextMessage();
203                                                                            if (corrId != null)
204                                                                                    replyMessage.setJMSCorrelationID(corrId);
205                                                                            replyMessage.setText(resp.getContent());
206                                                                            qsession.createSender((Queue) replyTo).send(replyMessage);
207                                                                    }
208                                                            }
209                                                    } catch (Exception e) {
210                                                            log.warn("Exception after receiving jms message on channel '" + Subscriber.this.channel + "'. Message:" + message, e);
211                                                    }
212                                            }
213                                    });
214                                    qcon.start();
215                            } catch (Exception e) {
216                                    e.printStackTrace();
217                                    throw new MactorException("Failed to create receiver for channel '" + channelConfig.getName() + "'. Error:" + e.getMessage(), e);
218                            }
219                    }
220                    public void close() {
221                            try {
222                                    qreceiver.close();
223                                    qsession.close();
224                                    qcon.close();
225                            } catch (JMSException e) {
226                                    log.warn("Failed to release queue receiver", e);
227                            }
228                    }
229            }
230            public void terminate() {
231                    super.terminate();
232                    for (Sender s : senders.values())
233                            s.close();
234                    senders.clear();
235                    for (Subscriber s : subscribers.values())
236                            s.close();
237                    subscribers.clear();
238            }
239    }