001 /****************************************************************************** 002 * Copyright (C) MActor Developers. All rights reserved. * 003 * ---------------------------------------------------------------------------* 004 * This file is part of MActor. * 005 * * 006 * MActor is free software; you can redistribute it and/or modify * 007 * it under the terms of the GNU General Public License as published by * 008 * the Free Software Foundation; either version 2 of the License, or * 009 * (at your option) any later version. * 010 * * 011 * MActor is distributed in the hope that it will be useful, * 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 014 * GNU General Public License for more details. * 015 * * 016 * You should have received a copy of the GNU General Public License * 017 * along with MActor; if not, write to the Free Software * 018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * 019 ******************************************************************************/ 020 package org.mactor.brokers.mqseries; 021 022 import java.io.ByteArrayInputStream; 023 import java.io.IOException; 024 import java.util.HashMap; 025 import java.util.Hashtable; 026 import java.util.LinkedList; 027 import java.util.List; 028 import java.util.Map; 029 030 import org.mactor.brokers.Message; 031 import org.mactor.brokers.PollingMessageBrokerTemplate; 032 import org.mactor.framework.MactorException; 033 import org.mactor.framework.ParseUtil; 034 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig; 035 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig; 036 037 import com.ibm.mq.MQC; 038 import com.ibm.mq.MQException; 039 import com.ibm.mq.MQGetMessageOptions; 040 import com.ibm.mq.MQMessage; 041 import com.ibm.mq.MQMsg2; 042 import com.ibm.mq.MQPutMessageOptions; 043 import com.ibm.mq.MQQueue; 044 import com.ibm.mq.MQQueueManager; 045 046 /** 047 * A message broker for IBM MQ Series (add the com.ibm.mq.jar library from your 048 * version of MQ Series to the MActor classpath) 049 * <p> 050 * 051 * @author Lars Ivar Almli 052 */ 053 public class MqSeriesMessageBroker extends PollingMessageBrokerTemplate { 054 private Map<String, MqConnectionWrapper> channelMap = new HashMap<String, MqConnectionWrapper>(); 055 private MessageBrokerConfig config; 056 static { 057 MQException.log = null; 058 } 059 public void terminate() { 060 super.terminate(); 061 for (MqConnectionWrapper mq : channelMap.values()) 062 mq.close(); 063 channelMap.clear(); 064 } 065 public MqSeriesMessageBroker(MessageBrokerConfig config) { 066 super(config); 067 this.config = config; 068 } 069 private MqConnectionWrapper getMqConnectionWrapper(String channel) throws MactorException { 070 MqConnectionWrapper w = channelMap.get(channel); 071 if (w == null) { 072 w = new MqConnectionWrapper(config.getRequieredChannelConfig(channel)); 073 channelMap.put(channel, w); 074 } 075 return w; 076 } 077 protected List<Message> doGetMessages(String channel, int maxMessageCount) throws MactorException { 078 List<Message> messages = new LinkedList<Message>(); 079 MqConnectionWrapper cw = getMqConnectionWrapper(channel); 080 for (int i = 0; i < maxMessageCount; i++) { 081 Message m = cw.getMessage(); 082 if (m == null) 083 break; 084 messages.add(m); 085 } 086 return messages; 087 } 088 protected void doPublishMessage(String channel, Message message) throws MactorException { 089 getMqConnectionWrapper(channel).sendMessage(message); 090 } 091 private class MqConnectionWrapper { 092 private String channel; 093 private String mqQueueManagerName; 094 private String mqHost; 095 private int mqPort; 096 private String mqQueue; 097 private String mqChannel; 098 private MQQueue mqOutgoingQueue; 099 private MQQueue mqIncomingQueue; 100 private MQQueueManager mqQueueManager; 101 public MqConnectionWrapper(ChannelConfig cf) throws MactorException { 102 this.mqQueueManagerName = cf.getRequieredValue("queue_manager"); 103 this.mqHost = cf.getRequieredValue("host"); 104 this.mqPort = ParseUtil.tryParseIntVal(cf.getRequieredValue("port")); 105 this.mqQueue = cf.getRequieredValue("queue"); 106 this.mqChannel = cf.getRequieredValue("channel"); 107 this.channel = cf.getName(); 108 } 109 public void sendMessage(Message message) throws MactorException { 110 try { 111 MQMsg2 msg = new MQMsg2(); 112 msg.setMessageData(message.getContent().trim().getBytes()); 113 MQPutMessageOptions pmo = new MQPutMessageOptions(); 114 getOutgoingQueue().putMsg2(msg, pmo); 115 } catch (MQException me) { 116 throw new MactorException("MQ Series error occured with completion code '" + me.completionCode + "' and reeason code '" + me.reasonCode + "' while attempting to send to channel '" 117 + channel + "'"); 118 } 119 } 120 private final static String RFH2_FORMAT = "MQHRF2 "; 121 public Message getMessage() throws MactorException { 122 try { 123 MQMessage rcvMessage = new MQMessage(); 124 MQGetMessageOptions gmo = new MQGetMessageOptions(); 125 gmo.options = MQC.MQGMO_WAIT; 126 gmo.waitInterval = 500;// 500ms 127 getIncomingQueue().get(rcvMessage, gmo); 128 int headerLength = 0; 129 if (RFH2_FORMAT.equals(rcvMessage.format)) { // Skip RFH2 130 // header 131 rcvMessage.seek(4); 132 int version = rcvMessage.readInt(); 133 headerLength = rcvMessage.readInt(); 134 int encoding = rcvMessage.readInt(); 135 int codedCharSetId = rcvMessage.readInt(); 136 rcvMessage.skipBytes(8); 137 int flags = rcvMessage.readInt(); 138 int nameValueCodedCharSetId = rcvMessage.readInt(); 139 rcvMessage.seek(headerLength); 140 } 141 int len = rcvMessage.getTotalMessageLength() - headerLength; 142 if (len > 0) { 143 byte[] buffer = new byte[len]; 144 rcvMessage.readFully(buffer); 145 return Message.createMessage(new ByteArrayInputStream(buffer)); 146 } 147 } catch (MQException me) { 148 if (me.reasonCode == 2033) {// no messages 149 } else 150 throw new MactorException("MQ Series error occured with completion code '" + me.completionCode + "' and reeason code '" + me.reasonCode 151 + "' while attempting to read from channel '" + channel + "'"); 152 } catch (IOException ioe) { 153 ioe.printStackTrace(); 154 throw new MactorException("An IOException occured while trying to read a message from channel '" + channel + "'", ioe); 155 } 156 return null; 157 } 158 private Hashtable buildProperties() { 159 Hashtable props = new Hashtable(); 160 props.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT); 161 props.put(MQC.HOST_NAME_PROPERTY, mqHost); 162 props.put(MQC.PORT_PROPERTY, new Integer(mqPort)); 163 props.put(MQC.CHANNEL_PROPERTY, mqChannel); 164 return props; 165 } 166 private MQQueueManager getManager() throws MQException { 167 if (mqQueueManager == null) { 168 mqQueueManager = new MQQueueManager(mqQueueManagerName, buildProperties()); 169 } 170 return mqQueueManager; 171 } 172 private MQQueue getOutgoingQueue() throws MQException { 173 if (mqOutgoingQueue == null) { 174 mqOutgoingQueue = getManager().accessQueue(mqQueue, MQC.MQOO_OUTPUT); 175 } 176 return mqOutgoingQueue; 177 } 178 private MQQueue getIncomingQueue() throws MQException { 179 if (mqIncomingQueue == null) { 180 mqIncomingQueue = getManager().accessQueue(mqQueue, MQC.MQOO_INPUT_AS_Q_DEF); 181 } 182 return mqIncomingQueue; 183 } 184 @Override 185 protected void finalize() throws Throwable { 186 super.finalize(); 187 close(); 188 } 189 public void close() { 190 try { 191 if (mqOutgoingQueue != null) 192 mqOutgoingQueue.close(); 193 if (mqIncomingQueue != null) 194 mqIncomingQueue.close(); 195 mqQueueManager.disconnect(); 196 } catch (MQException ex) { 197 log.info("A WebSphere MQ Error occured : Completion Code " + ex.completionCode + " Reason Code " + ex.reasonCode); 198 } catch (Exception ex) { 199 log.info("An IOException occured whilst writing to the message buffer: " + ex); 200 } 201 } 202 } 203 }