001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.brokers;
021    
022    import java.util.HashMap;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    
027    import org.apache.log4j.Logger;
028    import org.mactor.brokers.file.FileMessageBroker;
029    import org.mactor.framework.MactorException;
030    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
031    
032    /**
033     * A template for implementing message brokers for polling bases protocols (i.e.
034     * such as file system polling)
035     * 
036     * @see FileMessageBroker
037     * @author Lars Ivar Almli
038     */
039    public abstract class PollingMessageBrokerTemplate extends AbstractMessageBroker {
040            protected static Logger log = Logger.getLogger(PollingMessageBrokerTemplate.class);
041            private final int messageReadLimit;
042            private final int messageReadIntervalSeconds;
043            public PollingMessageBrokerTemplate(MessageBrokerConfig config) {
044                    super(config);
045                    this.messageReadLimit = config.getMessageReadLimit();
046                    this.messageReadIntervalSeconds = config.getMessageReadIntervalSeconds();
047            }
048            public Message publishWithResponse(String channel, Message message) throws MactorException {
049                    publish(channel, message);
050                    return null;
051            }
052            public void publish(String channel, Message message) throws MactorException {
053                    doPublishMessage(channel, message);
054            }
055            public void terminate() {
056                    super.terminate();
057                    for (PollingWorker pw : workerMap.values())
058                            pw.terminate();
059                    workerMap.clear();
060            }
061            /**
062             * Template method called to check for incoming messages (the polling
063             * interval is defined by the message broker config)
064             * 
065             * @param channel
066             *            the channel
067             * @param maxMessageCount
068             *            the max number of messages to return (as defined in the
069             *            message broker config).
070             * @return list of messages
071             * @throws MactorException
072             *             if a problem occured (this will not cause the test to fail)
073             */
074            protected abstract List<Message> doGetMessages(String channel, int maxMessageCount) throws MactorException;
075            /**
076             * Template method called to publish a message.
077             * 
078             * @param channel
079             *            the channel
080             * @param message
081             *            the message
082             * @throws MactorException
083             *             if a problem occured (this will cause the test to fail)
084             */
085            protected abstract void doPublishMessage(String channel, Message message) throws MactorException;
086            Map<String, PollingWorker> workerMap = new HashMap<String, PollingWorker>();
087            @Override
088            protected void onFirstSubscribe(String channel) {
089                    workerMap.put(channel, new PollingWorker(channel));
090            }
091            @Override
092            protected void onLastSubscribe(String channel) throws MactorException {
093                    PollingWorker pw = workerMap.remove(channel);
094                    pw.terminate();
095            }
096            private class PollingWorker {
097                    private String channel;
098                    private boolean terminated = false;
099                    public PollingWorker(String channel) {
100                            this.channel = channel;
101                            new Thread(new Runnable() {
102                                    public void run() {
103                                            work();
104                                    };
105                            }).start();
106                    }
107                    private Object waitLock = new Object();
108                    public void terminate() {
109                            terminated = true;
110                            synchronized (waitLock) {
111                                    waitLock.notifyAll();
112                            }
113                    }
114                    private void work() {
115                            try {
116                                    while (!terminated) {
117                                            synchronized (waitLock) {
118                                                    waitLock.wait(messageReadIntervalSeconds * 1000);
119                                            }
120                                            if (terminated)
121                                                    break;
122                                            List<Message> candidates = null;
123                                            try {
124                                                    log.debug("Polling for messages on channel '" + channel + "'");
125                                                    candidates = doGetMessages(this.channel, messageReadLimit);
126                                            } catch (Exception me) {
127                                                    log.error("Error while polling for messages on channel '" + channel + "'", me);
128                                            }
129                                            if (candidates == null || candidates.size() == 0)
130                                                    continue;
131                                            candidates = new LinkedList<Message>(candidates);
132                                            for (Message message : candidates) {
133                                                    try {
134                                                            PollingMessageBrokerTemplate.this.raiseOnMessage(channel, message, false);
135                                                    } catch (Exception e) {
136                                                            log.info("Exception while delivering message to subscribers. Message: " + message, e);
137                                                    }
138                                            }
139                                    }
140                            } catch (InterruptedException ie) {
141                            } finally {
142                                    log.info("Channel polling worker for channel '" + channel + "' terminated");
143                            }
144                    }
145            }
146    }