001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.brokers.mqseries;
021
022 import java.io.ByteArrayInputStream;
023 import java.io.IOException;
024 import java.util.Hashtable;
025
026 import org.apache.log4j.Logger;
027 import org.mactor.brokers.Message;
028 import org.mactor.framework.MactorException;
029 import org.mactor.framework.ParseUtil;
030 import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;
031
032 import com.ibm.mq.MQC;
033 import com.ibm.mq.MQException;
034 import com.ibm.mq.MQGetMessageOptions;
035 import com.ibm.mq.MQMessage;
036 import com.ibm.mq.MQQueue;
037 import com.ibm.mq.MQQueueManager;
038
039 /**
040 *
041 * @author Lars Ivar Almli
042 */
043 public class MqSeriesQueueBrowser {
044 protected static Logger log = Logger.getLogger(MqSeriesQueueBrowser.class);
045 private MQQueue mqQueue;
046 private MQQueueManager mqQueueManager;
047 private String channel;
048 MQMessage mqMessage = new MQMessage();
049 MQGetMessageOptions gmo = new MQGetMessageOptions();
050 static {
051 MQException.log = null;
052 }
053 public MqSeriesQueueBrowser(ChannelConfig cf) throws MactorException {
054 this.channel = cf.getName();
055 Hashtable props = new Hashtable();
056 try {
057 props.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
058 props.put(MQC.HOST_NAME_PROPERTY, cf.getRequieredValue("host"));
059 props.put(MQC.PORT_PROPERTY, new Integer(ParseUtil.tryParseIntVal(cf.getRequieredValue("port"))));
060 props.put(MQC.CHANNEL_PROPERTY, cf.getRequieredValue("channel"));
061 mqQueueManager = new MQQueueManager(cf.getRequieredValue("queue_manager"), props);
062 mqQueue = mqQueueManager.accessQueue(cf.getRequieredValue("queue"), MQC.MQOO_BROWSE);
063 } catch (MQException e) {
064 throw new MactorException(e);
065 }
066 }
067 public Message browseFirstMessage() throws MactorException {
068 return browseMessage(MQC.MQGMO_NO_WAIT | MQC.MQGMO_BROWSE_FIRST);
069 }
070 public Message browseNextMessage() throws MactorException {
071 return browseMessage(MQC.MQGMO_NO_WAIT | MQC.MQGMO_BROWSE_NEXT);
072 }
073 public void consumeMessage() throws MactorException {
074 try {
075 gmo.options = MQC.MQGMO_MSG_UNDER_CURSOR;
076 mqQueue.get(mqMessage, gmo);
077 } catch (MQException me) {
078 throw new MactorException("Failed to consume the message. Error:" + me.getMessage(), me);
079 }
080 }
081 private Message browseMessage(int options) throws MactorException {
082 try {
083 mqMessage.clearMessage();
084 mqMessage.correlationId = MQC.MQCI_NONE;
085 mqMessage.messageId = MQC.MQMI_NONE;
086 gmo.options = options;
087 mqQueue.get(mqMessage, gmo);
088 int len = mqMessage.getTotalMessageLength();
089 if (len > 0) {
090 byte[] buffer = new byte[mqMessage.getTotalMessageLength()];
091 mqMessage.readFully(buffer);
092 return Message.createMessage(new ByteArrayInputStream(buffer));
093 }
094 } catch (MQException me) {
095 if (me.reasonCode == 2033) {// no messages
096 } else
097 throw new MactorException("MQ Series error occured with completion code '" + me.completionCode + "' and reeason code '" + me.reasonCode + "' while attempting to read from channel '"
098 + channel + "'");
099 } catch (IOException ioe) {
100 ioe.printStackTrace();
101 throw new MactorException("An IOException occured while trying to browse a message from channel '" + channel + "'", ioe);
102 }
103 return null;
104 }
105 @Override
106 protected void finalize() throws Throwable {
107 super.finalize();
108 close();
109 }
110 public void close() {
111 try {
112 if (mqQueue != null)
113 mqQueue.close();
114 mqQueueManager.disconnect();
115 } catch (MQException ex) {
116 log.info("A WebSphere MQ Error occured : Completion Code " + ex.completionCode + " Reason Code " + ex.reasonCode);
117 } catch (Exception ex) {
118 log.info("An IOException occured whilst writing to the message buffer: " + ex);
119 }
120 }
121 }