001 /****************************************************************************** 002 * Copyright (C) MActor Developers. All rights reserved. * 003 * ---------------------------------------------------------------------------* 004 * This file is part of MActor. * 005 * * 006 * MActor is free software; you can redistribute it and/or modify * 007 * it under the terms of the GNU General Public License as published by * 008 * the Free Software Foundation; either version 2 of the License, or * 009 * (at your option) any later version. * 010 * * 011 * MActor is distributed in the hope that it will be useful, * 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 014 * GNU General Public License for more details. * 015 * * 016 * You should have received a copy of the GNU General Public License * 017 * along with MActor; if not, write to the Free Software * 018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * 019 ******************************************************************************/ 020 package org.mactor.brokers.file; 021 022 import java.io.File; 023 import java.io.FileWriter; 024 import java.io.IOException; 025 import java.util.LinkedList; 026 import java.util.List; 027 028 import org.mactor.brokers.Message; 029 import org.mactor.brokers.MessageBroker; 030 import org.mactor.brokers.PollingMessageBrokerTemplate; 031 import org.mactor.framework.MactorException; 032 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig; 033 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig; 034 035 /** 036 * A message broker for communicating via file shares. 037 * 038 * </p> 039 * 040 * @author Lars Ivar Almli 041 * @see MessageBroker 042 */ 043 public class FileMessageBroker extends PollingMessageBrokerTemplate { 044 private MessageBrokerConfig config; 045 private final boolean postActionDelete; 046 public FileMessageBroker(MessageBrokerConfig config) throws MactorException { 047 super(config); 048 this.config = config; 049 this.postActionDelete = "DELETE".equalsIgnoreCase(config.getValue("PostAction")); 050 } 051 @Override 052 protected synchronized List<Message> doGetMessages(String channel, int maxMessageCount) throws MactorException { 053 ChannelConfig cf = config.getRequieredChannelConfig(channel); 054 String filterPattern = cf.getValue("filter_patter", ".xml"); 055 File dir = new File(getDir(cf)); 056 List<File> candidates = new LinkedList<File>(); 057 for (File f : dir.listFiles()) { 058 if (f.isFile() && f.getName().endsWith(filterPattern)) { 059 File target = new File(f.getAbsolutePath() + "_seen"); 060 if (f.renameTo(target)) 061 candidates.add(target); 062 else 063 log.error("Failed to parse message received on channel '" + channel + "' to '" + target.getAbsolutePath() + "'"); 064 } 065 if (candidates.size() >= maxMessageCount) 066 break; 067 } 068 List<Message> messages = new LinkedList<Message>(); 069 for (File file : candidates) { 070 try { 071 messages.add(Message.createMessage(file)); 072 if (postActionDelete) 073 if (!file.delete()) 074 log.error("Failed to delete file '" + file.getAbsolutePath() + "' received on channel '" + channel + "'"); 075 } catch (MactorException me) { 076 log.error("Failed to parse message received on channel '" + channel + "'", me); 077 } 078 } 079 return messages; 080 } 081 @Override 082 protected synchronized void doPublishMessage(String channel, Message message) throws MactorException { 083 ChannelConfig cf = config.getRequieredChannelConfig(channel); 084 String dir = getDir(cf); 085 String fn = dir + getNext() + cf.getValue("suffix", ".xml") + "__"; 086 try { 087 File f = new File(fn); 088 FileWriter fw = new FileWriter(fn); 089 fw.write(message.getContent() + ""); 090 fw.close(); 091 f.renameTo(new File(fn.substring(0, fn.length() - 2))); 092 } catch (IOException ioe) { 093 throw new MactorException("Failed to write message to dir '" + dir + "'. File name: ''" + fn + "' . Error:" + ioe.getMessage()); 094 } 095 } 096 private String getDir(ChannelConfig cf) throws MactorException { 097 String dir = cf.getRequieredValue("dir"); 098 File f = new File(dir); 099 if (!f.exists()) 100 if (!f.mkdirs()) 101 throw new MactorException("Unable to create the dir '" + dir + "' specified in the channel config for channel: " + cf.getName()); 102 if (!dir.endsWith("\\") && !dir.endsWith("/")) 103 dir = dir + "/"; 104 return dir; 105 } 106 private static long counter = System.currentTimeMillis(); 107 private static synchronized long getNext() { 108 return counter++; 109 } 110 public void terminate() { 111 super.terminate(); 112 } 113 }