001 /****************************************************************************** 002 * Copyright (C) MActor Developers. All rights reserved. * 003 * ---------------------------------------------------------------------------* 004 * This file is part of MActor. * 005 * * 006 * MActor is free software; you can redistribute it and/or modify * 007 * it under the terms of the GNU General Public License as published by * 008 * the Free Software Foundation; either version 2 of the License, or * 009 * (at your option) any later version. * 010 * * 011 * MActor is distributed in the hope that it will be useful, * 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 014 * GNU General Public License for more details. * 015 * * 016 * You should have received a copy of the GNU General Public License * 017 * along with MActor; if not, write to the Free Software * 018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * 019 ******************************************************************************/ 020 package org.mactor.brokers; 021 022 import java.util.HashMap; 023 import java.util.LinkedList; 024 import java.util.List; 025 import java.util.Map; 026 027 import org.apache.log4j.Logger; 028 import org.mactor.brokers.file.FileMessageBroker; 029 import org.mactor.framework.MactorException; 030 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig; 031 032 /** 033 * A template for implementing message brokers for polling bases protocols (i.e. 034 * such as file system polling) 035 * 036 * @see FileMessageBroker 037 * @author Lars Ivar Almli 038 */ 039 public abstract class PollingMessageBrokerTemplate extends AbstractMessageBroker { 040 protected static Logger log = Logger.getLogger(PollingMessageBrokerTemplate.class); 041 private final int messageReadLimit; 042 private final int messageReadIntervalSeconds; 043 public PollingMessageBrokerTemplate(MessageBrokerConfig config) { 044 super(config); 045 this.messageReadLimit = config.getMessageReadLimit(); 046 this.messageReadIntervalSeconds = config.getMessageReadIntervalSeconds(); 047 } 048 public Message publishWithResponse(String channel, Message message) throws MactorException { 049 publish(channel, message); 050 return null; 051 } 052 public void publish(String channel, Message message) throws MactorException { 053 doPublishMessage(channel, message); 054 } 055 public void terminate() { 056 super.terminate(); 057 for (PollingWorker pw : workerMap.values()) 058 pw.terminate(); 059 workerMap.clear(); 060 } 061 /** 062 * Template method called to check for incoming messages (the polling 063 * interval is defined by the message broker config) 064 * 065 * @param channel 066 * the channel 067 * @param maxMessageCount 068 * the max number of messages to return (as defined in the 069 * message broker config). 070 * @return list of messages 071 * @throws MactorException 072 * if a problem occured (this will not cause the test to fail) 073 */ 074 protected abstract List<Message> doGetMessages(String channel, int maxMessageCount) throws MactorException; 075 /** 076 * Template method called to publish a message. 077 * 078 * @param channel 079 * the channel 080 * @param message 081 * the message 082 * @throws MactorException 083 * if a problem occured (this will cause the test to fail) 084 */ 085 protected abstract void doPublishMessage(String channel, Message message) throws MactorException; 086 Map<String, PollingWorker> workerMap = new HashMap<String, PollingWorker>(); 087 @Override 088 protected void onFirstSubscribe(String channel) { 089 workerMap.put(channel, new PollingWorker(channel)); 090 } 091 @Override 092 protected void onLastSubscribe(String channel) throws MactorException { 093 PollingWorker pw = workerMap.remove(channel); 094 pw.terminate(); 095 } 096 private class PollingWorker { 097 private String channel; 098 private boolean terminated = false; 099 public PollingWorker(String channel) { 100 this.channel = channel; 101 new Thread(new Runnable() { 102 public void run() { 103 work(); 104 }; 105 }).start(); 106 } 107 private Object waitLock = new Object(); 108 public void terminate() { 109 terminated = true; 110 synchronized (waitLock) { 111 waitLock.notifyAll(); 112 } 113 } 114 private void work() { 115 try { 116 while (!terminated) { 117 synchronized (waitLock) { 118 waitLock.wait(messageReadIntervalSeconds * 1000); 119 } 120 if (terminated) 121 break; 122 List<Message> candidates = null; 123 try { 124 log.debug("Polling for messages on channel '" + channel + "'"); 125 candidates = doGetMessages(this.channel, messageReadLimit); 126 } catch (Exception me) { 127 log.error("Error while polling for messages on channel '" + channel + "'", me); 128 } 129 if (candidates == null || candidates.size() == 0) 130 continue; 131 candidates = new LinkedList<Message>(candidates); 132 for (Message message : candidates) { 133 try { 134 PollingMessageBrokerTemplate.this.raiseOnMessage(channel, message, false); 135 } catch (Exception e) { 136 log.info("Exception while delivering message to subscribers. Message: " + message, e); 137 } 138 } 139 } 140 } catch (InterruptedException ie) { 141 } finally { 142 log.info("Channel polling worker for channel '" + channel + "' terminated"); 143 } 144 } 145 } 146 }