001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.brokers.http;
021
022 import java.net.MalformedURLException;
023 import java.net.URL;
024 import java.nio.charset.Charset;
025 import java.util.HashMap;
026 import java.util.Map;
027
028 import org.mactor.brokers.AbstractMessageBroker;
029 import org.mactor.brokers.Message;
030 import org.mactor.brokers.MessageBroker;
031 import org.mactor.framework.ConfigException;
032 import org.mactor.framework.MactorException;
033 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
034 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
035
036 /**
037 * A simple message broker that supports posting/receinving XML messages over
038 * HTTP (optionally with a response)
039 *
040 * </p>
041 *
042 * @author Lars Ivar Almli
043 * @see MessageBroker
044 */
045 public class HttpMessageBroker extends AbstractMessageBroker {
046 public HttpMessageBroker(MessageBrokerConfig config) {
047 super(config);
048 }
049 public void publish(String channel, Message message) throws MactorException {
050 ChannelConfig cc = config.getRequieredChannelConfig(channel);
051 sendMessage(cc.getRequieredValue("URL"), cc.getRequieredValue("Method"), message, false, cc.getValue("Username"), cc.getValue("Password"));
052 }
053 public Message publishWithResponse(String channel, Message message) throws MactorException {
054 ChannelConfig cc = config.getRequieredChannelConfig(channel);
055 return sendMessage(cc.getRequieredValue("URL"), cc.getRequieredValue("Method"), message, true, cc.getValue("Username"), cc.getValue("Password"));
056 }
057 @Override
058 protected void onFirstSubscribe(String channel) throws MactorException {
059 registerListener(channel);
060 }
061 @Override
062 protected void onLastSubscribe(String channel) throws MactorException {
063 UrlListener l = listeners.remove(channel);
064 if (l != null)
065 l.close();
066 }
067 Map<String, UrlListener> listeners = new HashMap<String, UrlListener>();
068 private synchronized void registerListener(String channel) throws MactorException {
069 UrlListener listener = new UrlListener(config.getRequieredChannelConfig(channel));
070 listeners.put(channel, listener);
071 }
072 private class UrlListener implements HttpRequestListener {
073 private String channel;
074 private String realUrl;
075 private int port;
076 public UrlListener(ChannelConfig cc) throws MactorException {
077 this.channel = cc.getName();
078 String endpoint = cc.getRequieredValue("URL");
079 try {
080 URL url = new URL(endpoint);
081 port = url.getDefaultPort();
082 if (url.getPort() > 0)
083 port = url.getPort();
084 realUrl = url.getPath().toLowerCase();
085 HttpServerManager.getHttpServer(port).addRequestListener(realUrl, this);
086 } catch (MalformedURLException e) {
087 throw new ConfigException(e);
088 }
089 }
090 public void close() {
091 try {
092 System.out.println("removing:" + realUrl);
093 HttpServerManager.getHttpServer(port).removeRequestListener(realUrl);
094 } catch (MactorException me) {
095 log.warn("Failed to remove HTTP listener on port '" + port + "'. Error:" + me.getMessage(), me);
096 }
097 }
098 public HttpResponse onRequest(HttpRequest request) throws Exception {
099 Message m = Message.createMessage(request.getData());
100 Message result = raiseOnMessage(channel, m, false);
101 if (result != null) {
102 HttpResponse res = new HttpResponse();
103 res.setData(result.getContentDocument());
104 res.addHeader("Content-type", " text/xml; charset=" + Charset.defaultCharset().name());
105 return res;
106 } else {
107 HttpResponse res = new HttpResponse();
108 res.addHeader("Content-type", " text/plain; charset=" + Charset.defaultCharset().name());
109 res.setData("");
110 return res;
111 }
112 }
113 }
114 public Message sendMessage(String endPoint, String method, Message message, boolean expectResponse, String username, String password) throws MactorException {
115 return HttpClient.sendMessage(endPoint, method, message, expectResponse, username, password);
116 }
117 }