001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.brokers.tibrv;
021    
022    import java.util.HashMap;
023    import java.util.Map;
024    
025    import org.mactor.brokers.AbstractMessageBroker;
026    import org.mactor.brokers.Message;
027    import org.mactor.brokers.MessageBroker;
028    import org.mactor.framework.MactorException;
029    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
030    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
031    
032    import com.tibco.tibrv.Tibrv;
033    import com.tibco.tibrv.TibrvException;
034    import com.tibco.tibrv.TibrvListener;
035    import com.tibco.tibrv.TibrvMsg;
036    import com.tibco.tibrv.TibrvMsgCallback;
037    import com.tibco.tibrv.TibrvMsgField;
038    import com.tibco.tibrv.TibrvRvdTransport;
039    import com.tibco.tibrv.TibrvXml;
040    
041    /**
042     * A message broker for Tibco Rendezvous (suitable for TIBCO BusinessWorks
043     * projects). (Add the tibrvj.jar library from your version of TIBCO Rv to the
044     * MActor classpath)
045     * 
046     * @author Lars Ivar Almli
047     * @see MessageBroker
048     */
049    public class TibcoRvMessageBroker extends AbstractMessageBroker {
050            private TibrvRvdTransport transport;
051            private static final String FIELD_NAME = "xml";
052            public TibcoRvMessageBroker(MessageBrokerConfig config) throws MactorException {
053                    super(config);
054                    try {
055                            Tibrv.open(Tibrv.IMPL_NATIVE);
056                            this.transport = new TibrvRvdTransport(config.getValue("service"), config.getValue("network"), config.getValue("daemon"));
057                            startDipatchThread();
058                    } catch (TibrvException tre) {
059                            throw new MactorException("Failed to initialize TibcoRvMessageBroker", tre);
060                    }
061            }
062            @Override
063            protected void finalize() throws Throwable {
064                    super.finalize();
065                    terminate();
066            }
067            public void terminate() {
068                    super.terminate();
069                    try {
070                            Tibrv.close();
071                    } catch (Exception e) {
072                            log.warn("Failed to close Tibrv connection", e);
073                    }
074            }
075            public void publish(String channel, Message message) throws MactorException {
076                    sendMessage(channel, message, false);
077            }
078            public Message publishWithResponse(String channel, Message message) throws MactorException {
079                    return sendMessage(channel, message, true);
080            }
081            @Override
082            protected void onFirstSubscribe(String channel) throws MactorException {
083                    registerListener(channel);
084            }
085            @Override
086            protected void onLastSubscribe(String channel) throws MactorException {
087                    TibrvListener l = listeners.remove(channel);
088                    if (l != null)
089                            l.destroy();
090            }
091            private Message sendMessage(String channel, Message message, boolean expectResponse) throws MactorException {
092                    ChannelConfig cc = config.getRequieredChannelConfig(channel);
093                    String subject = cc.getRequieredValue("subject");
094                    try {
095                            TibrvMsg msg = new TibrvMsg();
096                            msg.setSendSubject(subject);
097                            msg.update(FIELD_NAME, new TibrvXml(message.getContent().getBytes()));
098                            if (expectResponse) {
099                                    if (log.isDebugEnabled()) {
100                                            log.debug("Sending message on channel: '" + channel + "', subject: '" + subject + "', message: '" + msg + "'. Expecting reponse message..");
101                                    }
102                                    TibrvMsg responseMessage = transport.sendRequest(msg, 0);
103                                    return messageFromMessage(responseMessage);
104                            } else {
105                                    if (log.isDebugEnabled()) {
106                                            log.debug("Sending message on channel: '" + channel + "', subject: '" + subject + "', message: '" + msg + "'. No reponse message expected");
107                                    }
108                                    transport.send(msg);
109                                    return null;
110                            }
111                    } catch (TibrvException tre) {
112                            throw new MactorException(tre);
113                    }
114            }
115            private void startDipatchThread() {
116                    Thread t = new Thread(new Runnable() {
117                            public void run() {
118                                    while (true) {
119                                            try {
120                                                    Tibrv.defaultQueue().dispatch();
121                                            } catch (TibrvException e) {
122                                                    log.error("Error while dispatching message from default queue", e);
123                                            } catch (InterruptedException ie) {
124                                                    log.info("Interrupted. Terminating dispatch loop..");
125                                                    return;
126                                            }
127                                    }
128                            }
129                    });
130                    t.start();
131            }
132            private Message messageFromMessage(TibrvMsg msg) throws MactorException, TibrvException {
133                    if (msg == null)
134                            throw new MactorException("Received an empty message");
135                    TibrvMsgField field = msg.getField(FIELD_NAME);
136                    if (field == null)
137                            throw new MactorException("Received a message without a '" + FIELD_NAME + "' field");
138                    if (!(field.data instanceof TibrvXml)) {
139                            throw new MactorException("Received a message where the '" + FIELD_NAME + "' field does not contain XML");
140                    }
141                    return Message.createMessage(new String(((TibrvXml) field.data).getBytes()));
142            }
143            Map<String, TibrvListener> listeners = new HashMap<String, TibrvListener>();
144            private synchronized void registerListener(String channel) throws MactorException {
145                    ChannelConfig cc = config.getRequieredChannelConfig(channel);
146                    String subject = cc.getRequieredValue("subject");
147                    try {
148                            TibrvListener l = new TibrvListener(Tibrv.defaultQueue(), new SubjectListener(channel), transport, subject, null);
149                            listeners.put(channel, l);
150                    } catch (TibrvException tre) {
151                            throw new MactorException("Failed to create listener for channel '" + channel + "'", tre);
152                    }
153            }
154            private class SubjectListener implements TibrvMsgCallback {
155                    String channel;
156                    SubjectListener(String channel) {
157                            this.channel = channel;
158                    }
159                    public void onMsg(TibrvListener listener, TibrvMsg msg) {
160                            try {
161                                    if (log.isDebugEnabled()) {
162                                            log.debug("Received message on channel: '" + channel + "', subject: '" + msg.getSendSubject() + "', with reply subject: '" + msg.getReplySubject() + "', message: '" + msg + "'");
163                                    }
164                                    Message m = messageFromMessage(msg);
165                                    Message resultMessage = raiseOnMessage(channel, m, false);
166                                    if (resultMessage != null && msg.getReplySubject() != null) {
167                                            TibrvMsg reyplyMsg = new TibrvMsg();
168                                            reyplyMsg.update(FIELD_NAME, new TibrvXml(resultMessage.getContent().getBytes()));
169                                            transport.sendReply(reyplyMsg, msg);
170                                    }
171                            } catch (MactorException me) {
172                                    log.info("Exception while processing message from channel '" + channel + "'", me);
173                            } catch (TibrvException tre) {
174                                    log.warn("Tibco exception while processing message from channel '" + channel + "'", tre);
175                            }
176                    }
177            }
178    }