001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.brokers.mqseries;
021    
022    import java.io.ByteArrayInputStream;
023    import java.io.IOException;
024    import java.util.Hashtable;
025    
026    import org.apache.log4j.Logger;
027    import org.mactor.brokers.Message;
028    import org.mactor.framework.MactorException;
029    import org.mactor.framework.ParseUtil;
030    import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
031    
032    import com.ibm.mq.MQC;
033    import com.ibm.mq.MQException;
034    import com.ibm.mq.MQGetMessageOptions;
035    import com.ibm.mq.MQMessage;
036    import com.ibm.mq.MQQueue;
037    import com.ibm.mq.MQQueueManager;
038    
039    /**
040     * 
041     * @author Lars Ivar Almli
042     */
043    public class MqSeriesQueueBrowser {
044            protected static Logger log = Logger.getLogger(MqSeriesQueueBrowser.class);
045            private MQQueue mqQueue;
046            private MQQueueManager mqQueueManager;
047            private String channel;
048            MQMessage mqMessage = new MQMessage();
049            MQGetMessageOptions gmo = new MQGetMessageOptions();
050            static {
051                    MQException.log = null;
052            }
053            public MqSeriesQueueBrowser(ChannelConfig cf) throws MactorException {
054                    this.channel = cf.getName();
055                    Hashtable props = new Hashtable();
056                    try {
057                            props.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
058                            props.put(MQC.HOST_NAME_PROPERTY, cf.getRequieredValue("host"));
059                            props.put(MQC.PORT_PROPERTY, new Integer(ParseUtil.tryParseIntVal(cf.getRequieredValue("port"))));
060                            props.put(MQC.CHANNEL_PROPERTY, cf.getRequieredValue("channel"));
061                            mqQueueManager = new MQQueueManager(cf.getRequieredValue("queue_manager"), props);
062                            mqQueue = mqQueueManager.accessQueue(cf.getRequieredValue("queue"), MQC.MQOO_BROWSE);
063                    } catch (MQException e) {
064                            throw new MactorException(e);
065                    }
066            }
067            public Message browseFirstMessage() throws MactorException {
068                    return browseMessage(MQC.MQGMO_NO_WAIT | MQC.MQGMO_BROWSE_FIRST);
069            }
070            public Message browseNextMessage() throws MactorException {
071                    return browseMessage(MQC.MQGMO_NO_WAIT | MQC.MQGMO_BROWSE_NEXT);
072            }
073            public void consumeMessage() throws MactorException {
074                    try {
075                            gmo.options = MQC.MQGMO_MSG_UNDER_CURSOR;
076                            mqQueue.get(mqMessage, gmo);
077                    } catch (MQException me) {
078                            throw new MactorException("Failed to consume the message. Error:" + me.getMessage(), me);
079                    }
080            }
081            private Message browseMessage(int options) throws MactorException {
082                    try {
083                            mqMessage.clearMessage();
084                            mqMessage.correlationId = MQC.MQCI_NONE;
085                            mqMessage.messageId = MQC.MQMI_NONE;
086                            gmo.options = options;
087                            mqQueue.get(mqMessage, gmo);
088                            int len = mqMessage.getTotalMessageLength();
089                            if (len > 0) {
090                                    byte[] buffer = new byte[mqMessage.getTotalMessageLength()];
091                                    mqMessage.readFully(buffer);
092                                    return Message.createMessage(new ByteArrayInputStream(buffer));
093                            }
094                    } catch (MQException me) {
095                            if (me.reasonCode == 2033) {// no messages
096                            } else
097                                    throw new MactorException("MQ Series error occured with completion code '" + me.completionCode + "' and reeason code '" + me.reasonCode + "' while attempting to read from channel '"
098                                                    + channel + "'");
099                    } catch (IOException ioe) {
100                            ioe.printStackTrace();
101                            throw new MactorException("An IOException occured while trying to browse a message from channel '" + channel + "'", ioe);
102                    }
103                    return null;
104            }
105            @Override
106            protected void finalize() throws Throwable {
107                    super.finalize();
108                    close();
109            }
110            public void close() {
111                    try {
112                            if (mqQueue != null)
113                                    mqQueue.close();
114                            mqQueueManager.disconnect();
115                    } catch (MQException ex) {
116                            log.info("A WebSphere MQ Error occured : Completion Code " + ex.completionCode + " Reason Code " + ex.reasonCode);
117                    } catch (Exception ex) {
118                            log.info("An IOException occured whilst writing to the message buffer: " + ex);
119                    }
120            }
121    }