001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.brokers.tibrv;
021
022 import java.util.HashMap;
023 import java.util.Map;
024
025 import org.mactor.brokers.AbstractMessageBroker;
026 import org.mactor.brokers.Message;
027 import org.mactor.brokers.MessageBroker;
028 import org.mactor.framework.MactorException;
029 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
030 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
031
032 import com.tibco.tibrv.Tibrv;
033 import com.tibco.tibrv.TibrvException;
034 import com.tibco.tibrv.TibrvListener;
035 import com.tibco.tibrv.TibrvMsg;
036 import com.tibco.tibrv.TibrvMsgCallback;
037 import com.tibco.tibrv.TibrvMsgField;
038 import com.tibco.tibrv.TibrvRvdTransport;
039 import com.tibco.tibrv.TibrvXml;
040
041 /**
042 * A message broker for Tibco Rendezvous (suitable for TIBCO BusinessWorks
043 * projects). (Add the tibrvj.jar library from your version of TIBCO Rv to the
044 * MActor classpath)
045 *
046 * @author Lars Ivar Almli
047 * @see MessageBroker
048 */
049 public class TibcoRvMessageBroker extends AbstractMessageBroker {
050 private TibrvRvdTransport transport;
051 private static final String FIELD_NAME = "xml";
052 public TibcoRvMessageBroker(MessageBrokerConfig config) throws MactorException {
053 super(config);
054 try {
055 Tibrv.open(Tibrv.IMPL_NATIVE);
056 this.transport = new TibrvRvdTransport(config.getValue("service"), config.getValue("network"), config.getValue("daemon"));
057 startDipatchThread();
058 } catch (TibrvException tre) {
059 throw new MactorException("Failed to initialize TibcoRvMessageBroker", tre);
060 }
061 }
062 @Override
063 protected void finalize() throws Throwable {
064 super.finalize();
065 terminate();
066 }
067 public void terminate() {
068 super.terminate();
069 try {
070 Tibrv.close();
071 } catch (Exception e) {
072 log.warn("Failed to close Tibrv connection", e);
073 }
074 }
075 public void publish(String channel, Message message) throws MactorException {
076 sendMessage(channel, message, false);
077 }
078 public Message publishWithResponse(String channel, Message message) throws MactorException {
079 return sendMessage(channel, message, true);
080 }
081 @Override
082 protected void onFirstSubscribe(String channel) throws MactorException {
083 registerListener(channel);
084 }
085 @Override
086 protected void onLastSubscribe(String channel) throws MactorException {
087 TibrvListener l = listeners.remove(channel);
088 if (l != null)
089 l.destroy();
090 }
091 private Message sendMessage(String channel, Message message, boolean expectResponse) throws MactorException {
092 ChannelConfig cc = config.getRequieredChannelConfig(channel);
093 String subject = cc.getRequieredValue("subject");
094 try {
095 TibrvMsg msg = new TibrvMsg();
096 msg.setSendSubject(subject);
097 msg.update(FIELD_NAME, new TibrvXml(message.getContent().getBytes()));
098 if (expectResponse) {
099 if (log.isDebugEnabled()) {
100 log.debug("Sending message on channel: '" + channel + "', subject: '" + subject + "', message: '" + msg + "'. Expecting reponse message..");
101 }
102 TibrvMsg responseMessage = transport.sendRequest(msg, 0);
103 return messageFromMessage(responseMessage);
104 } else {
105 if (log.isDebugEnabled()) {
106 log.debug("Sending message on channel: '" + channel + "', subject: '" + subject + "', message: '" + msg + "'. No reponse message expected");
107 }
108 transport.send(msg);
109 return null;
110 }
111 } catch (TibrvException tre) {
112 throw new MactorException(tre);
113 }
114 }
115 private void startDipatchThread() {
116 Thread t = new Thread(new Runnable() {
117 public void run() {
118 while (true) {
119 try {
120 Tibrv.defaultQueue().dispatch();
121 } catch (TibrvException e) {
122 log.error("Error while dispatching message from default queue", e);
123 } catch (InterruptedException ie) {
124 log.info("Interrupted. Terminating dispatch loop..");
125 return;
126 }
127 }
128 }
129 });
130 t.start();
131 }
132 private Message messageFromMessage(TibrvMsg msg) throws MactorException, TibrvException {
133 if (msg == null)
134 throw new MactorException("Received an empty message");
135 TibrvMsgField field = msg.getField(FIELD_NAME);
136 if (field == null)
137 throw new MactorException("Received a message without a '" + FIELD_NAME + "' field");
138 if (!(field.data instanceof TibrvXml)) {
139 throw new MactorException("Received a message where the '" + FIELD_NAME + "' field does not contain XML");
140 }
141 return Message.createMessage(new String(((TibrvXml) field.data).getBytes()));
142 }
143 Map<String, TibrvListener> listeners = new HashMap<String, TibrvListener>();
144 private synchronized void registerListener(String channel) throws MactorException {
145 ChannelConfig cc = config.getRequieredChannelConfig(channel);
146 String subject = cc.getRequieredValue("subject");
147 try {
148 TibrvListener l = new TibrvListener(Tibrv.defaultQueue(), new SubjectListener(channel), transport, subject, null);
149 listeners.put(channel, l);
150 } catch (TibrvException tre) {
151 throw new MactorException("Failed to create listener for channel '" + channel + "'", tre);
152 }
153 }
154 private class SubjectListener implements TibrvMsgCallback {
155 String channel;
156 SubjectListener(String channel) {
157 this.channel = channel;
158 }
159 public void onMsg(TibrvListener listener, TibrvMsg msg) {
160 try {
161 if (log.isDebugEnabled()) {
162 log.debug("Received message on channel: '" + channel + "', subject: '" + msg.getSendSubject() + "', with reply subject: '" + msg.getReplySubject() + "', message: '" + msg + "'");
163 }
164 Message m = messageFromMessage(msg);
165 Message resultMessage = raiseOnMessage(channel, m, false);
166 if (resultMessage != null && msg.getReplySubject() != null) {
167 TibrvMsg reyplyMsg = new TibrvMsg();
168 reyplyMsg.update(FIELD_NAME, new TibrvXml(resultMessage.getContent().getBytes()));
169 transport.sendReply(reyplyMsg, msg);
170 }
171 } catch (MactorException me) {
172 log.info("Exception while processing message from channel '" + channel + "'", me);
173 } catch (TibrvException tre) {
174 log.warn("Tibco exception while processing message from channel '" + channel + "'", tre);
175 }
176 }
177 }
178 }