/******************************************************************************
 * Copyright (C) MActor Developers. All rights reserved.                      *
 * ---------------------------------------------------------------------------*
 * This file is part of MActor.                                               *
 *                                                                            *
 * MActor is free software; you can redistribute it and/or modify             *
 * it under the terms of the GNU General Public License as published by       *
 * the Free Software Foundation; either version 2 of the License, or          *
 * (at your option) any later version.                                        *
 *                                                                            *
 * MActor is distributed in the hope that it will be useful,                  *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the               *
 * GNU General Public License for more details.                               *
 *                                                                            *
 * You should have received a copy of the GNU General Public License          *
 * along with MActor; if not, write to the Free Software                      *
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA   *
 ******************************************************************************/
package org.mactor.brokers.http;

import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.mactor.brokers.AbstractMessageBroker;
import org.mactor.brokers.Message;
import org.mactor.brokers.MessageBroker;
import org.mactor.framework.ConfigException;
import org.mactor.framework.MactorException;
import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;

/**
 * A simple message broker that supports posting/receiving XML messages over
 * HTTP (optionally with a response).
 * <p>
 * Outgoing messages are delegated to {@link HttpClient}. Incoming messages are
 * received by registering a request listener, keyed by the path of the
 * channel's configured <code>URL</code>, on the HTTP server obtained from
 * {@link HttpServerManager} for that URL's port.
 * </p>
 *
 * @author Lars Ivar Almli
 * @see MessageBroker
 */
public class HttpMessageBroker extends AbstractMessageBroker {
    /**
     * Active inbound listeners, keyed by channel name. Every access is made
     * while holding this broker's monitor.
     */
    final Map<String, UrlListener> listeners = new HashMap<String, UrlListener>();

    /**
     * Creates a broker backed by the given configuration.
     *
     * @param config
     *            the broker configuration, passed on to the superclass
     */
    public HttpMessageBroker(MessageBrokerConfig config) {
        super(config);
    }

    /**
     * Sends the message to the channel's configured endpoint without
     * requesting a response.
     *
     * @param channel
     *            name of the channel whose <code>URL</code>,
     *            <code>Method</code>, <code>Username</code> and
     *            <code>Password</code> settings are used
     * @param message
     *            the message to send
     * @throws MactorException
     *             if the channel configuration lookup or the send fails
     */
    public void publish(String channel, Message message) throws MactorException {
        ChannelConfig cc = config.getRequieredChannelConfig(channel);
        sendMessage(cc.getRequieredValue("URL"), cc.getRequieredValue("Method"), message, false,
                cc.getValue("Username"), cc.getValue("Password"));
    }

    /**
     * Sends the message to the channel's configured endpoint and returns the
     * response.
     *
     * @param channel
     *            name of the channel whose <code>URL</code>,
     *            <code>Method</code>, <code>Username</code> and
     *            <code>Password</code> settings are used
     * @param message
     *            the message to send
     * @return whatever {@link #sendMessage} returns when a response is
     *         expected
     * @throws MactorException
     *             if the channel configuration lookup or the send fails
     */
    public Message publishWithResponse(String channel, Message message) throws MactorException {
        ChannelConfig cc = config.getRequieredChannelConfig(channel);
        return sendMessage(cc.getRequieredValue("URL"), cc.getRequieredValue("Method"), message, true,
                cc.getValue("Username"), cc.getValue("Password"));
    }

    /**
     * Starts listening for inbound HTTP requests on the given channel.
     */
    @Override
    protected void onFirstSubscribe(String channel) throws MactorException {
        registerListener(channel);
    }

    /**
     * Stops listening for inbound HTTP requests on the given channel, if a
     * listener is registered for it.
     */
    @Override
    protected void onLastSubscribe(String channel) throws MactorException {
        UrlListener listener;
        // Same monitor as registerListener(): the map is a plain HashMap, so
        // all mutations must be serialized on one lock.
        synchronized (this) {
            listener = listeners.remove(channel);
        }
        // Closed outside the lock so the HTTP server call is not made while
        // holding the broker's monitor.
        if (listener != null) {
            listener.close();
        }
    }

    /**
     * Creates a listener for the channel's configured URL and records it under
     * the channel name.
     *
     * @param channel
     *            name of the channel to listen on
     * @throws MactorException
     *             if the channel configuration is missing or invalid, or the
     *             listener cannot be registered with the HTTP server
     */
    private synchronized void registerListener(String channel) throws MactorException {
        UrlListener listener = new UrlListener(config.getRequieredChannelConfig(channel));
        listeners.put(channel, listener);
    }

    /**
     * Bridges HTTP requests arriving on one URL path to the subscribers of one
     * channel.
     */
    private class UrlListener implements HttpRequestListener {
        private final String channel;
        private final String realUrl;
        private final int port;

        /**
         * Parses the channel's <code>URL</code> setting and registers this
         * listener for its (lower-cased) path on the HTTP server for its port.
         *
         * @param cc
         *            configuration of the channel to listen on
         * @throws MactorException
         *             a {@link ConfigException} if the URL is malformed, or
         *             whatever the HTTP server manager reports
         */
        public UrlListener(ChannelConfig cc) throws MactorException {
            this.channel = cc.getName();
            String endpoint = cc.getRequieredValue("URL");
            try {
                URL url = new URL(endpoint);
                // An explicit port in the URL wins; otherwise fall back to the
                // protocol's default port.
                this.port = url.getPort() > 0 ? url.getPort() : url.getDefaultPort();
                // NOTE(review): uses the default-locale toLowerCase(); the
                // server-side lookup presumably normalizes the request path
                // the same way - confirm before switching to a fixed locale.
                this.realUrl = url.getPath().toLowerCase();
                HttpServerManager.getHttpServer(port).addRequestListener(realUrl, this);
            } catch (MalformedURLException e) {
                throw new ConfigException(e);
            }
        }

        /**
         * Unregisters this listener from the HTTP server. Failures are logged
         * as warnings and not propagated.
         */
        public void close() {
            try {
                // Diagnostic trace goes through the broker's logger rather
                // than standard output.
                log.debug("removing:" + realUrl);
                HttpServerManager.getHttpServer(port).removeRequestListener(realUrl);
            } catch (MactorException me) {
                log.warn("Failed to remove HTTP listener on port '" + port + "'. Error:" + me.getMessage(), me);
            }
        }

        /**
         * Converts the request body into a {@link Message}, raises it on the
         * channel and builds the HTTP response from the outcome.
         *
         * @param request
         *            the incoming HTTP request
         * @return a <code>text/xml</code> response carrying the result
         *         message's content document when a result is produced,
         *         otherwise an empty <code>text/plain</code> response
         * @throws Exception
         *             if message creation or dispatch fails
         */
        public HttpResponse onRequest(HttpRequest request) throws Exception {
            Message incoming = Message.createMessage(request.getData());
            Message result = raiseOnMessage(channel, incoming, false);
            HttpResponse response = new HttpResponse();
            if (result != null) {
                response.setData(result.getContentDocument());
                response.addHeader("Content-type", " text/xml; charset=" + Charset.defaultCharset().name());
                return response;
            }
            response.addHeader("Content-type", " text/plain; charset=" + Charset.defaultCharset().name());
            response.setData("");
            return response;
        }
    }

    /**
     * Sends a message to an HTTP endpoint by delegating to
     * {@link HttpClient#sendMessage}.
     *
     * @param endPoint
     *            the target URL
     * @param method
     *            the HTTP method to use
     * @param message
     *            the message to send
     * @param expectResponse
     *            whether a response message is expected
     * @param username
     *            user name forwarded to the HTTP client (the channel setting
     *            is read with the non-required accessor, so it may be absent)
     * @param password
     *            password forwarded to the HTTP client (same caveat as
     *            <code>username</code>)
     * @return the value returned by the HTTP client
     * @throws MactorException
     *             if the HTTP client reports a failure
     */
    public Message sendMessage(String endPoint, String method, Message message, boolean expectResponse,
            String username, String password) throws MactorException {
        return HttpClient.sendMessage(endPoint, method, message, expectResponse, username, password);
    }
}