001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.brokers.mqseries;
021
022 import java.io.ByteArrayInputStream;
023 import java.io.IOException;
024 import java.util.HashMap;
025 import java.util.Hashtable;
026 import java.util.LinkedList;
027 import java.util.List;
028 import java.util.Map;
029
030 import org.mactor.brokers.Message;
031 import org.mactor.brokers.PollingMessageBrokerTemplate;
032 import org.mactor.framework.MactorException;
033 import org.mactor.framework.ParseUtil;
034 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
035 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
036
037 import com.ibm.mq.MQC;
038 import com.ibm.mq.MQException;
039 import com.ibm.mq.MQGetMessageOptions;
040 import com.ibm.mq.MQMessage;
041 import com.ibm.mq.MQMsg2;
042 import com.ibm.mq.MQPutMessageOptions;
043 import com.ibm.mq.MQQueue;
044 import com.ibm.mq.MQQueueManager;
045
046 /**
047 * A message broker for IBM MQ Series (add the com.ibm.mq.jar library from your
048 * version of MQ Series to the MActor classpath)
049 * <p>
050 *
051 * @author Lars Ivar Almli
052 */
053 public class MqSeriesMessageBroker extends PollingMessageBrokerTemplate {
054 private Map<String, MqConnectionWrapper> channelMap = new HashMap<String, MqConnectionWrapper>();
055 private MessageBrokerConfig config;
056 static {
057 MQException.log = null;
058 }
059 public void terminate() {
060 super.terminate();
061 for (MqConnectionWrapper mq : channelMap.values())
062 mq.close();
063 channelMap.clear();
064 }
065 public MqSeriesMessageBroker(MessageBrokerConfig config) {
066 super(config);
067 this.config = config;
068 }
069 private MqConnectionWrapper getMqConnectionWrapper(String channel) throws MactorException {
070 MqConnectionWrapper w = channelMap.get(channel);
071 if (w == null) {
072 w = new MqConnectionWrapper(config.getRequieredChannelConfig(channel));
073 channelMap.put(channel, w);
074 }
075 return w;
076 }
077 protected List<Message> doGetMessages(String channel, int maxMessageCount) throws MactorException {
078 List<Message> messages = new LinkedList<Message>();
079 MqConnectionWrapper cw = getMqConnectionWrapper(channel);
080 for (int i = 0; i < maxMessageCount; i++) {
081 Message m = cw.getMessage();
082 if (m == null)
083 break;
084 messages.add(m);
085 }
086 return messages;
087 }
088 protected void doPublishMessage(String channel, Message message) throws MactorException {
089 getMqConnectionWrapper(channel).sendMessage(message);
090 }
091 private class MqConnectionWrapper {
092 private String channel;
093 private String mqQueueManagerName;
094 private String mqHost;
095 private int mqPort;
096 private String mqQueue;
097 private String mqChannel;
098 private MQQueue mqOutgoingQueue;
099 private MQQueue mqIncomingQueue;
100 private MQQueueManager mqQueueManager;
101 public MqConnectionWrapper(ChannelConfig cf) throws MactorException {
102 this.mqQueueManagerName = cf.getRequieredValue("queue_manager");
103 this.mqHost = cf.getRequieredValue("host");
104 this.mqPort = ParseUtil.tryParseIntVal(cf.getRequieredValue("port"));
105 this.mqQueue = cf.getRequieredValue("queue");
106 this.mqChannel = cf.getRequieredValue("channel");
107 this.channel = cf.getName();
108 }
109 public void sendMessage(Message message) throws MactorException {
110 try {
111 MQMsg2 msg = new MQMsg2();
112 msg.setMessageData(message.getContent().trim().getBytes());
113 MQPutMessageOptions pmo = new MQPutMessageOptions();
114 getOutgoingQueue().putMsg2(msg, pmo);
115 } catch (MQException me) {
116 throw new MactorException("MQ Series error occured with completion code '" + me.completionCode + "' and reeason code '" + me.reasonCode + "' while attempting to send to channel '"
117 + channel + "'");
118 }
119 }
120 private final static String RFH2_FORMAT = "MQHRF2 ";
121 public Message getMessage() throws MactorException {
122 try {
123 MQMessage rcvMessage = new MQMessage();
124 MQGetMessageOptions gmo = new MQGetMessageOptions();
125 gmo.options = MQC.MQGMO_WAIT;
126 gmo.waitInterval = 500;// 500ms
127 getIncomingQueue().get(rcvMessage, gmo);
128 int headerLength = 0;
129 if (RFH2_FORMAT.equals(rcvMessage.format)) { // Skip RFH2
130 // header
131 rcvMessage.seek(4);
132 int version = rcvMessage.readInt();
133 headerLength = rcvMessage.readInt();
134 int encoding = rcvMessage.readInt();
135 int codedCharSetId = rcvMessage.readInt();
136 rcvMessage.skipBytes(8);
137 int flags = rcvMessage.readInt();
138 int nameValueCodedCharSetId = rcvMessage.readInt();
139 rcvMessage.seek(headerLength);
140 }
141 int len = rcvMessage.getTotalMessageLength() - headerLength;
142 if (len > 0) {
143 byte[] buffer = new byte[len];
144 rcvMessage.readFully(buffer);
145 return Message.createMessage(new ByteArrayInputStream(buffer));
146 }
147 } catch (MQException me) {
148 if (me.reasonCode == 2033) {// no messages
149 } else
150 throw new MactorException("MQ Series error occured with completion code '" + me.completionCode + "' and reeason code '" + me.reasonCode
151 + "' while attempting to read from channel '" + channel + "'");
152 } catch (IOException ioe) {
153 ioe.printStackTrace();
154 throw new MactorException("An IOException occured while trying to read a message from channel '" + channel + "'", ioe);
155 }
156 return null;
157 }
158 private Hashtable buildProperties() {
159 Hashtable props = new Hashtable();
160 props.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
161 props.put(MQC.HOST_NAME_PROPERTY, mqHost);
162 props.put(MQC.PORT_PROPERTY, new Integer(mqPort));
163 props.put(MQC.CHANNEL_PROPERTY, mqChannel);
164 return props;
165 }
166 private MQQueueManager getManager() throws MQException {
167 if (mqQueueManager == null) {
168 mqQueueManager = new MQQueueManager(mqQueueManagerName, buildProperties());
169 }
170 return mqQueueManager;
171 }
172 private MQQueue getOutgoingQueue() throws MQException {
173 if (mqOutgoingQueue == null) {
174 mqOutgoingQueue = getManager().accessQueue(mqQueue, MQC.MQOO_OUTPUT);
175 }
176 return mqOutgoingQueue;
177 }
178 private MQQueue getIncomingQueue() throws MQException {
179 if (mqIncomingQueue == null) {
180 mqIncomingQueue = getManager().accessQueue(mqQueue, MQC.MQOO_INPUT_AS_Q_DEF);
181 }
182 return mqIncomingQueue;
183 }
184 @Override
185 protected void finalize() throws Throwable {
186 super.finalize();
187 close();
188 }
189 public void close() {
190 try {
191 if (mqOutgoingQueue != null)
192 mqOutgoingQueue.close();
193 if (mqIncomingQueue != null)
194 mqIncomingQueue.close();
195 mqQueueManager.disconnect();
196 } catch (MQException ex) {
197 log.info("A WebSphere MQ Error occured : Completion Code " + ex.completionCode + " Reason Code " + ex.reasonCode);
198 } catch (Exception ex) {
199 log.info("An IOException occured whilst writing to the message buffer: " + ex);
200 }
201 }
202 }
203 }