001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.brokers.mqseries;
021    
022    import java.io.ByteArrayInputStream;
023    import java.io.IOException;
024    import java.util.HashMap;
025    import java.util.Hashtable;
026    import java.util.LinkedList;
027    import java.util.List;
028    import java.util.Map;
029    
030    import org.mactor.brokers.Message;
031    import org.mactor.brokers.PollingMessageBrokerTemplate;
032    import org.mactor.framework.MactorException;
033    import org.mactor.framework.ParseUtil;
034    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig;
035    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
036    
037    import com.ibm.mq.MQC;
038    import com.ibm.mq.MQException;
039    import com.ibm.mq.MQGetMessageOptions;
040    import com.ibm.mq.MQMessage;
041    import com.ibm.mq.MQMsg2;
042    import com.ibm.mq.MQPutMessageOptions;
043    import com.ibm.mq.MQQueue;
044    import com.ibm.mq.MQQueueManager;
045    
046    /**
047     * A message broker for IBM MQ Series (add the com.ibm.mq.jar library from your
048     * version of MQ Series to the MActor classpath)
049     * <p>
050     * 
051     * @author Lars Ivar Almli
052     */
053    public class MqSeriesMessageBroker extends PollingMessageBrokerTemplate {
054            private Map<String, MqConnectionWrapper> channelMap = new HashMap<String, MqConnectionWrapper>();
055            private MessageBrokerConfig config;
056            static {
057                    MQException.log = null;
058            }
059            public void terminate() {
060                    super.terminate();
061                    for (MqConnectionWrapper mq : channelMap.values())
062                            mq.close();
063                    channelMap.clear();
064            }
065            public MqSeriesMessageBroker(MessageBrokerConfig config) {
066                    super(config);
067                    this.config = config;
068            }
069            private MqConnectionWrapper getMqConnectionWrapper(String channel) throws MactorException {
070                    MqConnectionWrapper w = channelMap.get(channel);
071                    if (w == null) {
072                            w = new MqConnectionWrapper(config.getRequieredChannelConfig(channel));
073                            channelMap.put(channel, w);
074                    }
075                    return w;
076            }
077            protected List<Message> doGetMessages(String channel, int maxMessageCount) throws MactorException {
078                    List<Message> messages = new LinkedList<Message>();
079                    MqConnectionWrapper cw = getMqConnectionWrapper(channel);
080                    for (int i = 0; i < maxMessageCount; i++) {
081                            Message m = cw.getMessage();
082                            if (m == null)
083                                    break;
084                            messages.add(m);
085                    }
086                    return messages;
087            }
088            protected void doPublishMessage(String channel, Message message) throws MactorException {
089                    getMqConnectionWrapper(channel).sendMessage(message);
090            }
091            private class MqConnectionWrapper {
092                    private String channel;
093                    private String mqQueueManagerName;
094                    private String mqHost;
095                    private int mqPort;
096                    private String mqQueue;
097                    private String mqChannel;
098                    private MQQueue mqOutgoingQueue;
099                    private MQQueue mqIncomingQueue;
100                    private MQQueueManager mqQueueManager;
101                    public MqConnectionWrapper(ChannelConfig cf) throws MactorException {
102                            this.mqQueueManagerName = cf.getRequieredValue("queue_manager");
103                            this.mqHost = cf.getRequieredValue("host");
104                            this.mqPort = ParseUtil.tryParseIntVal(cf.getRequieredValue("port"));
105                            this.mqQueue = cf.getRequieredValue("queue");
106                            this.mqChannel = cf.getRequieredValue("channel");
107                            this.channel = cf.getName();
108                    }
109                    public void sendMessage(Message message) throws MactorException {
110                            try {
111                                    MQMsg2 msg = new MQMsg2();
112                                    msg.setMessageData(message.getContent().trim().getBytes());
113                                    MQPutMessageOptions pmo = new MQPutMessageOptions();
114                                    getOutgoingQueue().putMsg2(msg, pmo);
115                            } catch (MQException me) {
116                                    throw new MactorException("MQ Series error occured with completion code '" + me.completionCode + "' and reeason code '" + me.reasonCode + "' while attempting to send to channel  '"
117                                                    + channel + "'");
118                            }
119                    }
120                    private final static String RFH2_FORMAT = "MQHRF2  ";
121                    public Message getMessage() throws MactorException {
122                            try {
123                                    MQMessage rcvMessage = new MQMessage();
124                                    MQGetMessageOptions gmo = new MQGetMessageOptions();
125                                    gmo.options = MQC.MQGMO_WAIT;
126                                    gmo.waitInterval = 500;// 500ms
127                                    getIncomingQueue().get(rcvMessage, gmo);
128                                    int headerLength = 0;
129                                    if (RFH2_FORMAT.equals(rcvMessage.format)) { // Skip RFH2
130                                                                                                                                    // header
131                                            rcvMessage.seek(4);
132                                            int version = rcvMessage.readInt();
133                                            headerLength = rcvMessage.readInt();
134                                            int encoding = rcvMessage.readInt();
135                                            int codedCharSetId = rcvMessage.readInt();
136                                            rcvMessage.skipBytes(8);
137                                            int flags = rcvMessage.readInt();
138                                            int nameValueCodedCharSetId = rcvMessage.readInt();
139                                            rcvMessage.seek(headerLength);
140                                    }
141                                    int len = rcvMessage.getTotalMessageLength() - headerLength;
142                                    if (len > 0) {
143                                            byte[] buffer = new byte[len];
144                                            rcvMessage.readFully(buffer);
145                                            return Message.createMessage(new ByteArrayInputStream(buffer));
146                                    }
147                            } catch (MQException me) {
148                                    if (me.reasonCode == 2033) {// no messages
149                                    } else
150                                            throw new MactorException("MQ Series error occured with completion code '" + me.completionCode + "' and reeason code '" + me.reasonCode
151                                                            + "' while attempting to read from channel '" + channel + "'");
152                            } catch (IOException ioe) {
153                                    ioe.printStackTrace();
154                                    throw new MactorException("An IOException occured while trying to read a message from channel '" + channel + "'", ioe);
155                            }
156                            return null;
157                    }
158                    private Hashtable buildProperties() {
159                            Hashtable props = new Hashtable();
160                            props.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
161                            props.put(MQC.HOST_NAME_PROPERTY, mqHost);
162                            props.put(MQC.PORT_PROPERTY, new Integer(mqPort));
163                            props.put(MQC.CHANNEL_PROPERTY, mqChannel);
164                            return props;
165                    }
166                    private MQQueueManager getManager() throws MQException {
167                            if (mqQueueManager == null) {
168                                    mqQueueManager = new MQQueueManager(mqQueueManagerName, buildProperties());
169                            }
170                            return mqQueueManager;
171                    }
172                    private MQQueue getOutgoingQueue() throws MQException {
173                            if (mqOutgoingQueue == null) {
174                                    mqOutgoingQueue = getManager().accessQueue(mqQueue, MQC.MQOO_OUTPUT);
175                            }
176                            return mqOutgoingQueue;
177                    }
178                    private MQQueue getIncomingQueue() throws MQException {
179                            if (mqIncomingQueue == null) {
180                                    mqIncomingQueue = getManager().accessQueue(mqQueue, MQC.MQOO_INPUT_AS_Q_DEF);
181                            }
182                            return mqIncomingQueue;
183                    }
184                    @Override
185                    protected void finalize() throws Throwable {
186                            super.finalize();
187                            close();
188                    }
189                    public void close() {
190                            try {
191                                    if (mqOutgoingQueue != null)
192                                            mqOutgoingQueue.close();
193                                    if (mqIncomingQueue != null)
194                                            mqIncomingQueue.close();
195                                    mqQueueManager.disconnect();
196                            } catch (MQException ex) {
197                                    log.info("A WebSphere MQ Error occured : Completion Code " + ex.completionCode + " Reason Code " + ex.reasonCode);
198                            } catch (Exception ex) {
199                                    log.info("An IOException occured whilst writing to the message buffer: " + ex);
200                            }
201                    }
202            }
203    }