001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.brokers;
021
022 import java.util.HashMap;
023 import java.util.LinkedList;
024 import java.util.List;
025 import java.util.Map;
026
027 import org.apache.log4j.Logger;
028 import org.mactor.brokers.file.FileMessageBroker;
029 import org.mactor.framework.MactorException;
030 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
031
032 /**
033 * A template for implementing message brokers for polling bases protocols (i.e.
034 * such as file system polling)
035 *
036 * @see FileMessageBroker
037 * @author Lars Ivar Almli
038 */
039 public abstract class PollingMessageBrokerTemplate extends AbstractMessageBroker {
040 protected static Logger log = Logger.getLogger(PollingMessageBrokerTemplate.class);
041 private final int messageReadLimit;
042 private final int messageReadIntervalSeconds;
043 public PollingMessageBrokerTemplate(MessageBrokerConfig config) {
044 super(config);
045 this.messageReadLimit = config.getMessageReadLimit();
046 this.messageReadIntervalSeconds = config.getMessageReadIntervalSeconds();
047 }
048 public Message publishWithResponse(String channel, Message message) throws MactorException {
049 publish(channel, message);
050 return null;
051 }
052 public void publish(String channel, Message message) throws MactorException {
053 doPublishMessage(channel, message);
054 }
055 public void terminate() {
056 super.terminate();
057 for (PollingWorker pw : workerMap.values())
058 pw.terminate();
059 workerMap.clear();
060 }
061 /**
062 * Template method called to check for incoming messages (the polling
063 * interval is defined by the message broker config)
064 *
065 * @param channel
066 * the channel
067 * @param maxMessageCount
068 * the max number of messages to return (as defined in the
069 * message broker config).
070 * @return list of messages
071 * @throws MactorException
072 * if a problem occured (this will not cause the test to fail)
073 */
074 protected abstract List<Message> doGetMessages(String channel, int maxMessageCount) throws MactorException;
075 /**
076 * Template method called to publish a message.
077 *
078 * @param channel
079 * the channel
080 * @param message
081 * the message
082 * @throws MactorException
083 * if a problem occured (this will cause the test to fail)
084 */
085 protected abstract void doPublishMessage(String channel, Message message) throws MactorException;
086 Map<String, PollingWorker> workerMap = new HashMap<String, PollingWorker>();
087 @Override
088 protected void onFirstSubscribe(String channel) {
089 workerMap.put(channel, new PollingWorker(channel));
090 }
091 @Override
092 protected void onLastSubscribe(String channel) throws MactorException {
093 PollingWorker pw = workerMap.remove(channel);
094 pw.terminate();
095 }
096 private class PollingWorker {
097 private String channel;
098 private boolean terminated = false;
099 public PollingWorker(String channel) {
100 this.channel = channel;
101 new Thread(new Runnable() {
102 public void run() {
103 work();
104 };
105 }).start();
106 }
107 private Object waitLock = new Object();
108 public void terminate() {
109 terminated = true;
110 synchronized (waitLock) {
111 waitLock.notifyAll();
112 }
113 }
114 private void work() {
115 try {
116 while (!terminated) {
117 synchronized (waitLock) {
118 waitLock.wait(messageReadIntervalSeconds * 1000);
119 }
120 if (terminated)
121 break;
122 List<Message> candidates = null;
123 try {
124 log.debug("Polling for messages on channel '" + channel + "'");
125 candidates = doGetMessages(this.channel, messageReadLimit);
126 } catch (Exception me) {
127 log.error("Error while polling for messages on channel '" + channel + "'", me);
128 }
129 if (candidates == null || candidates.size() == 0)
130 continue;
131 candidates = new LinkedList<Message>(candidates);
132 for (Message message : candidates) {
133 try {
134 PollingMessageBrokerTemplate.this.raiseOnMessage(channel, message, false);
135 } catch (Exception e) {
136 log.info("Exception while delivering message to subscribers. Message: " + message, e);
137 }
138 }
139 }
140 } catch (InterruptedException ie) {
141 } finally {
142 log.info("Channel polling worker for channel '" + channel + "' terminated");
143 }
144 }
145 }
146 }