001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.brokers;
021
022 import java.util.HashMap;
023 import java.util.LinkedList;
024 import java.util.Map;
025
026 import org.apache.log4j.Logger;
027 import org.mactor.framework.MactorException;
028 import org.mactor.framework.extensioninterface.MessageSelectorCommand;
029 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
030
031 /**
032 * Abstract class that contains functionality common to most message broker
033 * implementations (such as maintaining subscribers).
034 *
035 * @author Lars Ivar Almli
036 */
037 public abstract class AbstractMessageBroker implements MessageBroker {
038 protected static Logger log = Logger.getLogger(AbstractMessageBroker.class);
039 protected MessageBrokerConfig config;
040 private Map<String, ChannelListener> channelListenerMap = new HashMap<String, ChannelListener>();
041 private Object channelLock = new Object();
042 public AbstractMessageBroker(MessageBrokerConfig config) {
043 this.config = config;
044 }
045 public void terminate() {
046 // initializedChannels.clear();
047 channelListenerMap.clear();
048 }
049 public void subscribe(String channel, MessageSubscriber subscriber, MessageSelectorCommand messageSelector) throws MactorException {
050 ChannelListener cl = null;
051 synchronized (channelLock) {
052 cl = channelListenerMap.get(channel);
053 if (cl == null) {
054 cl = new ChannelListener(channel);
055 channelListenerMap.put(channel, cl);
056 onFirstSubscribe(channel);
057 }
058 }
059 cl.addSubscriber(subscriber, messageSelector);
060 }
061 public void unsubscribe(String channel, MessageSubscriber subscriber) throws MactorException {
062 ChannelListener cl = null;
063 synchronized (channelLock) {
064 cl = channelListenerMap.get(channel);
065 if (cl != null) {
066 cl.removeSubscriber(subscriber);
067 if (cl.getSubscriberCount() == 0) {
068 channelListenerMap.remove(channel);
069 onLastSubscribe(channel);
070 }
071 }
072 }
073 }
074 /**
075 * This method shold be invoked message broker implementations when message
076 * is received. The method delivers the message to the subscribers of the
077 * channel
078 *
079 * @param channel
080 * the channel the message was recevied
081 * @param message
082 * the message
083 * @param broadcast
084 * a flag that indicates if the message should be distributed to
085 * all subscribers that accepts the message, or just the first
086 * (random) subscriber that accepets the message.
087 * @return a result message that might be used by message broker
088 * implementation to return a synchrounous result to the source of
089 * the incoming messages.
090 * @throws MactorException
091 */
092 protected Message raiseOnMessage(String channel, Message message, boolean broadcast) throws MactorException {
093 ChannelListener cl = null;
094 synchronized (channelLock) {
095 cl = channelListenerMap.get(channel);
096 }
097 if (cl == null)
098 throw new MactorException("Unknown channel '" + channel + "'");
099 Message resultMessage = null;
100 LinkedList<SC> subs = cl.getSnapshotCopyOfSubscribersList();
101 for (SC s : subs) {
102 if (s.messageSelector.isAcceptableMessage(message)) {
103 resultMessage = s.subscriber.onMessage(message);
104 if (!broadcast && message.isConsumed()) {
105 break;
106 }
107 }
108 }
109 return resultMessage;
110 }
111 protected abstract void onFirstSubscribe(String channel) throws MactorException;
112 protected abstract void onLastSubscribe(String channel) throws MactorException;
113 private static class ChannelListener {
114 private HashMap<MessageSubscriber, SC> subscribers = new HashMap<MessageSubscriber, SC>();
115 private Object lock = new Object();
116 private String channel;
117 public ChannelListener(String channel) {
118 this.channel = channel;
119 }
120 public void addSubscriber(MessageSubscriber subscriber, MessageSelectorCommand messageSelector) {
121 synchronized (lock) {
122 subscribers.put(subscriber, new SC(subscriber, messageSelector));
123 }
124 }
125 public boolean removeSubscriber(MessageSubscriber subscriber) {
126 synchronized (lock) {
127 return subscribers.remove(subscriber) != null;
128 }
129 }
130 public int getSubscriberCount() {
131 return subscribers.size();
132 }
133 public LinkedList<SC> getSnapshotCopyOfSubscribersList() {
134 synchronized (lock) {
135 return new LinkedList<SC>(subscribers.values());
136 }
137 }
138 }
139 private static class SC {
140 MessageSubscriber subscriber;
141 MessageSelectorCommand messageSelector;
142 public SC(MessageSubscriber subscriber, MessageSelectorCommand messageSelector) {
143 super();
144 this.subscriber = subscriber;
145 this.messageSelector = messageSelector;
146 }
147 }
148 }