/******************************************************************************
 * Copyright (C) MActor Developers. All rights reserved.                      *
 * ---------------------------------------------------------------------------*
 * This file is part of MActor.                                               *
 *                                                                            *
 * MActor is free software; you can redistribute it and/or modify             *
 * it under the terms of the GNU General Public License as published by       *
 * the Free Software Foundation; either version 2 of the License, or          *
 * (at your option) any later version.                                        *
 *                                                                            *
 * MActor is distributed in the hope that it will be useful,                  *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the               *
 * GNU General Public License for more details.                               *
 *                                                                            *
 * You should have received a copy of the GNU General Public License          *
 * along with MActor; if not, write to the Free Software                      *
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA   *
 ******************************************************************************/
package org.mactor.brokers.mqseries;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Hashtable;

import org.apache.log4j.Logger;
import org.mactor.brokers.Message;
import org.mactor.framework.MactorException;
import org.mactor.framework.ParseUtil;
import org.mactor.framework.spec.MessageBrokersConfig.MessageBrokerConfig.ChannelConfig;

import com.ibm.mq.MQC;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

/**
 * Opens a single MQ Series queue for browsing and lets the caller step
 * through its messages with {@link #browseFirstMessage()} /
 * {@link #browseNextMessage()}, optionally removing the message the cursor is
 * on with {@link #consumeMessage()}.
 * <p>
 * Connection settings are read from the channel configuration values
 * {@code host}, {@code port}, {@code channel}, {@code queue_manager} and
 * {@code queue}. Call {@link #close()} when done.
 * <p>
 * NOTE(review): one {@code MQMessage} and one {@code MQGetMessageOptions}
 * instance are reused by every call and nothing is synchronized, so an
 * instance does not look safe for concurrent use - confirm with callers.
 *
 * @author Lars Ivar Almli
 */
public class MqSeriesQueueBrowser {
    protected static Logger log = Logger.getLogger(MqSeriesQueueBrowser.class);
    /** MQ reason code that a get reports when there is no message to return. */
    private static final int REASON_NO_MESSAGE_AVAILABLE = 2033;
    private MQQueue mqQueue;
    private MQQueueManager mqQueueManager;
    private String channel;
    MQMessage mqMessage = new MQMessage();
    MQGetMessageOptions gmo = new MQGetMessageOptions();
    static {
        // Clears the MQ client's own exception log target; failures are
        // reported through MactorException / log4j by this class instead.
        MQException.log = null;
    }

    /**
     * Connects to the queue manager described by {@code cf} in client mode
     * and opens the configured queue with the browse option.
     * <p>
     * If any step fails, whatever was already opened is released before the
     * exception propagates, so a failed construction does not keep a queue
     * manager connection alive.
     *
     * @param cf channel configuration supplying the connection values
     * @throws MactorException if the MQ connection or queue cannot be opened
     */
    public MqSeriesQueueBrowser(ChannelConfig cf) throws MactorException {
        this.channel = cf.getName();
        Hashtable<Object, Object> props = new Hashtable<Object, Object>();
        boolean opened = false;
        try {
            props.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
            props.put(MQC.HOST_NAME_PROPERTY, cf.getRequieredValue("host"));
            props.put(MQC.PORT_PROPERTY, Integer.valueOf(ParseUtil.tryParseIntVal(cf.getRequieredValue("port"))));
            props.put(MQC.CHANNEL_PROPERTY, cf.getRequieredValue("channel"));
            mqQueueManager = new MQQueueManager(cf.getRequieredValue("queue_manager"), props);
            mqQueue = mqQueueManager.accessQueue(cf.getRequieredValue("queue"), MQC.MQOO_BROWSE);
            opened = true;
        } catch (MQException e) {
            throw new MactorException(e);
        } finally {
            if (!opened) {
                // Uses the private helper rather than close() so that no
                // overridable method runs on a half-constructed instance.
                releaseResources();
            }
        }
    }

    /**
     * Browses the first message on the queue without waiting.
     *
     * @return the message, or {@code null} if none is available or the
     *         message is empty
     * @throws MactorException if MQ reports an error or the payload cannot be read
     */
    public Message browseFirstMessage() throws MactorException {
        return browseMessage(MQC.MQGMO_NO_WAIT | MQC.MQGMO_BROWSE_FIRST);
    }

    /**
     * Browses the message after the current browse position without waiting.
     *
     * @return the message, or {@code null} if none is available or the
     *         message is empty
     * @throws MactorException if MQ reports an error or the payload cannot be read
     */
    public Message browseNextMessage() throws MactorException {
        return browseMessage(MQC.MQGMO_NO_WAIT | MQC.MQGMO_BROWSE_NEXT);
    }

    /**
     * Issues a get with {@code MQGMO_MSG_UNDER_CURSOR}, i.e. for the message
     * the preceding browse call positioned on. The retrieved content is
     * placed in the shared {@code mqMessage} buffer.
     *
     * @throws MactorException if the get fails
     */
    public void consumeMessage() throws MactorException {
        try {
            gmo.options = MQC.MQGMO_MSG_UNDER_CURSOR;
            mqQueue.get(mqMessage, gmo);
        } catch (MQException me) {
            throw new MactorException("Failed to consume the message. Error:" + me.getMessage(), me);
        }
    }

    /**
     * Performs one get with the given options and converts the payload into
     * a {@link Message}.
     *
     * @param options the {@code MQGMO_*} option mask for the get
     * @return the parsed message, or {@code null} when MQ reports that no
     *         message is available or the message has no content
     * @throws MactorException on any other MQ error or on an I/O failure
     *         while reading the payload
     */
    private Message browseMessage(int options) throws MactorException {
        try {
            // Reset the reused message so ids left over from the previous
            // get do not act as match criteria for this one.
            mqMessage.clearMessage();
            mqMessage.correlationId = MQC.MQCI_NONE;
            mqMessage.messageId = MQC.MQMI_NONE;
            gmo.options = options;
            mqQueue.get(mqMessage, gmo);
            int len = mqMessage.getTotalMessageLength();
            if (len <= 0) {
                return null;
            }
            byte[] buffer = new byte[len];
            mqMessage.readFully(buffer);
            return Message.createMessage(new ByteArrayInputStream(buffer));
        } catch (MQException me) {
            if (me.reasonCode == REASON_NO_MESSAGE_AVAILABLE) {
                return null;
            }
            // Keep the MQException as the cause so the original stack trace
            // is not lost.
            throw new MactorException("MQ Series error occured with completion code '" + me.completionCode + "' and reeason code '" + me.reasonCode + "' while attempting to read from channel '"
                    + channel + "'", me);
        } catch (IOException ioe) {
            throw new MactorException("An IOException occured while trying to browse a message from channel '" + channel + "'", ioe);
        }
    }

    /**
     * Last-resort cleanup for instances that were never closed explicitly.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            close();
        } finally {
            super.finalize();
        }
    }

    /**
     * Closes the queue and disconnects from the queue manager. Failures are
     * logged at info level and never thrown.
     */
    public void close() {
        releaseResources();
    }

    /**
     * Closes the queue and the queue manager independently of each other, so
     * a failure closing the queue cannot prevent the disconnect. Either
     * handle may still be {@code null} when construction failed part-way.
     */
    private void releaseResources() {
        try {
            if (mqQueue != null) {
                mqQueue.close();
            }
        } catch (MQException ex) {
            logMqCloseError(ex);
        } catch (Exception ex) {
            log.info("Unexpected error while closing the queue for channel '" + channel + "': " + ex);
        }
        try {
            if (mqQueueManager != null) {
                mqQueueManager.disconnect();
            }
        } catch (MQException ex) {
            logMqCloseError(ex);
        } catch (Exception ex) {
            log.info("Unexpected error while disconnecting the queue manager for channel '" + channel + "': " + ex);
        }
    }

    /** Logs the completion and reason code of an MQ failure raised during cleanup. */
    private void logMqCloseError(MQException ex) {
        log.info("A WebSphere MQ Error occured : Completion Code " + ex.completionCode + " Reason Code " + ex.reasonCode);
    }
}