001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.brokers.file;
021
022 import java.io.File;
023 import java.io.FileWriter;
024 import java.io.IOException;
025 import java.util.LinkedList;
026 import java.util.List;
027
028 import org.mactor.brokers.Message;
029 import org.mactor.brokers.MessageBroker;
030 import org.mactor.brokers.PollingMessageBrokerTemplate;
031 import org.mactor.framework.MactorException;
032 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
033 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
034
035 /**
036 * A message broker for communicating via file shares.
037 *
038 * </p>
039 *
040 * @author Lars Ivar Almli
041 * @see MessageBroker
042 */
043 public class FileMessageBroker extends PollingMessageBrokerTemplate {
044 private MessageBrokerConfig config;
045 private final boolean postActionDelete;
046 public FileMessageBroker(MessageBrokerConfig config) throws MactorException {
047 super(config);
048 this.config = config;
049 this.postActionDelete = "DELETE".equalsIgnoreCase(config.getValue("PostAction"));
050 }
051 @Override
052 protected synchronized List<Message> doGetMessages(String channel, int maxMessageCount) throws MactorException {
053 ChannelConfig cf = config.getRequieredChannelConfig(channel);
054 String filterPattern = cf.getValue("filter_patter", ".xml");
055 File dir = new File(getDir(cf));
056 List<File> candidates = new LinkedList<File>();
057 for (File f : dir.listFiles()) {
058 if (f.isFile() && f.getName().endsWith(filterPattern)) {
059 File target = new File(f.getAbsolutePath() + "_seen");
060 if (f.renameTo(target))
061 candidates.add(target);
062 else
063 log.error("Failed to parse message received on channel '" + channel + "' to '" + target.getAbsolutePath() + "'");
064 }
065 if (candidates.size() >= maxMessageCount)
066 break;
067 }
068 List<Message> messages = new LinkedList<Message>();
069 for (File file : candidates) {
070 try {
071 messages.add(Message.createMessage(file));
072 if (postActionDelete)
073 if (!file.delete())
074 log.error("Failed to delete file '" + file.getAbsolutePath() + "' received on channel '" + channel + "'");
075 } catch (MactorException me) {
076 log.error("Failed to parse message received on channel '" + channel + "'", me);
077 }
078 }
079 return messages;
080 }
081 @Override
082 protected synchronized void doPublishMessage(String channel, Message message) throws MactorException {
083 ChannelConfig cf = config.getRequieredChannelConfig(channel);
084 String dir = getDir(cf);
085 String fn = dir + getNext() + cf.getValue("suffix", ".xml") + "__";
086 try {
087 File f = new File(fn);
088 FileWriter fw = new FileWriter(fn);
089 fw.write(message.getContent() + "");
090 fw.close();
091 f.renameTo(new File(fn.substring(0, fn.length() - 2)));
092 } catch (IOException ioe) {
093 throw new MactorException("Failed to write message to dir '" + dir + "'. File name: ''" + fn + "' . Error:" + ioe.getMessage());
094 }
095 }
096 private String getDir(ChannelConfig cf) throws MactorException {
097 String dir = cf.getRequieredValue("dir");
098 File f = new File(dir);
099 if (!f.exists())
100 if (!f.mkdirs())
101 throw new MactorException("Unable to create the dir '" + dir + "' specified in the channel config for channel: " + cf.getName());
102 if (!dir.endsWith("\\") && !dir.endsWith("/"))
103 dir = dir + "/";
104 return dir;
105 }
106 private static long counter = System.currentTimeMillis();
107 private static synchronized long getNext() {
108 return counter++;
109 }
110 public void terminate() {
111 super.terminate();
112 }
113 }