/******************************************************************************
 * Copyright (C) MActor Developers. All rights reserved.                      *
 * ---------------------------------------------------------------------------*
 * This file is part of MActor.                                               *
 *                                                                            *
 * MActor is free software; you can redistribute it and/or modify             *
 * it under the terms of the GNU General Public License as published by       *
 * the Free Software Foundation; either version 2 of the License, or          *
 * (at your option) any later version.                                        *
 *                                                                            *
 * MActor is distributed in the hope that it will be useful,                  *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the               *
 * GNU General Public License for more details.                               *
 *                                                                            *
 * You should have received a copy of the GNU General Public License          *
 * along with MActor; if not, write to the Free Software                      *
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA   *
 ******************************************************************************/
package org.mactor.brokers;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import org.apache.log4j.Logger;
import org.mactor.framework.MactorException;
import org.mactor.framework.extensioninterface.MessageSelectorCommand;
import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;

/**
 * Abstract class that contains functionality common to most message broker
 * implementations (such as maintaining subscribers).
 * <p>
 * The channel registry is guarded by a single lock; every read and write of
 * the registry happens while that lock is held. Subscribers are invoked
 * outside of any lock, on a snapshot of the subscriber list.
 *
 * @author Lars Ivar Almli
 */
public abstract class AbstractMessageBroker implements MessageBroker {
    protected static Logger log = Logger.getLogger(AbstractMessageBroker.class);
    protected MessageBrokerConfig config;
    /** Channel name to listener; only accessed while holding {@link #channelLock}. */
    private final Map<String, ChannelListener> channelListenerMap = new HashMap<String, ChannelListener>();
    private final Object channelLock = new Object();

    /**
     * @param config
     *            the configuration of this message broker; stored as-is
     */
    public AbstractMessageBroker(MessageBrokerConfig config) {
        this.config = config;
    }

    /**
     * Drops all channel registrations. {@link #onLastSubscribe(String)} is not
     * invoked for the dropped channels.
     */
    public void terminate() {
        // The registry is a plain HashMap, so it must be cleared under the
        // same lock that subscribe/unsubscribe/raiseOnMessage use.
        synchronized (channelLock) {
            channelListenerMap.clear();
        }
    }

    /**
     * Registers a subscriber on a channel. The first subscription on a channel
     * triggers {@link #onFirstSubscribe(String)}.
     *
     * @param channel
     *            the name of the channel
     * @param subscriber
     *            the subscriber to register; registering the same subscriber
     *            again replaces its previous message selector
     * @param messageSelector
     *            the selector used to decide which messages are delivered to
     *            the subscriber
     * @throws MactorException
     *             if {@link #onFirstSubscribe(String)} fails; in that case the
     *             channel is not registered
     */
    public void subscribe(String channel, MessageSubscriber subscriber, MessageSelectorCommand messageSelector) throws MactorException {
        synchronized (channelLock) {
            ChannelListener cl = channelListenerMap.get(channel);
            if (cl == null) {
                cl = new ChannelListener(channel);
                channelListenerMap.put(channel, cl);
                boolean initialized = false;
                try {
                    onFirstSubscribe(channel);
                    initialized = true;
                } finally {
                    if (!initialized) {
                        // Do not leave an empty listener behind: a later
                        // subscribe must retry the channel initialisation.
                        channelListenerMap.remove(channel);
                    }
                }
            }
            // Added while still holding channelLock so that a concurrent
            // unsubscribe cannot discard the listener between the lookup and
            // the registration (which would orphan the new subscriber).
            cl.addSubscriber(subscriber, messageSelector);
        }
    }

    /**
     * Removes a subscriber from a channel. When the last subscriber of the
     * channel is removed, the channel is dropped and
     * {@link #onLastSubscribe(String)} is invoked. Unknown channels are
     * ignored.
     *
     * @param channel
     *            the name of the channel
     * @param subscriber
     *            the subscriber to remove
     * @throws MactorException
     *             if {@link #onLastSubscribe(String)} fails
     */
    public void unsubscribe(String channel, MessageSubscriber subscriber) throws MactorException {
        synchronized (channelLock) {
            ChannelListener cl = channelListenerMap.get(channel);
            if (cl == null) {
                return;
            }
            cl.removeSubscriber(subscriber);
            if (cl.getSubscriberCount() == 0) {
                channelListenerMap.remove(channel);
                onLastSubscribe(channel);
            }
        }
    }

    /**
     * This method should be invoked by message broker implementations when a
     * message is received. The method delivers the message to the subscribers
     * of the channel whose message selector accepts the message.
     *
     * @param channel
     *            the channel the message was received on
     * @param message
     *            the message
     * @param broadcast
     *            if <code>true</code> the message is offered to all
     *            subscribers that accept it; if <code>false</code> delivery
     *            stops as soon as the message reports itself as consumed
     *            after a subscriber has handled it
     * @return the result returned by the last subscriber that was invoked, or
     *         <code>null</code> if no subscriber accepted the message. The
     *         result might be used by message broker implementations to
     *         return a synchronous result to the source of the incoming
     *         message.
     * @throws MactorException
     *             if no subscription exists for the channel, or if a
     *             subscriber fails
     */
    protected Message raiseOnMessage(String channel, Message message, boolean broadcast) throws MactorException {
        ChannelListener cl = null;
        synchronized (channelLock) {
            cl = channelListenerMap.get(channel);
        }
        if (cl == null)
            throw new MactorException("Unknown channel '" + channel + "'");
        Message resultMessage = null;
        // Iterate over a copy so subscribers may subscribe/unsubscribe from
        // within onMessage without disturbing the iteration.
        LinkedList<SC> subs = cl.getSnapshotCopyOfSubscribersList();
        for (SC s : subs) {
            if (s.messageSelector.isAcceptableMessage(message)) {
                resultMessage = s.subscriber.onMessage(message);
                if (!broadcast && message.isConsumed()) {
                    break;
                }
            }
        }
        return resultMessage;
    }

    /**
     * Invoked when the first subscriber is registered on a channel.
     *
     * @param channel
     *            the name of the channel
     * @throws MactorException
     */
    protected abstract void onFirstSubscribe(String channel) throws MactorException;

    /**
     * Invoked when the last subscriber has been removed from a channel.
     *
     * @param channel
     *            the name of the channel
     * @throws MactorException
     */
    protected abstract void onLastSubscribe(String channel) throws MactorException;

    /**
     * Holds the subscribers of a single channel. All access to the subscriber
     * map is guarded by {@link #lock}.
     */
    private static class ChannelListener {
        private final HashMap<MessageSubscriber, SC> subscribers = new HashMap<MessageSubscriber, SC>();
        private final Object lock = new Object();
        private final String channel;

        public ChannelListener(String channel) {
            this.channel = channel;
        }

        public void addSubscriber(MessageSubscriber subscriber, MessageSelectorCommand messageSelector) {
            synchronized (lock) {
                subscribers.put(subscriber, new SC(subscriber, messageSelector));
            }
        }

        /**
         * @return <code>true</code> if the subscriber was registered
         */
        public boolean removeSubscriber(MessageSubscriber subscriber) {
            synchronized (lock) {
                return subscribers.remove(subscriber) != null;
            }
        }

        public int getSubscriberCount() {
            // Read under the same lock as the writers; HashMap gives no
            // guarantees for unsynchronized concurrent access.
            synchronized (lock) {
                return subscribers.size();
            }
        }

        public LinkedList<SC> getSnapshotCopyOfSubscribersList() {
            synchronized (lock) {
                return new LinkedList<SC>(subscribers.values());
            }
        }
    }

    /** A subscriber paired with the message selector it subscribed with. */
    private static class SC {
        final MessageSubscriber subscriber;
        final MessageSelectorCommand messageSelector;

        public SC(MessageSubscriber subscriber, MessageSelectorCommand messageSelector) {
            this.subscriber = subscriber;
            this.messageSelector = messageSelector;
        }
    }
}