001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.framework;
021
022 import java.util.HashMap;
023 import java.util.LinkedList;
024 import java.util.List;
025 import java.util.Map;
026
027 import org.apache.log4j.Logger;
028 import org.mactor.brokers.Message;
029 import org.mactor.brokers.MessageBrokerManager;
030 import org.mactor.brokers.MessageSubscriber;
031 import org.mactor.framework.TestEvent.EventType;
032 import org.mactor.framework.commandexecutors.ActionCommandExecutor;
033 import org.mactor.framework.commandexecutors.ActionCommandExecutorFactory;
034 import org.mactor.framework.commandexecutors.MessageBuilderCommandExecutorFactory;
035 import org.mactor.framework.commandexecutors.MessageSelectorCommandExecutorFactory;
036 import org.mactor.framework.commandexecutors.ValueCommandExecutor;
037 import org.mactor.framework.commandexecutors.ValueCommandExecutorFactory;
038 import org.mactor.framework.spec.ActionSpec;
039 import org.mactor.framework.spec.ConditionSpec;
040 import org.mactor.framework.spec.LoopSpec;
041 import org.mactor.framework.spec.MessagePublishSpec;
042 import org.mactor.framework.spec.MessageReceiveSpec;
043 import org.mactor.framework.spec.MessageRespondSpec;
044 import org.mactor.framework.spec.MessageSubscribeSpec;
045 import org.mactor.framework.spec.SpecNode;
046 import org.mactor.framework.spec.TestSpec;
047 import org.mactor.framework.spec.ValueSpec;
048
049 /**
050 * The test-runner implementation
051 *
052 * @author Lars Ivar Almli
053 */
054 public class TestEngine {
055 protected static Logger log = Logger.getLogger(TestEngine.class);
056 protected static Logger test_timing_log = Logger.getLogger("test_timing");
057 private TestContextImpl context;
058 private TestFeedbackListener feedbackListener;
059 private MessageBrokerManager brokerManager;
060 private Map<String, MessageWaiter> messageWaiters = new HashMap<String, MessageWaiter>();
061 private TestSpec testSpec;
062 private int testDataIndex;
063 private String testInstanceId = AppUtil.getNextId("TI");
064 private String testRunInstanceId;
065 public void stop() {
066 log.debug("Stopping test engine instance");
067 cleanUp();
068 }
069 private synchronized void cleanUp() {
070 try {
071 for (MessageWaiter mw : messageWaiters.values()) {
072 try {
073 this.brokerManager.unsubscribe(mw.getChannel(), mw);
074 mw.stop();
075 } catch (Exception e) {
076 log.error("Failed to unsubscribe", e);
077 }
078 mw.stop();
079 }
080 messageWaiters.clear();
081 } catch (Exception e) {
082 log.error("Failed to stop test engine instance", e);
083 }
084 }
085 public TestEngine(String testRunInstanceId, TestContextImpl context, TestFeedbackListener feedbackListener) throws MactorException {
086 this.context = context;
087 this.testRunInstanceId = testRunInstanceId;
088 this.brokerManager = MessageBrokerManager.getInstance();
089 this.testSpec = context.getTestSpec();
090 this.feedbackListener = feedbackListener;
091 }
092 private void reportNodeStart(SpecNode node) {
093 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.Start, testSpec, node, testDataIndex, null, true, null), context);
094 }
095 private void reportSuccessfulNodeEnd(SpecNode node, String output) {
096 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.End, testSpec, node, testDataIndex, output, true, null), context);
097 }
098 private void reportFaultyNodeEnd(SpecNode node, String output, MactorException cause) {
099 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.End, testSpec, node, testDataIndex, output, false, cause), context);
100 }
101 private void reportTerminated(boolean success, MactorException cause) {
102 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.End, testSpec, testSpec, testDataIndex, null, success, cause), context);
103 }
104 private void reportTestStart() {
105 feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.Start, testSpec, testSpec, testDataIndex, null, true, null), context);
106 }
107 public void runTest(int testDataIndex) throws MactorException {
108 this.testDataIndex = testDataIndex;
109 if (testSpec.getDelayBeforeStartSeconds() != 0)
110 sleep(testSpec.getDelayBeforeStartSeconds() * 1000);
111 reportTestStart();
112 MactorException ex = null;
113 try {
114 try {
115 doNodes(testSpec.getSpecNodes());
116 } catch (MactorException e) {
117 ex = e;
118 throw e;
119 }
120 } finally {
121 cleanUp();
122 reportTerminated(ex == null, ex);
123 }
124 }
125 private void doNodes(List<SpecNode> nodes) throws MactorException {
126 for (SpecNode node : nodes) {
127 try {
128 reportNodeStart(node);
129 doNode(node);
130 reportSuccessfulNodeEnd(node, "Success");
131 } catch (MactorException ce) {
132 reportFaultyNodeEnd(node, "Error", ce);
133 throw ce;
134 } catch (RuntimeException e) {
135 MactorException re = new MactorException(e);
136 reportFaultyNodeEnd(node, "Runtime error", re);
137 throw re;
138 }
139 }
140 }
141 private void doNode(SpecNode node) throws MactorException {
142 if (log.isDebugEnabled())
143 log.debug("starting on node " + node.getName());
144 long nodeStartTime = 0;
145 Exception ex = null;
146 if (test_timing_log.isInfoEnabled()) {
147 nodeStartTime = System.currentTimeMillis();
148 }
149 try {
150 if (node instanceof MessageSubscribeSpec) {
151 doMessageSubscribeNode((MessageSubscribeSpec) node);
152 } else if (node instanceof MessagePublishSpec) {
153 doMessagePublishNode((MessagePublishSpec) node);
154 } else if (node instanceof MessageReceiveSpec) {
155 doMessageReceiveNode((MessageReceiveSpec) node);
156 } else if (node instanceof ValueSpec) {
157 doValueNode((ValueSpec) node);
158 } else if (node instanceof ActionSpec) {
159 doActionNode((ActionSpec) node);
160 } else if (node instanceof MessageRespondSpec) {
161 doMessageResponseNode((MessageRespondSpec) node);
162 } else if (node instanceof LoopSpec) {
163 doLoopNode((LoopSpec) node);
164 } else if (node instanceof ConditionSpec) {
165 doConditionNode((ConditionSpec) node);
166 } else {
167 throw new MactorException("Unsupported node:" + node.getName());
168 }
169 } catch (MactorException me) {
170 ex = me;
171 if (log.isDebugEnabled())
172 log.debug("%%%%% NODE: " + node.getName());
173 throw me;
174 } catch (RuntimeException re) {
175 ex = re;
176 throw re;
177 } finally {
178 if (test_timing_log.isInfoEnabled()) {
179 test_timing_log.info(testRunInstanceId + ";" + testInstanceId + ";" + node.getName() + ";" + (ex == null) + ";" + (System.currentTimeMillis() - nodeStartTime));
180 }
181 }
182 }
183 private void doMessageSubscribeNode(MessageSubscribeSpec spec) throws MactorException {
184 MessageWaiter mw = new MessageWaiter(spec.getChannel());
185 if (messageWaiters.containsKey(spec.getName()))
186 throw new RuntimeException("Unexpected state");
187 messageWaiters.put(spec.getName(), mw);
188 this.brokerManager.subscribe(spec.getChannel(), mw, MessageSelectorCommandExecutorFactory.createExecutor(context, spec.getMessageSelector()));
189 }
190 private void doMessagePublishNode(MessagePublishSpec spec) throws MactorException {
191 Message message = MessageBuilderCommandExecutorFactory.createExecutor(context, ((MessagePublishSpec) spec).getMessageBuilder()).buildMessage(context);
192 message.getMessageContextInfo().setNode(spec);
193 context.addOutgoingMessage(spec.getName(), message);
194 Message response = this.brokerManager.publish(spec.getChannel(), message);
195 if (response != null) {
196 context.addReceivedMessage(spec.getName(), response);
197 }
198 }
199 private void doMessageReceiveNode(MessageReceiveSpec spec) throws MactorException {
200 MessageWaiter mw = messageWaiters.get(spec.getMessageSubscribeNodeName());
201 mw.start(spec.getMaxTimeoutSeconds(), spec.getMinMessageCount(), spec.getMaxMessageCount(), spec.isBlockUntilTimeout());
202 while (true) {
203 IncomingMessage im = null;
204 try {
205 im = mw.getMessage();
206 if (im == null)
207 break;
208 im.getMessage().getMessageContextInfo().setNode(spec);
209 if (spec.hasResponseNode())
210 pushPendingMessage(im);
211 context.addReceivedMessage(spec.getName(), im.message);
212 doNodes(spec.getSpecNodes());
213 im.completed();
214 } catch (Throwable t) {
215 if (im != null)
216 im.canceled();
217 if (t instanceof MactorException)
218 throw (MactorException) t;
219 else
220 throw new MactorException(t);
221 }
222 }
223 }
224 private void doLoopNode(LoopSpec spec) throws MactorException {
225 int count = spec.getCount();
226 if (count == 0)
227 count = Integer.MAX_VALUE;
228 for (int i = 0; i < count; i++) {
229 doNodes(spec.getSpecNodes());
230 }
231 }
232 private void doConditionNode(ConditionSpec spec) throws MactorException {
233 if (Boolean.parseBoolean(context.substitute(spec.getExecute()))) {
234 doNodes(spec.getSpecNodes());
235 }
236 }
237 private void doMessageResponseNode(MessageRespondSpec spec) throws MactorException {
238 Message responseMessage = MessageBuilderCommandExecutorFactory.createExecutor(context, ((MessageRespondSpec) spec).getMessageBuilder()).buildMessage(context);
239 context.addOutgoingMessage(spec.getName(), responseMessage);
240 popPendingMessage().setResponseMessage(responseMessage);
241 }
242 private LinkedList<IncomingMessage> pendingMessages = new LinkedList<IncomingMessage>();
243 private IncomingMessage popPendingMessage() {
244 return pendingMessages.removeLast();
245 }
246 private void pushPendingMessage(IncomingMessage rw) {
247 pendingMessages.addLast(rw);
248 }
249 private void doValueNode(ValueSpec spec) throws MactorException {
250 ValueCommandExecutor vce = ValueCommandExecutorFactory.createExecutor(context, spec);
251 context.setValue(spec.getName(), vce.extractValue(context));
252 }
253 private void doActionNode(ActionSpec spec) throws MactorException {
254 ActionCommandExecutor vce = ActionCommandExecutorFactory.createExecutor(context, spec);
255 vce.perform(context);
256 }
257 public static class IncomingMessage {
258 Message message;
259 Message responseMessage;
260 boolean complete = false;
261 public IncomingMessage(Message message) {
262 this.message = message;
263 }
264 public void setResponseMessage(Message responseMessage) {
265 this.responseMessage = responseMessage;
266 }
267 private Object lock = new Object();
268 public void completed() {
269 synchronized (lock) {
270 complete = true;
271 message.consume();
272 lock.notifyAll();
273 }
274 }
275 public void canceled() {
276 synchronized (lock) {
277 complete = true;
278 lock.notifyAll();
279 }
280 }
281 Message waitForCompletion() throws InterruptedException {
282 synchronized (lock) {
283 if (!complete)
284 lock.wait();
285 }
286 return responseMessage;
287 }
288 public Message getMessage() {
289 return message;
290 }
291 }
292 public static class MessageWaiter implements MessageSubscriber {
293 private LinkedList<IncomingMessage> messages = new LinkedList<IncomingMessage>();
294 private int maxMessageCount;
295 private int minMessageCount;
296 private int maxTimeoutSeconds;
297 private boolean blockUntilTimeout;
298 private Object lock = new Object();
299 private int count;
300 private int returnedCount;
301 private long timeStart;
302 private String channel;
303 private boolean stopped = false;
304 public String getChannel() {
305 return channel;
306 }
307 public MessageWaiter(String channel) {
308 this.channel = channel;
309 }
310 public void stop() {
311 synchronized (lock) {
312 stopped = true;
313 lock.notifyAll();
314 }
315 for (IncomingMessage m : messages) {
316 m.canceled();
317 }
318 }
319 public void start(int maxTimeoutSeconds, int minMessageCount, int maxMessageCount, boolean blockUntilTimeout) {
320 this.returnedCount = 0;
321 this.maxTimeoutSeconds = maxTimeoutSeconds;
322 this.minMessageCount = minMessageCount;
323 this.maxMessageCount = maxMessageCount;
324 this.blockUntilTimeout = blockUntilTimeout;
325 this.timeStart = System.currentTimeMillis();
326 if (maxTimeoutSeconds == 0)
327 this.maxTimeoutSeconds = 60 * 60 * 24 * 365; // a year
328 if (maxMessageCount == 0)
329 this.maxMessageCount = 1000000000; // one billion
330 }
331 public Message onMessage(Message message) {
332 try {
333 IncomingMessage im = new IncomingMessage(message);
334 synchronized (lock) {
335 if (stopped) {
336 lock.notifyAll();
337 return null;
338 }
339 this.messages.add(im);
340 count++;
341 lock.notifyAll();
342 }
343 return im.waitForCompletion();
344 } catch (Exception e) {
345 e.printStackTrace();
346 return null;
347 }
348 }
349 public IncomingMessage getMessage() throws MessageTimoutException {
350 while (!stopped) {
351 if (this.returnedCount >= this.maxMessageCount)
352 return null;
353 if (!blockUntilTimeout && this.returnedCount >= this.minMessageCount)
354 return null;
355 long timeLeft = maxTimeoutSeconds * 1000 - (System.currentTimeMillis() - timeStart);
356 if (timeLeft <= 0) {
357 if (this.returnedCount >= this.minMessageCount)
358 return null;
359 else
360 throw new MessageTimoutException("Timout reached after receiving " + this.returnedCount + " messages");
361 }
362 try {
363 synchronized (lock) {
364 if (messages.size() > 0) {
365 IncomingMessage m = messages.removeLast();
366 returnedCount++;
367 return m;
368 } else if (timeLeft > 0) {
369 lock.wait(timeLeft);
370 }
371 if (messages.size() > 0) {
372 IncomingMessage m = messages.removeLast();
373 returnedCount++;
374 return m;
375 }
376 }
377 } catch (InterruptedException ie) {
378 throw new RuntimeException(ie);
379 }
380 }
381 log.debug("Message Worker was stopped");
382 return null;
383 }
384 }
385 private void sleep(long millis) {
386 try {
387 Thread.sleep(millis);
388 } catch (InterruptedException ie) {
389 }
390 }
391 }