001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.brokers;
021    
022    import java.io.File;
023    import java.io.FileFilter;
024    import java.lang.reflect.Constructor;
025    import java.lang.reflect.InvocationTargetException;
026    import java.util.ArrayList;
027    import java.util.Calendar;
028    import java.util.Collections;
029    import java.util.HashMap;
030    import java.util.LinkedList;
031    import java.util.Map;
032    
033    import org.apache.log4j.Logger;
034    import org.mactor.framework.MactorException;
035    import org.mactor.framework.extensioninterface.MessageSelectorCommand;
036    import org.mactor.framework.spec.MessageBrokersConfig;
037    import org.mactor.framework.spec.ProjectContext;
038    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
039    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
040    
041    /**
042     * Singleton that loads and maintains all message brokers. And works as a facade
043     * to the message brokers (hiding the broker-channel association).
044     * 
045     * @author Lars Ivar Almli
046     */
047    public class MessageBrokerManager {
048            protected static Logger log = Logger.getLogger(MessageBrokerManager.class);
049            private static MessageBrokerManager manager;
050            private boolean active;
051            private Map<String, MessageBrokerHolder> channelToBrokerMap = new HashMap<String, MessageBrokerHolder>();
052            public static synchronized MessageBrokerManager getInstance() throws MactorException {
053                    if (manager != null)
054                            return manager;
055                    manager = new MessageBrokerManager(ProjectContext.getGlobalInstance().loadMessageBrokersConfig());
056                    return manager;
057            }
058            public static boolean isActive() {
059                    return manager != null;
060            }
061            public void terminate() {
062                    manager = null;
063                    for (MessageBrokerHolder mh : channelToBrokerMap.values()) {
064                            try {
065                                    mh.broker.terminate();
066                            } catch (Exception e) {
067                                    log.warn("Failed to terminate message broker '" + mh.config.getName() + "'", e);
068                            }
069                    }
070                    channelToBrokerMap.clear();
071            }
072            private MessageBrokerManager(MessageBrokersConfig config) throws MactorException {
073                    if (config == null)
074                            throw new MactorException("A Message Broker Configuration must be provided (select one in Project Settings...)");
075                    for (MessageBrokerConfig mbc : config.getMessageBrokers()) {
076                            MessageBroker broker = createMessageBroker(mbc);
077                            boolean archiveMessages = mbc.isArchiveConsumedMessages() && mbc.getArchivePath() != null && mbc.getArchivePath().length() > 0;
078                            addBroker(mbc, new MessageBrokerDecorator(archiveMessages, mbc.getArchivePath(), broker));
079                    }
080            }
081            private synchronized void addBroker(MessageBrokerConfig mbc, MessageBrokerDecorator broker) throws MactorException {
082                    for (ChannelConfig cc : mbc.getChannels().values()) {
083                            if (channelToBrokerMap.containsKey(cc.getName()))
084                                    throw new MactorException("Duplicate channel definition. Channel '" + cc.getName() + "' is defined in both '" + mbc.getName() + "' and '"
085                                                    + channelToBrokerMap.get(cc.getName()).config.getName() + "'");
086                            channelToBrokerMap.put(cc.getName(), new MessageBrokerHolder(broker, mbc));
087                    }
088            }
089            private MessageBrokerHolder getBrokerHolder(String channel) throws MactorException {
090                    MessageBrokerHolder brokerHolder = channelToBrokerMap.get(channel);
091                    if (brokerHolder == null)
092                            throw new MactorException("No MessageBroker is registered for channel '" + channel + "'");
093                    return brokerHolder;
094            }
095            private MessageBrokerDecorator getBroker(String channel) throws MactorException {
096                    return getBrokerHolder(channel).broker;
097            }
098            public void addMessageInfoListener(MessageInfoListener listener) throws MactorException {
099                    for (String channel : channelToBrokerMap.keySet())
100                            getBroker(channel).addMessageInfoListener(channel, listener);
101            }
102            public void addMessageInfoListener(String channel, MessageInfoListener listener) throws MactorException {
103                    getBroker(channel).addMessageInfoListener(channel, listener);
104            }
105            public void removeMessageInfoListener(String channel, MessageInfoListener listener) throws MactorException {
106                    getBroker(channel).removeMessageInfoListener(channel, listener);
107            }
108            public Message publish(String channel, Message message) throws MactorException {
109                    MessageBrokerHolder h = getBrokerHolder(channel);
110                    ChannelConfig cc = h.config.getChannelConfig(channel);
111                    if (cc.isRequiresResponse())
112                            return getBroker(channel).publishWithResponse(channel, message);
113                    getBroker(channel).publish(channel, message);
114                    return null;
115            }
116            public void subscribe(String channel, MessageSubscriber subscriber, MessageSelectorCommand messageSelector) throws MactorException {
117                    getBroker(channel).subscribe(channel, subscriber, messageSelector);
118            }
119            public void unsubscribe(String channel, MessageSubscriber subscriber) throws MactorException {
120                    getBroker(channel).unsubscribe(channel, subscriber);
121            }
122            public ArrayList<MessageInfo> loadMessageInfoFromArchive(String channel) throws MactorException {
123                    MessageBrokerDecorator mb = getBroker(channel);
124                    String path = mb.getArchivePath();
125                    if (path == null) {
126                            throw new MactorException("No archive path is specifed for the message broker that contains the channel '" + channel + "'");
127                    }
128                    return MessageInfo.loadFromArchiveDir(new File(path + "/" + channel));
129            }
130            public void clearArchive(String channel) throws MactorException {
131                    MessageBrokerDecorator mb = getBroker(channel);
132                    String path = mb.getArchivePath();
133                    if (path == null) {
134                            throw new MactorException("No archive path is specifed for the message broker that contains the channel '" + channel + "'");
135                    }
136                    MessageInfo.clearArchiveDir(new File(path + "/" + channel));
137            }
138            private class MessageBrokerHolder {
139                    MessageBrokerDecorator broker;
140                    MessageBrokerConfig config;
141                    public MessageBrokerHolder(MessageBrokerDecorator broker, MessageBrokerConfig config) {
142                            this.broker = broker;
143                            this.config = config;
144                    }
145            }
146            private static MessageBroker createMessageBroker(MessageBrokerConfig config) throws MactorException {
147                    String className = config.getBrokerClass();
148                    try {
149                            Class c = Class.forName(className);
150                            Constructor ct = c.getConstructor(new Class[] { MessageBrokerConfig.class });
151                            Object mb = ct.newInstance(new Object[] { config });
152                            if (!(mb instanceof MessageBroker))
153                                    throw new MactorException("The specified message broker class does not implement '" + MessageBroker.class.getName() + "'");
154                            return (MessageBroker) mb;
155                    } catch (ClassNotFoundException cnf) {
156                            throw new MactorException("The specified MessageBroker implementation was not found in the classpath. Class '" + className + "'", cnf);
157                    } catch (NoSuchMethodException nm) {
158                            throw new MactorException("The specified MessageBroker implementation class '" + className + "' does not have the requiered contructor (that takes a single argument of type '"
159                                            + MessageBrokerConfig.class.getName() + "')", nm);
160                    } catch (IllegalAccessException iae) {
161                            throw new MactorException("Unable to instantiate the MessageBroker '" + className + "'. Illegal access", iae);
162                    } catch (InvocationTargetException it) {
163                            throw new MactorException("Unable to instantiate the MessageBroker '" + className + "'. InvocationTargetException", it);
164                    } catch (InstantiationException ie) {
165                            throw new MactorException("Unable to instantiate the MessageBroker '" + className + "'. InstantiationException", ie);
166                    }
167            }
168            public static interface MessageInfoListener {
169                    void onMessageInfo(MessageInfo messageInfo);
170            }
171            private static class AF implements FileFilter {
172                    public boolean accept(File pathname) {
173                            String n = pathname.getName();
174                            return pathname.isFile() && n.endsWith(".xml") && (n.startsWith("out_") || n.startsWith("in_"));
175                    }
176            }
177            public static class MessageInfo implements Comparable<MessageInfo> {
178                    private String archivePath;
179                    private String channel;
180                    private Calendar createdTime;
181                    private boolean incoming;
182                    private boolean response;
183                    public int compareTo(org.mactor.brokers.MessageBrokerManager.MessageInfo other) {
184                            return createdTime.compareTo(other.createdTime);
185                    }
186                    public static MessageInfo create(MessageContextInfo mci) {
187                            MessageInfo mi = new MessageInfo();
188                            mi.archivePath = mci.getArchivePath();
189                            mi.channel = mci.getChannel();
190                            mi.createdTime = mci.getCreatedTime();
191                            mi.incoming = mci.isIncoming();
192                            mi.response = mci.isResponseMessage();
193                            return mi;
194                    }
195                    public static ArrayList<MessageInfo> loadFromArchiveDir(File channelArchiveDir) {
196                            if (channelArchiveDir == null || !channelArchiveDir.exists())
197                                    return new ArrayList<MessageInfo>();
198                            LinkedList<MessageInfo> mis = new LinkedList<MessageInfo>();
199                            File[] files = channelArchiveDir.listFiles(new AF());
200                            if (files != null) {
201                                    for (File f : files) {
202                                            MessageInfo m = MessageInfo.createFromFileInfo(f);
203                                            if (m != null)
204                                                    mis.add(m);
205                                    }
206                            }
207                            Collections.sort(mis);
208                            return new ArrayList<MessageInfo>(mis);
209                    }
210                    public static void clearArchiveDir(File channelArchiveDir) {
211                            if (channelArchiveDir == null || !channelArchiveDir.exists())
212                                    return;
213                            File[] files = channelArchiveDir.listFiles(new AF());
214                            if (files != null) {
215                                    for (File f : files) {
216                                            f.delete();
217                                    }
218                            }
219                    }
220                    public static MessageInfo createFromFileInfo(File f) {
221                            if (!f.exists())
222                                    return null;
223                            MessageInfo mi = new MessageInfo();
224                            mi.createdTime = Calendar.getInstance();
225                            mi.createdTime.setTimeInMillis(f.lastModified());
226                            String fn = f.getName();
227                            mi.response = fn.endsWith("_resp.xml");
228                            mi.incoming = fn.startsWith("in_");
229                            mi.channel = f.getParent();
230                            mi.archivePath = f.getAbsolutePath();
231                            return mi;
232                    }
233                    public String getArchivePath() {
234                            return archivePath;
235                    }
236                    public String getChannel() {
237                            return channel;
238                    }
239                    public Calendar getCreatedTime() {
240                            return createdTime;
241                    }
242                    public boolean isIncoming() {
243                            return incoming;
244                    }
245                    public boolean isResponse() {
246                            return response;
247                    }
248            }
249    }