001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.brokers;
021    
022    import java.util.HashMap;
023    import java.util.LinkedList;
024    import java.util.Map;
025    
026    import org.apache.log4j.Logger;
027    import org.mactor.framework.MactorException;
028    import org.mactor.framework.extensioninterface.MessageSelectorCommand;
029    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
030    
031    /**
032     * Abstract class that contains functinally common to most message broker
033     * implementations (such as maintaining subscribers).
034     * 
035     * @author Lars Ivar Almli
036     */
037    public abstract class AbstractMessageBroker implements MessageBroker {
038            protected static Logger log = Logger.getLogger(AbstractMessageBroker.class);
039            protected MessageBrokerConfig config;
040            private Map<String, ChannelListener> channelListenerMap = new HashMap<String, ChannelListener>();
041            private Object channelLock = new Object();
042            public AbstractMessageBroker(MessageBrokerConfig config) {
043                    this.config = config;
044            }
045            public void terminate() {
046                    // initializedChannels.clear();
047                    channelListenerMap.clear();
048            }
049            public void subscribe(String channel, MessageSubscriber subscriber, MessageSelectorCommand messageSelector) throws MactorException {
050                    ChannelListener cl = null;
051                    synchronized (channelLock) {
052                            cl = channelListenerMap.get(channel);
053                            if (cl == null) {
054                                    cl = new ChannelListener(channel);
055                                    channelListenerMap.put(channel, cl);
056                                    onFirstSubscribe(channel);
057                            }
058                    }
059                    cl.addSubscriber(subscriber, messageSelector);
060            }
061            public void unsubscribe(String channel, MessageSubscriber subscriber) throws MactorException {
062                    ChannelListener cl = null;
063                    synchronized (channelLock) {
064                            cl = channelListenerMap.get(channel);
065                            if (cl != null) {
066                                    cl.removeSubscriber(subscriber);
067                                    if (cl.getSubscriberCount() == 0) {
068                                            channelListenerMap.remove(channel);
069                                            onLastSubscribe(channel);
070                                    }
071                            }
072                    }
073            }
074            /**
075             * This method shold be invoked message broker implementations when message
076             * is received. The method delivers the message to the subscribers of the
077             * channel
078             * 
079             * @param channel
080             *            the channel the message was recevied
081             * @param message
082             *            the message
083             * @param broadcast
084             *            a flag that indicates if the message should be distributed to
085             *            all subscribers that accepts the message, or just the first
086             *            (random) subscriber that accepets the message.
087             * @return a result message that might be used by message broker
088             *         implementation to return a synchrounous result to the source of
089             *         the incoming messages.
090             * @throws MactorException
091             */
092            protected Message raiseOnMessage(String channel, Message message, boolean broadcast) throws MactorException {
093                    ChannelListener cl = null;
094                    synchronized (channelLock) {
095                            cl = channelListenerMap.get(channel);
096                    }
097                    if (cl == null)
098                            throw new MactorException("Unknown channel '" + channel + "'");
099                    Message resultMessage = null;
100                    LinkedList<SC> subs = cl.getSnapshotCopyOfSubscribersList();
101                    for (SC s : subs) {
102                            if (s.messageSelector.isAcceptableMessage(message)) {
103                                    resultMessage = s.subscriber.onMessage(message);
104                                    if (!broadcast && message.isConsumed()) {
105                                            break;
106                                    }
107                            }
108                    }
109                    return resultMessage;
110            }
111            protected abstract void onFirstSubscribe(String channel) throws MactorException;
112            protected abstract void onLastSubscribe(String channel) throws MactorException;
113            private static class ChannelListener {
114                    private HashMap<MessageSubscriber, SC> subscribers = new HashMap<MessageSubscriber, SC>();
115                    private Object lock = new Object();
116                    private String channel;
117                    public ChannelListener(String channel) {
118                            this.channel = channel;
119                    }
120                    public void addSubscriber(MessageSubscriber subscriber, MessageSelectorCommand messageSelector) {
121                            synchronized (lock) {
122                                    subscribers.put(subscriber, new SC(subscriber, messageSelector));
123                            }
124                    }
125                    public boolean removeSubscriber(MessageSubscriber subscriber) {
126                            synchronized (lock) {
127                                    return subscribers.remove(subscriber) != null;
128                            }
129                    }
130                    public int getSubscriberCount() {
131                            return subscribers.size();
132                    }
133                    public LinkedList<SC> getSnapshotCopyOfSubscribersList() {
134                            synchronized (lock) {
135                                    return new LinkedList<SC>(subscribers.values());
136                            }
137                    }
138            }
139            private static class SC {
140                    MessageSubscriber subscriber;
141                    MessageSelectorCommand messageSelector;
142                    public SC(MessageSubscriber subscriber, MessageSelectorCommand messageSelector) {
143                            super();
144                            this.subscriber = subscriber;
145                            this.messageSelector = messageSelector;
146                    }
147            }
148    }