001 /****************************************************************************** 002 * Copyright (C) MActor Developers. All rights reserved. * 003 * ---------------------------------------------------------------------------* 004 * This file is part of MActor. * 005 * * 006 * MActor is free software; you can redistribute it and/or modify * 007 * it under the terms of the GNU General Public License as published by * 008 * the Free Software Foundation; either version 2 of the License, or * 009 * (at your option) any later version. * 010 * * 011 * MActor is distributed in the hope that it will be useful, * 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 014 * GNU General Public License for more details. * 015 * * 016 * You should have received a copy of the GNU General Public License * 017 * along with MActor; if not, write to the Free Software * 018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * 019 ******************************************************************************/ 020 package org.mactor.framework; 021 022 import java.util.HashMap; 023 import java.util.LinkedList; 024 import java.util.List; 025 import java.util.Map; 026 027 import org.apache.log4j.Logger; 028 import org.mactor.brokers.Message; 029 import org.mactor.brokers.MessageBrokerManager; 030 import org.mactor.brokers.MessageSubscriber; 031 import org.mactor.framework.TestEvent.EventType; 032 import org.mactor.framework.commandexecutors.ActionCommandExecutor; 033 import org.mactor.framework.commandexecutors.ActionCommandExecutorFactory; 034 import org.mactor.framework.commandexecutors.MessageBuilderCommandExecutorFactory; 035 import org.mactor.framework.commandexecutors.MessageSelectorCommandExecutorFactory; 036 import org.mactor.framework.commandexecutors.ValueCommandExecutor; 037 import org.mactor.framework.commandexecutors.ValueCommandExecutorFactory; 038 import org.mactor.framework.spec.ActionSpec; 039 import org.mactor.framework.spec.ConditionSpec; 040 import org.mactor.framework.spec.LoopSpec; 041 import org.mactor.framework.spec.MessagePublishSpec; 042 import org.mactor.framework.spec.MessageReceiveSpec; 043 import org.mactor.framework.spec.MessageRespondSpec; 044 import org.mactor.framework.spec.MessageSubscribeSpec; 045 import org.mactor.framework.spec.SpecNode; 046 import org.mactor.framework.spec.TestSpec; 047 import org.mactor.framework.spec.ValueSpec; 048 049 /** 050 * The test-runner implementation 051 * 052 * @author Lars Ivar Almli 053 */ 054 public class TestEngine { 055 protected static Logger log = Logger.getLogger(TestEngine.class); 056 protected static Logger test_timing_log = Logger.getLogger("test_timing"); 057 private TestContextImpl context; 058 private TestFeedbackListener feedbackListener; 059 private MessageBrokerManager brokerManager; 060 private Map<String, MessageWaiter> messageWaiters = new HashMap<String, MessageWaiter>(); 061 private TestSpec testSpec; 062 private int testDataIndex; 063 private String testInstanceId = AppUtil.getNextId("TI"); 064 private String testRunInstanceId; 065 public void stop() { 066 log.debug("Stopping test engine instance"); 067 cleanUp(); 068 } 069 private synchronized void cleanUp() { 070 try { 071 for (MessageWaiter mw : messageWaiters.values()) { 072 try { 073 this.brokerManager.unsubscribe(mw.getChannel(), mw); 074 mw.stop(); 075 } catch (Exception e) { 076 log.error("Failed to unsubscribe", e); 077 } 078 mw.stop(); 079 } 080 messageWaiters.clear(); 081 } catch (Exception e) { 082 log.error("Failed to stop test engine instance", e); 083 } 084 } 085 public TestEngine(String testRunInstanceId, TestContextImpl context, TestFeedbackListener feedbackListener) throws MactorException { 086 this.context = context; 087 this.testRunInstanceId = testRunInstanceId; 088 this.brokerManager = MessageBrokerManager.getInstance(); 089 this.testSpec = context.getTestSpec(); 090 this.feedbackListener = feedbackListener; 091 } 092 private void reportNodeStart(SpecNode node) { 093 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.Start, testSpec, node, testDataIndex, null, true, null), context); 094 } 095 private void reportSuccessfulNodeEnd(SpecNode node, String output) { 096 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.End, testSpec, node, testDataIndex, output, true, null), context); 097 } 098 private void reportFaultyNodeEnd(SpecNode node, String output, MactorException cause) { 099 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.End, testSpec, node, testDataIndex, output, false, cause), context); 100 } 101 private void reportTerminated(boolean success, MactorException cause) { 102 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.End, testSpec, testSpec, testDataIndex, null, success, cause), context); 103 } 104 private void reportTestStart() { 105 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.Start, testSpec, testSpec, testDataIndex, null, true, null), context); 106 } 107 public void runTest(int testDataIndex) throws MactorException { 108 this.testDataIndex = testDataIndex; 109 if (testSpec.getDelayBeforeStartSeconds() != 0) 110 sleep(testSpec.getDelayBeforeStartSeconds() * 1000); 111 reportTestStart(); 112 MactorException ex = null; 113 try { 114 try { 115 doNodes(testSpec.getSpecNodes()); 116 } catch (MactorException e) { 117 ex = e; 118 throw e; 119 } 120 } finally { 121 cleanUp(); 122 reportTerminated(ex == null, ex); 123 } 124 } 125 private void doNodes(List<SpecNode> nodes) throws MactorException { 126 for (SpecNode node : nodes) { 127 try { 128 reportNodeStart(node); 129 doNode(node); 130 reportSuccessfulNodeEnd(node, "Success"); 131 } catch (MactorException ce) { 132 reportFaultyNodeEnd(node, "Error", ce); 133 throw ce; 134 } catch (RuntimeException e) { 135 MactorException re = new MactorException(e); 136 reportFaultyNodeEnd(node, "Runtime error", re); 137 throw re; 138 } 139 } 140 } 141 private void doNode(SpecNode node) throws MactorException { 142 if (log.isDebugEnabled()) 143 log.debug("starting on node " + node.getName()); 144 long nodeStartTime = 0; 145 Exception ex = null; 146 if (test_timing_log.isInfoEnabled()) { 147 nodeStartTime = System.currentTimeMillis(); 148 } 149 try { 150 if (node instanceof MessageSubscribeSpec) { 151 doMessageSubscribeNode((MessageSubscribeSpec) node); 152 } else if (node instanceof MessagePublishSpec) { 153 doMessagePublishNode((MessagePublishSpec) node); 154 } else if (node instanceof MessageReceiveSpec) { 155 doMessageReceiveNode((MessageReceiveSpec) node); 156 } else if (node instanceof ValueSpec) { 157 doValueNode((ValueSpec) node); 158 } else if (node instanceof ActionSpec) { 159 doActionNode((ActionSpec) node); 160 } else if (node instanceof MessageRespondSpec) { 161 doMessageResponseNode((MessageRespondSpec) node); 162 } else if (node instanceof LoopSpec) { 163 doLoopNode((LoopSpec) node); 164 } else if (node instanceof ConditionSpec) { 165 doConditionNode((ConditionSpec) node); 166 } else { 167 throw new MactorException("Unsupported node:" + node.getName()); 168 } 169 } catch (MactorException me) { 170 ex = me; 171 if (log.isDebugEnabled()) 172 log.debug("%%%%% NODE: " + node.getName()); 173 throw me; 174 } catch (RuntimeException re) { 175 ex = re; 176 throw re; 177 } finally { 178 if (test_timing_log.isInfoEnabled()) { 179 test_timing_log.info(testRunInstanceId + ";" + testInstanceId + ";" + node.getName() + ";" + (ex == null) + ";" + (System.currentTimeMillis() - nodeStartTime)); 180 } 181 } 182 } 183 private void doMessageSubscribeNode(MessageSubscribeSpec spec) throws MactorException { 184 MessageWaiter mw = new MessageWaiter(spec.getChannel()); 185 if (messageWaiters.containsKey(spec.getName())) 186 throw new RuntimeException("Unexpected state"); 187 messageWaiters.put(spec.getName(), mw); 188 this.brokerManager.subscribe(spec.getChannel(), mw, MessageSelectorCommandExecutorFactory.createExecutor(context, spec.getMessageSelector())); 189 } 190 private void doMessagePublishNode(MessagePublishSpec spec) throws MactorException { 191 Message message = MessageBuilderCommandExecutorFactory.createExecutor(context, ((MessagePublishSpec) spec).getMessageBuilder()).buildMessage(context); 192 message.getMessageContextInfo().setNode(spec); 193 context.addOutgoingMessage(spec.getName(), message); 194 Message response = this.brokerManager.publish(spec.getChannel(), message); 195 if (response != null) { 196 context.addReceivedMessage(spec.getName(), response); 197 } 198 } 199 private void doMessageReceiveNode(MessageReceiveSpec spec) throws MactorException { 200 MessageWaiter mw = messageWaiters.get(spec.getMessageSubscribeNodeName()); 201 mw.start(spec.getMaxTimeoutSeconds(), spec.getMinMessageCount(), spec.getMaxMessageCount(), spec.isBlockUntilTimeout()); 202 while (true) { 203 IncomingMessage im = null; 204 try { 205 im = mw.getMessage(); 206 if (im == null) 207 break; 208 im.getMessage().getMessageContextInfo().setNode(spec); 209 if (spec.hasResponseNode()) 210 pushPendingMessage(im); 211 context.addReceivedMessage(spec.getName(), im.message); 212 doNodes(spec.getSpecNodes()); 213 im.completed(); 214 } catch (Throwable t) { 215 if (im != null) 216 im.canceled(); 217 if (t instanceof MactorException) 218 throw (MactorException) t; 219 else 220 throw new MactorException(t); 221 } 222 } 223 } 224 private void doLoopNode(LoopSpec spec) throws MactorException { 225 int count = spec.getCount(); 226 if (count == 0) 227 count = Integer.MAX_VALUE; 228 for (int i = 0; i < count; i++) { 229 doNodes(spec.getSpecNodes()); 230 } 231 } 232 private void doConditionNode(ConditionSpec spec) throws MactorException { 233 if (Boolean.parseBoolean(context.substitute(spec.getExecute()))) { 234 doNodes(spec.getSpecNodes()); 235 } 236 } 237 private void doMessageResponseNode(MessageRespondSpec spec) throws MactorException { 238 Message responseMessage = MessageBuilderCommandExecutorFactory.createExecutor(context, ((MessageRespondSpec) spec).getMessageBuilder()).buildMessage(context); 239 context.addOutgoingMessage(spec.getName(), responseMessage); 240 popPendingMessage().setResponseMessage(responseMessage); 241 } 242 private LinkedList<IncomingMessage> pendingMessages = new LinkedList<IncomingMessage>(); 243 private IncomingMessage popPendingMessage() { 244 return pendingMessages.removeLast(); 245 } 246 private void pushPendingMessage(IncomingMessage rw) { 247 pendingMessages.addLast(rw); 248 } 249 private void doValueNode(ValueSpec spec) throws MactorException { 250 ValueCommandExecutor vce = ValueCommandExecutorFactory.createExecutor(context, spec); 251 context.setValue(spec.getName(), vce.extractValue(context)); 252 } 253 private void doActionNode(ActionSpec spec) throws MactorException { 254 ActionCommandExecutor vce = ActionCommandExecutorFactory.createExecutor(context, spec); 255 vce.perform(context); 256 } 257 public static class IncomingMessage { 258 Message message; 259 Message responseMessage; 260 boolean complete = false; 261 public IncomingMessage(Message message) { 262 this.message = message; 263 } 264 public void setResponseMessage(Message responseMessage) { 265 this.responseMessage = responseMessage; 266 } 267 private Object lock = new Object(); 268 public void completed() { 269 synchronized (lock) { 270 complete = true; 271 message.consume(); 272 lock.notifyAll(); 273 } 274 } 275 public void canceled() { 276 synchronized (lock) { 277 complete = true; 278 lock.notifyAll(); 279 } 280 } 281 Message waitForCompletion() throws InterruptedException { 282 synchronized (lock) { 283 if (!complete) 284 lock.wait(); 285 } 286 return responseMessage; 287 } 288 public Message getMessage() { 289 return message; 290 } 291 } 292 public static class MessageWaiter implements MessageSubscriber { 293 private LinkedList<IncomingMessage> messages = new LinkedList<IncomingMessage>(); 294 private int maxMessageCount; 295 private int minMessageCount; 296 private int maxTimeoutSeconds; 297 private boolean blockUntilTimeout; 298 private Object lock = new Object(); 299 private int count; 300 private int returnedCount; 301 private long timeStart; 302 private String channel; 303 private boolean stopped = false; 304 public String getChannel() { 305 return channel; 306 } 307 public MessageWaiter(String channel) { 308 this.channel = channel; 309 } 310 public void stop() { 311 synchronized (lock) { 312 stopped = true; 313 lock.notifyAll(); 314 } 315 for (IncomingMessage m : messages) { 316 m.canceled(); 317 } 318 } 319 public void start(int maxTimeoutSeconds, int minMessageCount, int maxMessageCount, boolean blockUntilTimeout) { 320 this.returnedCount = 0; 321 this.maxTimeoutSeconds = maxTimeoutSeconds; 322 this.minMessageCount = minMessageCount; 323 this.maxMessageCount = maxMessageCount; 324 this.blockUntilTimeout = blockUntilTimeout; 325 this.timeStart = System.currentTimeMillis(); 326 if (maxTimeoutSeconds == 0) 327 this.maxTimeoutSeconds = 60 * 60 * 24 * 365; // a year 328 if (maxMessageCount == 0) 329 this.maxMessageCount = 1000000000; // one billion 330 } 331 public Message onMessage(Message message) { 332 try { 333 IncomingMessage im = new IncomingMessage(message); 334 synchronized (lock) { 335 if (stopped) { 336 lock.notifyAll(); 337 return null; 338 } 339 this.messages.add(im); 340 count++; 341 lock.notifyAll(); 342 } 343 return im.waitForCompletion(); 344 } catch (Exception e) { 345 e.printStackTrace(); 346 return null; 347 } 348 } 349 public IncomingMessage getMessage() throws MessageTimoutException { 350 while (!stopped) { 351 if (this.returnedCount >= this.maxMessageCount) 352 return null; 353 if (!blockUntilTimeout && this.returnedCount >= this.minMessageCount) 354 return null; 355 long timeLeft = maxTimeoutSeconds * 1000 - (System.currentTimeMillis() - timeStart); 356 if (timeLeft <= 0) { 357 if (this.returnedCount >= this.minMessageCount) 358 return null; 359 else 360 throw new MessageTimoutException("Timout reached after receiving " + this.returnedCount + " messages"); 361 } 362 try { 363 synchronized (lock) { 364 if (messages.size() > 0) { 365 IncomingMessage m = messages.removeLast(); 366 returnedCount++; 367 return m; 368 } else if (timeLeft > 0) { 369 lock.wait(timeLeft); 370 } 371 if (messages.size() > 0) { 372 IncomingMessage m = messages.removeLast(); 373 returnedCount++; 374 return m; 375 } 376 } 377 } catch (InterruptedException ie) { 378 throw new RuntimeException(ie); 379 } 380 } 381 log.debug("Message Worker was stopped"); 382 return null; 383 } 384 } 385 private void sleep(long millis) { 386 try { 387 Thread.sleep(millis); 388 } catch (InterruptedException ie) { 389 } 390 } 391 }