001 /****************************************************************************** 002 * Copyright (C) MActor Developers. All rights reserved. * 003 * ---------------------------------------------------------------------------* 004 * This file is part of MActor. * 005 * * 006 * MActor is free software; you can redistribute it and/or modify * 007 * it under the terms of the GNU General Public License as published by * 008 * the Free Software Foundation; either version 2 of the License, or * 009 * (at your option) any later version. * 010 * * 011 * MActor is distributed in the hope that it will be useful, * 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 014 * GNU General Public License for more details. * 015 * * 016 * You should have received a copy of the GNU General Public License * 017 * along with MActor; if not, write to the Free Software * 018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * 019 ******************************************************************************/ 020 package org.mactor.brokers.tibrv; 021 022 import java.util.HashMap; 023 import java.util.Map; 024 025 import org.mactor.brokers.AbstractMessageBroker; 026 import org.mactor.brokers.Message; 027 import org.mactor.brokers.MessageBroker; 028 import org.mactor.framework.MactorException; 029 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig; 030 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig; 031 032 import com.tibco.tibrv.Tibrv; 033 import com.tibco.tibrv.TibrvException; 034 import com.tibco.tibrv.TibrvListener; 035 import com.tibco.tibrv.TibrvMsg; 036 import com.tibco.tibrv.TibrvMsgCallback; 037 import com.tibco.tibrv.TibrvMsgField; 038 import com.tibco.tibrv.TibrvRvdTransport; 039 import com.tibco.tibrv.TibrvXml; 040 041 /** 042 * A message broker for Tibco Rendezvous (suitable for TIBCO BusinessWorks 043 * projects). (Add the tibrvj.jar library from your version of TIBCO Rv to the 044 * MActor classpath) 045 * 046 * @author Lars Ivar Almli 047 * @see MessageBroker 048 */ 049 public class TibcoRvMessageBroker extends AbstractMessageBroker { 050 private TibrvRvdTransport transport; 051 private static final String FIELD_NAME = "xml"; 052 public TibcoRvMessageBroker(MessageBrokerConfig config) throws MactorException { 053 super(config); 054 try { 055 Tibrv.open(Tibrv.IMPL_NATIVE); 056 this.transport = new TibrvRvdTransport(config.getValue("service"), config.getValue("network"), config.getValue("daemon")); 057 startDipatchThread(); 058 } catch (TibrvException tre) { 059 throw new MactorException("Failed to initialize TibcoRvMessageBroker", tre); 060 } 061 } 062 @Override 063 protected void finalize() throws Throwable { 064 super.finalize(); 065 terminate(); 066 } 067 public void terminate() { 068 super.terminate(); 069 try { 070 Tibrv.close(); 071 } catch (Exception e) { 072 log.warn("Failed to close Tibrv connection", e); 073 } 074 } 075 public void publish(String channel, Message message) throws MactorException { 076 sendMessage(channel, message, false); 077 } 078 public Message publishWithResponse(String channel, Message message) throws MactorException { 079 return sendMessage(channel, message, true); 080 } 081 @Override 082 protected void onFirstSubscribe(String channel) throws MactorException { 083 registerListener(channel); 084 } 085 @Override 086 protected void onLastSubscribe(String channel) throws MactorException { 087 TibrvListener l = listeners.remove(channel); 088 if (l != null) 089 l.destroy(); 090 } 091 private Message sendMessage(String channel, Message message, boolean expectResponse) throws MactorException { 092 ChannelConfig cc = config.getRequieredChannelConfig(channel); 093 String subject = cc.getRequieredValue("subject"); 094 try { 095 TibrvMsg msg = new TibrvMsg(); 096 msg.setSendSubject(subject); 097 msg.update(FIELD_NAME, new TibrvXml(message.getContent().getBytes())); 098 if (expectResponse) { 099 if (log.isDebugEnabled()) { 100 log.debug("Sending message on channel: '" + channel + "', subject: '" + subject + "', message: '" + msg + "'. Expecting reponse message.."); 101 } 102 TibrvMsg responseMessage = transport.sendRequest(msg, 0); 103 return messageFromMessage(responseMessage); 104 } else { 105 if (log.isDebugEnabled()) { 106 log.debug("Sending message on channel: '" + channel + "', subject: '" + subject + "', message: '" + msg + "'. No reponse message expected"); 107 } 108 transport.send(msg); 109 return null; 110 } 111 } catch (TibrvException tre) { 112 throw new MactorException(tre); 113 } 114 } 115 private void startDipatchThread() { 116 Thread t = new Thread(new Runnable() { 117 public void run() { 118 while (true) { 119 try { 120 Tibrv.defaultQueue().dispatch(); 121 } catch (TibrvException e) { 122 log.error("Error while dispatching message from default queue", e); 123 } catch (InterruptedException ie) { 124 log.info("Interrupted. Terminating dispatch loop.."); 125 return; 126 } 127 } 128 } 129 }); 130 t.start(); 131 } 132 private Message messageFromMessage(TibrvMsg msg) throws MactorException, TibrvException { 133 if (msg == null) 134 throw new MactorException("Received an empty message"); 135 TibrvMsgField field = msg.getField(FIELD_NAME); 136 if (field == null) 137 throw new MactorException("Received a message without a '" + FIELD_NAME + "' field"); 138 if (!(field.data instanceof TibrvXml)) { 139 throw new MactorException("Received a message where the '" + FIELD_NAME + "' field does not contain XML"); 140 } 141 return Message.createMessage(new String(((TibrvXml) field.data).getBytes())); 142 } 143 Map<String, TibrvListener> listeners = new HashMap<String, TibrvListener>(); 144 private synchronized void registerListener(String channel) throws MactorException { 145 ChannelConfig cc = config.getRequieredChannelConfig(channel); 146 String subject = cc.getRequieredValue("subject"); 147 try { 148 TibrvListener l = new TibrvListener(Tibrv.defaultQueue(), new SubjectListener(channel), transport, subject, null); 149 listeners.put(channel, l); 150 } catch (TibrvException tre) { 151 throw new MactorException("Failed to create listener for channel '" + channel + "'", tre); 152 } 153 } 154 private class SubjectListener implements TibrvMsgCallback { 155 String channel; 156 SubjectListener(String channel) { 157 this.channel = channel; 158 } 159 public void onMsg(TibrvListener listener, TibrvMsg msg) { 160 try { 161 if (log.isDebugEnabled()) { 162 log.debug("Received message on channel: '" + channel + "', subject: '" + msg.getSendSubject() + "', with reply subject: '" + msg.getReplySubject() + "', message: '" + msg + "'"); 163 } 164 Message m = messageFromMessage(msg); 165 Message resultMessage = raiseOnMessage(channel, m, false); 166 if (resultMessage != null && msg.getReplySubject() != null) { 167 TibrvMsg reyplyMsg = new TibrvMsg(); 168 reyplyMsg.update(FIELD_NAME, new TibrvXml(resultMessage.getContent().getBytes())); 169 transport.sendReply(reyplyMsg, msg); 170 } 171 } catch (MactorException me) { 172 log.info("Exception while processing message from channel '" + channel + "'", me); 173 } catch (TibrvException tre) { 174 log.warn("Tibco exception while processing message from channel '" + channel + "'", tre); 175 } 176 } 177 } 178 }