001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.brokers.file;
021    
022    import java.io.File;
023    import java.io.FileWriter;
024    import java.io.IOException;
025    import java.util.LinkedList;
026    import java.util.List;
027    
028    import org.mactor.brokers.Message;
029    import org.mactor.brokers.MessageBroker;
030    import org.mactor.brokers.PollingMessageBrokerTemplate;
031    import org.mactor.framework.MactorException;
032    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
033    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
034    
035    /**
036     * A message broker for communicating via file shares.
037     * 
038     * </p>
039     * 
040     * @author Lars Ivar Almli
041     * @see MessageBroker
042     */
043    public class FileMessageBroker extends PollingMessageBrokerTemplate {
044            private MessageBrokerConfig config;
045            private final boolean postActionDelete;
046            public FileMessageBroker(MessageBrokerConfig config) throws MactorException {
047                    super(config);
048                    this.config = config;
049                    this.postActionDelete = "DELETE".equalsIgnoreCase(config.getValue("PostAction"));
050            }
051            @Override
052            protected synchronized List<Message> doGetMessages(String channel, int maxMessageCount) throws MactorException {
053                    ChannelConfig cf = config.getRequieredChannelConfig(channel);
054                    String filterPattern = cf.getValue("filter_patter", ".xml");
055                    File dir = new File(getDir(cf));
056                    List<File> candidates = new LinkedList<File>();
057                    for (File f : dir.listFiles()) {
058                            if (f.isFile() && f.getName().endsWith(filterPattern)) {
059                                    File target = new File(f.getAbsolutePath() + "_seen");
060                                    if (f.renameTo(target))
061                                            candidates.add(target);
062                                    else
063                                            log.error("Failed to parse message received on channel '" + channel + "' to '" + target.getAbsolutePath() + "'");
064                            }
065                            if (candidates.size() >= maxMessageCount)
066                                    break;
067                    }
068                    List<Message> messages = new LinkedList<Message>();
069                    for (File file : candidates) {
070                            try {
071                                    messages.add(Message.createMessage(file));
072                                    if (postActionDelete)
073                                            if (!file.delete())
074                                                    log.error("Failed to delete file '" + file.getAbsolutePath() + "' received on channel '" + channel + "'");
075                            } catch (MactorException me) {
076                                    log.error("Failed to parse message received on channel '" + channel + "'", me);
077                            }
078                    }
079                    return messages;
080            }
081            @Override
082            protected synchronized void doPublishMessage(String channel, Message message) throws MactorException {
083                    ChannelConfig cf = config.getRequieredChannelConfig(channel);
084                    String dir = getDir(cf);
085                    String fn = dir + getNext() + cf.getValue("suffix", ".xml") + "__";
086                    try {
087                            File f = new File(fn);
088                            FileWriter fw = new FileWriter(fn);
089                            fw.write(message.getContent() + "");
090                            fw.close();
091                            f.renameTo(new File(fn.substring(0, fn.length() - 2)));
092                    } catch (IOException ioe) {
093                            throw new MactorException("Failed to write message to dir '" + dir + "'. File name: ''" + fn + "' . Error:" + ioe.getMessage());
094                    }
095            }
096            private String getDir(ChannelConfig cf) throws MactorException {
097                    String dir = cf.getRequieredValue("dir");
098                    File f = new File(dir);
099                    if (!f.exists())
100                            if (!f.mkdirs())
101                                    throw new MactorException("Unable to create the dir '" + dir + "' specified in the channel config for channel: " + cf.getName());
102                    if (!dir.endsWith("\\") && !dir.endsWith("/"))
103                            dir = dir + "/";
104                    return dir;
105            }
106            private static long counter = System.currentTimeMillis();
107            private static synchronized long getNext() {
108                    return counter++;
109            }
110            public void terminate() {
111                    super.terminate();
112            }
113    }