001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.brokers;
021
022 import java.io.File;
023 import java.io.FileFilter;
024 import java.lang.reflect.Constructor;
025 import java.lang.reflect.InvocationTargetException;
026 import java.util.ArrayList;
027 import java.util.Calendar;
028 import java.util.Collections;
029 import java.util.HashMap;
030 import java.util.LinkedList;
031 import java.util.Map;
032
033 import org.apache.log4j.Logger;
034 import org.mactor.framework.MactorException;
035 import org.mactor.framework.extensioninterface.MessageSelectorCommand;
036 import org.mactor.framework.spec.MessageBrokersConfig;
037 import org.mactor.framework.spec.ProjectContext;
038 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
039 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
040
041 /**
042 * Singleton that loads and maintains all message brokers. And works as a facade
043 * to the message brokers (hiding the broker-channel association).
044 *
045 * @author Lars Ivar Almli
046 */
047 public class MessageBrokerManager {
048 protected static Logger log = Logger.getLogger(MessageBrokerManager.class);
049 private static MessageBrokerManager manager;
050 private boolean active;
051 private Map<String, MessageBrokerHolder> channelToBrokerMap = new HashMap<String, MessageBrokerHolder>();
052 public static synchronized MessageBrokerManager getInstance() throws MactorException {
053 if (manager != null)
054 return manager;
055 manager = new MessageBrokerManager(ProjectContext.getGlobalInstance().loadMessageBrokersConfig());
056 return manager;
057 }
058 public static boolean isActive() {
059 return manager != null;
060 }
061 public void terminate() {
062 manager = null;
063 for (MessageBrokerHolder mh : channelToBrokerMap.values()) {
064 try {
065 mh.broker.terminate();
066 } catch (Exception e) {
067 log.warn("Failed to terminate message broker '" + mh.config.getName() + "'", e);
068 }
069 }
070 channelToBrokerMap.clear();
071 }
072 private MessageBrokerManager(MessageBrokersConfig config) throws MactorException {
073 if (config == null)
074 throw new MactorException("A Message Broker Configuration must be provided (select one in Project Settings...)");
075 for (MessageBrokerConfig mbc : config.getMessageBrokers()) {
076 MessageBroker broker = createMessageBroker(mbc);
077 boolean archiveMessages = mbc.isArchiveConsumedMessages() && mbc.getArchivePath() != null && mbc.getArchivePath().length() > 0;
078 addBroker(mbc, new MessageBrokerDecorator(archiveMessages, mbc.getArchivePath(), broker));
079 }
080 }
081 private synchronized void addBroker(MessageBrokerConfig mbc, MessageBrokerDecorator broker) throws MactorException {
082 for (ChannelConfig cc : mbc.getChannels().values()) {
083 if (channelToBrokerMap.containsKey(cc.getName()))
084 throw new MactorException("Duplicate channel definition. Channel '" + cc.getName() + "' is defined in both '" + mbc.getName() + "' and '"
085 + channelToBrokerMap.get(cc.getName()).config.getName() + "'");
086 channelToBrokerMap.put(cc.getName(), new MessageBrokerHolder(broker, mbc));
087 }
088 }
089 private MessageBrokerHolder getBrokerHolder(String channel) throws MactorException {
090 MessageBrokerHolder brokerHolder = channelToBrokerMap.get(channel);
091 if (brokerHolder == null)
092 throw new MactorException("No MessageBroker is registered for channel '" + channel + "'");
093 return brokerHolder;
094 }
095 private MessageBrokerDecorator getBroker(String channel) throws MactorException {
096 return getBrokerHolder(channel).broker;
097 }
098 public void addMessageInfoListener(MessageInfoListener listener) throws MactorException {
099 for (String channel : channelToBrokerMap.keySet())
100 getBroker(channel).addMessageInfoListener(channel, listener);
101 }
102 public void addMessageInfoListener(String channel, MessageInfoListener listener) throws MactorException {
103 getBroker(channel).addMessageInfoListener(channel, listener);
104 }
105 public void removeMessageInfoListener(String channel, MessageInfoListener listener) throws MactorException {
106 getBroker(channel).removeMessageInfoListener(channel, listener);
107 }
108 public Message publish(String channel, Message message) throws MactorException {
109 MessageBrokerHolder h = getBrokerHolder(channel);
110 ChannelConfig cc = h.config.getChannelConfig(channel);
111 if (cc.isRequiresResponse())
112 return getBroker(channel).publishWithResponse(channel, message);
113 getBroker(channel).publish(channel, message);
114 return null;
115 }
116 public void subscribe(String channel, MessageSubscriber subscriber, MessageSelectorCommand messageSelector) throws MactorException {
117 getBroker(channel).subscribe(channel, subscriber, messageSelector);
118 }
119 public void unsubscribe(String channel, MessageSubscriber subscriber) throws MactorException {
120 getBroker(channel).unsubscribe(channel, subscriber);
121 }
122 public ArrayList<MessageInfo> loadMessageInfoFromArchive(String channel) throws MactorException {
123 MessageBrokerDecorator mb = getBroker(channel);
124 String path = mb.getArchivePath();
125 if (path == null) {
126 throw new MactorException("No archive path is specifed for the message broker that contains the channel '" + channel + "'");
127 }
128 return MessageInfo.loadFromArchiveDir(new File(path + "/" + channel));
129 }
130 public void clearArchive(String channel) throws MactorException {
131 MessageBrokerDecorator mb = getBroker(channel);
132 String path = mb.getArchivePath();
133 if (path == null) {
134 throw new MactorException("No archive path is specifed for the message broker that contains the channel '" + channel + "'");
135 }
136 MessageInfo.clearArchiveDir(new File(path + "/" + channel));
137 }
138 private class MessageBrokerHolder {
139 MessageBrokerDecorator broker;
140 MessageBrokerConfig config;
141 public MessageBrokerHolder(MessageBrokerDecorator broker, MessageBrokerConfig config) {
142 this.broker = broker;
143 this.config = config;
144 }
145 }
146 private static MessageBroker createMessageBroker(MessageBrokerConfig config) throws MactorException {
147 String className = config.getBrokerClass();
148 try {
149 Class c = Class.forName(className);
150 Constructor ct = c.getConstructor(new Class[] { MessageBrokerConfig.class });
151 Object mb = ct.newInstance(new Object[] { config });
152 if (!(mb instanceof MessageBroker))
153 throw new MactorException("The specified message broker class does not implement '" + MessageBroker.class.getName() + "'");
154 return (MessageBroker) mb;
155 } catch (ClassNotFoundException cnf) {
156 throw new MactorException("The specified MessageBroker implementation was not found in the classpath. Class '" + className + "'", cnf);
157 } catch (NoSuchMethodException nm) {
158 throw new MactorException("The specified MessageBroker implementation class '" + className + "' does not have the requiered contructor (that takes a single argument of type '"
159 + MessageBrokerConfig.class.getName() + "')", nm);
160 } catch (IllegalAccessException iae) {
161 throw new MactorException("Unable to instantiate the MessageBroker '" + className + "'. Illegal access", iae);
162 } catch (InvocationTargetException it) {
163 throw new MactorException("Unable to instantiate the MessageBroker '" + className + "'. InvocationTargetException", it);
164 } catch (InstantiationException ie) {
165 throw new MactorException("Unable to instantiate the MessageBroker '" + className + "'. InstantiationException", ie);
166 }
167 }
168 public static interface MessageInfoListener {
169 void onMessageInfo(MessageInfo messageInfo);
170 }
171 private static class AF implements FileFilter {
172 public boolean accept(File pathname) {
173 String n = pathname.getName();
174 return pathname.isFile() && n.endsWith(".xml") && (n.startsWith("out_") || n.startsWith("in_"));
175 }
176 }
177 public static class MessageInfo implements Comparable<MessageInfo> {
178 private String archivePath;
179 private String channel;
180 private Calendar createdTime;
181 private boolean incoming;
182 private boolean response;
183 public int compareTo(org.mactor.brokers.MessageBrokerManager.MessageInfo other) {
184 return createdTime.compareTo(other.createdTime);
185 }
186 public static MessageInfo create(MessageContextInfo mci) {
187 MessageInfo mi = new MessageInfo();
188 mi.archivePath = mci.getArchivePath();
189 mi.channel = mci.getChannel();
190 mi.createdTime = mci.getCreatedTime();
191 mi.incoming = mci.isIncoming();
192 mi.response = mci.isResponseMessage();
193 return mi;
194 }
195 public static ArrayList<MessageInfo> loadFromArchiveDir(File channelArchiveDir) {
196 if (channelArchiveDir == null || !channelArchiveDir.exists())
197 return new ArrayList<MessageInfo>();
198 LinkedList<MessageInfo> mis = new LinkedList<MessageInfo>();
199 File[] files = channelArchiveDir.listFiles(new AF());
200 if (files != null) {
201 for (File f : files) {
202 MessageInfo m = MessageInfo.createFromFileInfo(f);
203 if (m != null)
204 mis.add(m);
205 }
206 }
207 Collections.sort(mis);
208 return new ArrayList<MessageInfo>(mis);
209 }
210 public static void clearArchiveDir(File channelArchiveDir) {
211 if (channelArchiveDir == null || !channelArchiveDir.exists())
212 return;
213 File[] files = channelArchiveDir.listFiles(new AF());
214 if (files != null) {
215 for (File f : files) {
216 f.delete();
217 }
218 }
219 }
220 public static MessageInfo createFromFileInfo(File f) {
221 if (!f.exists())
222 return null;
223 MessageInfo mi = new MessageInfo();
224 mi.createdTime = Calendar.getInstance();
225 mi.createdTime.setTimeInMillis(f.lastModified());
226 String fn = f.getName();
227 mi.response = fn.endsWith("_resp.xml");
228 mi.incoming = fn.startsWith("in_");
229 mi.channel = f.getParent();
230 mi.archivePath = f.getAbsolutePath();
231 return mi;
232 }
233 public String getArchivePath() {
234 return archivePath;
235 }
236 public String getChannel() {
237 return channel;
238 }
239 public Calendar getCreatedTime() {
240 return createdTime;
241 }
242 public boolean isIncoming() {
243 return incoming;
244 }
245 public boolean isResponse() {
246 return response;
247 }
248 }
249 }