001 /****************************************************************************** 002 * Copyright (C) MActor Developers. All rights reserved. * 003 * ---------------------------------------------------------------------------* 004 * This file is part of MActor. * 005 * * 006 * MActor is free software; you can redistribute it and/or modify * 007 * it under the terms of the GNU General Public License as published by * 008 * the Free Software Foundation; either version 2 of the License, or * 009 * (at your option) any later version. * 010 * * 011 * MActor is distributed in the hope that it will be useful, * 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 014 * GNU General Public License for more details. * 015 * * 016 * You should have received a copy of the GNU General Public License * 017 * along with MActor; if not, write to the Free Software * 018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * 019 ******************************************************************************/ 020 package org.mactor.brokers; 021 022 import java.io.File; 023 import java.io.FileFilter; 024 import java.lang.reflect.Constructor; 025 import java.lang.reflect.InvocationTargetException; 026 import java.util.ArrayList; 027 import java.util.Calendar; 028 import java.util.Collections; 029 import java.util.HashMap; 030 import java.util.LinkedList; 031 import java.util.Map; 032 033 import org.apache.log4j.Logger; 034 import org.mactor.framework.MactorException; 035 import org.mactor.framework.extensioninterface.MessageSelectorCommand; 036 import org.mactor.framework.spec.MessageBrokersConfig; 037 import org.mactor.framework.spec.ProjectContext; 038 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig; 039 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig; 040 041 /** 042 * Singleton that loads and maintains all message brokers. And works as a facade 043 * to the message brokers (hiding the broker-channel association). 044 * 045 * @author Lars Ivar Almli 046 */ 047 public class MessageBrokerManager { 048 protected static Logger log = Logger.getLogger(MessageBrokerManager.class); 049 private static MessageBrokerManager manager; 050 private boolean active; 051 private Map<String, MessageBrokerHolder> channelToBrokerMap = new HashMap<String, MessageBrokerHolder>(); 052 public static synchronized MessageBrokerManager getInstance() throws MactorException { 053 if (manager != null) 054 return manager; 055 manager = new MessageBrokerManager(ProjectContext.getGlobalInstance().loadMessageBrokersConfig()); 056 return manager; 057 } 058 public static boolean isActive() { 059 return manager != null; 060 } 061 public void terminate() { 062 manager = null; 063 for (MessageBrokerHolder mh : channelToBrokerMap.values()) { 064 try { 065 mh.broker.terminate(); 066 } catch (Exception e) { 067 log.warn("Failed to terminate message broker '" + mh.config.getName() + "'", e); 068 } 069 } 070 channelToBrokerMap.clear(); 071 } 072 private MessageBrokerManager(MessageBrokersConfig config) throws MactorException { 073 if (config == null) 074 throw new MactorException("A Message Broker Configuration must be provided (select one in Project Settings...)"); 075 for (MessageBrokerConfig mbc : config.getMessageBrokers()) { 076 MessageBroker broker = createMessageBroker(mbc); 077 boolean archiveMessages = mbc.isArchiveConsumedMessages() && mbc.getArchivePath() != null && mbc.getArchivePath().length() > 0; 078 addBroker(mbc, new MessageBrokerDecorator(archiveMessages, mbc.getArchivePath(), broker)); 079 } 080 } 081 private synchronized void addBroker(MessageBrokerConfig mbc, MessageBrokerDecorator broker) throws MactorException { 082 for (ChannelConfig cc : mbc.getChannels().values()) { 083 if (channelToBrokerMap.containsKey(cc.getName())) 084 throw new MactorException("Duplicate channel definition. Channel '" + cc.getName() + "' is defined in both '" + mbc.getName() + "' and '" 085 + channelToBrokerMap.get(cc.getName()).config.getName() + "'"); 086 channelToBrokerMap.put(cc.getName(), new MessageBrokerHolder(broker, mbc)); 087 } 088 } 089 private MessageBrokerHolder getBrokerHolder(String channel) throws MactorException { 090 MessageBrokerHolder brokerHolder = channelToBrokerMap.get(channel); 091 if (brokerHolder == null) 092 throw new MactorException("No MessageBroker is registered for channel '" + channel + "'"); 093 return brokerHolder; 094 } 095 private MessageBrokerDecorator getBroker(String channel) throws MactorException { 096 return getBrokerHolder(channel).broker; 097 } 098 public void addMessageInfoListener(MessageInfoListener listener) throws MactorException { 099 for (String channel : channelToBrokerMap.keySet()) 100 getBroker(channel).addMessageInfoListener(channel, listener); 101 } 102 public void addMessageInfoListener(String channel, MessageInfoListener listener) throws MactorException { 103 getBroker(channel).addMessageInfoListener(channel, listener); 104 } 105 public void removeMessageInfoListener(String channel, MessageInfoListener listener) throws MactorException { 106 getBroker(channel).removeMessageInfoListener(channel, listener); 107 } 108 public Message publish(String channel, Message message) throws MactorException { 109 MessageBrokerHolder h = getBrokerHolder(channel); 110 ChannelConfig cc = h.config.getChannelConfig(channel); 111 if (cc.isRequiresResponse()) 112 return getBroker(channel).publishWithResponse(channel, message); 113 getBroker(channel).publish(channel, message); 114 return null; 115 } 116 public void subscribe(String channel, MessageSubscriber subscriber, MessageSelectorCommand messageSelector) throws MactorException { 117 getBroker(channel).subscribe(channel, subscriber, messageSelector); 118 } 119 public void unsubscribe(String channel, MessageSubscriber subscriber) throws MactorException { 120 getBroker(channel).unsubscribe(channel, subscriber); 121 } 122 public ArrayList<MessageInfo> loadMessageInfoFromArchive(String channel) throws MactorException { 123 MessageBrokerDecorator mb = getBroker(channel); 124 String path = mb.getArchivePath(); 125 if (path == null) { 126 throw new MactorException("No archive path is specifed for the message broker that contains the channel '" + channel + "'"); 127 } 128 return MessageInfo.loadFromArchiveDir(new File(path + "/" + channel)); 129 } 130 public void clearArchive(String channel) throws MactorException { 131 MessageBrokerDecorator mb = getBroker(channel); 132 String path = mb.getArchivePath(); 133 if (path == null) { 134 throw new MactorException("No archive path is specifed for the message broker that contains the channel '" + channel + "'"); 135 } 136 MessageInfo.clearArchiveDir(new File(path + "/" + channel)); 137 } 138 private class MessageBrokerHolder { 139 MessageBrokerDecorator broker; 140 MessageBrokerConfig config; 141 public MessageBrokerHolder(MessageBrokerDecorator broker, MessageBrokerConfig config) { 142 this.broker = broker; 143 this.config = config; 144 } 145 } 146 private static MessageBroker createMessageBroker(MessageBrokerConfig config) throws MactorException { 147 String className = config.getBrokerClass(); 148 try { 149 Class c = Class.forName(className); 150 Constructor ct = c.getConstructor(new Class[] { MessageBrokerConfig.class }); 151 Object mb = ct.newInstance(new Object[] { config }); 152 if (!(mb instanceof MessageBroker)) 153 throw new MactorException("The specified message broker class does not implement '" + MessageBroker.class.getName() + "'"); 154 return (MessageBroker) mb; 155 } catch (ClassNotFoundException cnf) { 156 throw new MactorException("The specified MessageBroker implementation was not found in the classpath. Class '" + className + "'", cnf); 157 } catch (NoSuchMethodException nm) { 158 throw new MactorException("The specified MessageBroker implementation class '" + className + "' does not have the requiered contructor (that takes a single argument of type '" 159 + MessageBrokerConfig.class.getName() + "')", nm); 160 } catch (IllegalAccessException iae) { 161 throw new MactorException("Unable to instantiate the MessageBroker '" + className + "'. Illegal access", iae); 162 } catch (InvocationTargetException it) { 163 throw new MactorException("Unable to instantiate the MessageBroker '" + className + "'. InvocationTargetException", it); 164 } catch (InstantiationException ie) { 165 throw new MactorException("Unable to instantiate the MessageBroker '" + className + "'. InstantiationException", ie); 166 } 167 } 168 public static interface MessageInfoListener { 169 void onMessageInfo(MessageInfo messageInfo); 170 } 171 private static class AF implements FileFilter { 172 public boolean accept(File pathname) { 173 String n = pathname.getName(); 174 return pathname.isFile() && n.endsWith(".xml") && (n.startsWith("out_") || n.startsWith("in_")); 175 } 176 } 177 public static class MessageInfo implements Comparable<MessageInfo> { 178 private String archivePath; 179 private String channel; 180 private Calendar createdTime; 181 private boolean incoming; 182 private boolean response; 183 public int compareTo(org.mactor.brokers.MessageBrokerManager.MessageInfo other) { 184 return createdTime.compareTo(other.createdTime); 185 } 186 public static MessageInfo create(MessageContextInfo mci) { 187 MessageInfo mi = new MessageInfo(); 188 mi.archivePath = mci.getArchivePath(); 189 mi.channel = mci.getChannel(); 190 mi.createdTime = mci.getCreatedTime(); 191 mi.incoming = mci.isIncoming(); 192 mi.response = mci.isResponseMessage(); 193 return mi; 194 } 195 public static ArrayList<MessageInfo> loadFromArchiveDir(File channelArchiveDir) { 196 if (channelArchiveDir == null || !channelArchiveDir.exists()) 197 return new ArrayList<MessageInfo>(); 198 LinkedList<MessageInfo> mis = new LinkedList<MessageInfo>(); 199 File[] files = channelArchiveDir.listFiles(new AF()); 200 if (files != null) { 201 for (File f : files) { 202 MessageInfo m = MessageInfo.createFromFileInfo(f); 203 if (m != null) 204 mis.add(m); 205 } 206 } 207 Collections.sort(mis); 208 return new ArrayList<MessageInfo>(mis); 209 } 210 public static void clearArchiveDir(File channelArchiveDir) { 211 if (channelArchiveDir == null || !channelArchiveDir.exists()) 212 return; 213 File[] files = channelArchiveDir.listFiles(new AF()); 214 if (files != null) { 215 for (File f : files) { 216 f.delete(); 217 } 218 } 219 } 220 public static MessageInfo createFromFileInfo(File f) { 221 if (!f.exists()) 222 return null; 223 MessageInfo mi = new MessageInfo(); 224 mi.createdTime = Calendar.getInstance(); 225 mi.createdTime.setTimeInMillis(f.lastModified()); 226 String fn = f.getName(); 227 mi.response = fn.endsWith("_resp.xml"); 228 mi.incoming = fn.startsWith("in_"); 229 mi.channel = f.getParent(); 230 mi.archivePath = f.getAbsolutePath(); 231 return mi; 232 } 233 public String getArchivePath() { 234 return archivePath; 235 } 236 public String getChannel() { 237 return channel; 238 } 239 public Calendar getCreatedTime() { 240 return createdTime; 241 } 242 public boolean isIncoming() { 243 return incoming; 244 } 245 public boolean isResponse() { 246 return response; 247 } 248 } 249 }