001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.framework;
021    
022    import java.util.HashMap;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    
027    import org.apache.log4j.Logger;
028    import org.mactor.brokers.Message;
029    import org.mactor.brokers.MessageBrokerManager;
030    import org.mactor.brokers.MessageSubscriber;
031    import org.mactor.framework.TestEvent.EventType;
032    import org.mactor.framework.commandexecutors.ActionCommandExecutor;
033    import org.mactor.framework.commandexecutors.ActionCommandExecutorFactory;
034    import org.mactor.framework.commandexecutors.MessageBuilderCommandExecutorFactory;
035    import org.mactor.framework.commandexecutors.MessageSelectorCommandExecutorFactory;
036    import org.mactor.framework.commandexecutors.ValueCommandExecutor;
037    import org.mactor.framework.commandexecutors.ValueCommandExecutorFactory;
038    import org.mactor.framework.spec.ActionSpec;
039    import org.mactor.framework.spec.ConditionSpec;
040    import org.mactor.framework.spec.LoopSpec;
041    import org.mactor.framework.spec.MessagePublishSpec;
042    import org.mactor.framework.spec.MessageReceiveSpec;
043    import org.mactor.framework.spec.MessageRespondSpec;
044    import org.mactor.framework.spec.MessageSubscribeSpec;
045    import org.mactor.framework.spec.SpecNode;
046    import org.mactor.framework.spec.TestSpec;
047    import org.mactor.framework.spec.ValueSpec;
048    
049    /**
050     * The test-runner implementation
051     * 
052     * @author Lars Ivar Almli
053     */
054    public class TestEngine {
055            protected static Logger log = Logger.getLogger(TestEngine.class);
056            protected static Logger test_timing_log = Logger.getLogger("test_timing");
057            private TestContextImpl context;
058            private TestFeedbackListener feedbackListener;
059            private MessageBrokerManager brokerManager;
060            private Map<String, MessageWaiter> messageWaiters = new HashMap<String, MessageWaiter>();
061            private TestSpec testSpec;
062            private int testDataIndex;
063            private String testInstanceId = AppUtil.getNextId("TI");
064            private String testRunInstanceId;
065            public void stop() {
066                    log.debug("Stopping test engine instance");
067                    cleanUp();
068            }
069            private synchronized void cleanUp() {
070                    try {
071                            for (MessageWaiter mw : messageWaiters.values()) {
072                                    try {
073                                            this.brokerManager.unsubscribe(mw.getChannel(), mw);
074                                            mw.stop();
075                                    } catch (Exception e) {
076                                            log.error("Failed to unsubscribe", e);
077                                    }
078                                    mw.stop();
079                            }
080                            messageWaiters.clear();
081                    } catch (Exception e) {
082                            log.error("Failed to stop test engine instance", e);
083                    }
084            }
085            public TestEngine(String testRunInstanceId, TestContextImpl context, TestFeedbackListener feedbackListener) throws MactorException {
086                    this.context = context;
087                    this.testRunInstanceId = testRunInstanceId;
088                    this.brokerManager = MessageBrokerManager.getInstance();
089                    this.testSpec = context.getTestSpec();
090                    this.feedbackListener = feedbackListener;
091            }
092            private void reportNodeStart(SpecNode node) {
093                    feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.Start, testSpec, node, testDataIndex, null, true, null), context);
094            }
095            private void reportSuccessfulNodeEnd(SpecNode node, String output) {
096                    feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.End, testSpec, node, testDataIndex, output, true, null), context);
097            }
098            private void reportFaultyNodeEnd(SpecNode node, String output, MactorException cause) {
099                    feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.End, testSpec, node, testDataIndex, output, false, cause), context);
100            }
101            private void reportTerminated(boolean success, MactorException cause) {
102                    feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.End, testSpec, testSpec, testDataIndex, null, success, cause), context);
103            }
104            private void reportTestStart() {
105                    feedbackListener.onNodeEvent(new TestEvent(testRunInstanceId, testInstanceId, EventType.Start, testSpec, testSpec, testDataIndex, null, true, null), context);
106            }
107            public void runTest(int testDataIndex) throws MactorException {
108                    this.testDataIndex = testDataIndex;
109                    if (testSpec.getDelayBeforeStartSeconds() != 0)
110                            sleep(testSpec.getDelayBeforeStartSeconds() * 1000);
111                    reportTestStart();
112                    MactorException ex = null;
113                    try {
114                            try {
115                                    doNodes(testSpec.getSpecNodes());
116                            } catch (MactorException e) {
117                                    ex = e;
118                                    throw e;
119                            }
120                    } finally {
121                            cleanUp();
122                            reportTerminated(ex == null, ex);
123                    }
124            }
125            private void doNodes(List<SpecNode> nodes) throws MactorException {
126                    for (SpecNode node : nodes) {
127                            try {
128                                    reportNodeStart(node);
129                                    doNode(node);
130                                    reportSuccessfulNodeEnd(node, "Success");
131                            } catch (MactorException ce) {
132                                    reportFaultyNodeEnd(node, "Error", ce);
133                                    throw ce;
134                            } catch (RuntimeException e) {
135                                    MactorException re = new MactorException(e);
136                                    reportFaultyNodeEnd(node, "Runtime error", re);
137                                    throw re;
138                            }
139                    }
140            }
141            private void doNode(SpecNode node) throws MactorException {
142                    if (log.isDebugEnabled())
143                            log.debug("starting on node " + node.getName());
144                    long nodeStartTime = 0;
145                    Exception ex = null;
146                    if (test_timing_log.isInfoEnabled()) {
147                            nodeStartTime = System.currentTimeMillis();
148                    }
149                    try {
150                            if (node instanceof MessageSubscribeSpec) {
151                                    doMessageSubscribeNode((MessageSubscribeSpec) node);
152                            } else if (node instanceof MessagePublishSpec) {
153                                    doMessagePublishNode((MessagePublishSpec) node);
154                            } else if (node instanceof MessageReceiveSpec) {
155                                    doMessageReceiveNode((MessageReceiveSpec) node);
156                            } else if (node instanceof ValueSpec) {
157                                    doValueNode((ValueSpec) node);
158                            } else if (node instanceof ActionSpec) {
159                                    doActionNode((ActionSpec) node);
160                            } else if (node instanceof MessageRespondSpec) {
161                                    doMessageResponseNode((MessageRespondSpec) node);
162                            } else if (node instanceof LoopSpec) {
163                                    doLoopNode((LoopSpec) node);
164                            } else if (node instanceof ConditionSpec) {
165                                    doConditionNode((ConditionSpec) node);
166                            } else {
167                                    throw new MactorException("Unsupported node:" + node.getName());
168                            }
169                    } catch (MactorException me) {
170                            ex = me;
171                            if (log.isDebugEnabled())
172                                    log.debug("%%%%% NODE: " + node.getName());
173                            throw me;
174                    } catch (RuntimeException re) {
175                            ex = re;
176                            throw re;
177                    } finally {
178                            if (test_timing_log.isInfoEnabled()) {
179                                    test_timing_log.info(testRunInstanceId + ";" + testInstanceId + ";" + node.getName() + ";" + (ex == null) + ";" + (System.currentTimeMillis() - nodeStartTime));
180                            }
181                    }
182            }
183            private void doMessageSubscribeNode(MessageSubscribeSpec spec) throws MactorException {
184                    MessageWaiter mw = new MessageWaiter(spec.getChannel());
185                    if (messageWaiters.containsKey(spec.getName()))
186                            throw new RuntimeException("Unexpected state");
187                    messageWaiters.put(spec.getName(), mw);
188                    this.brokerManager.subscribe(spec.getChannel(), mw, MessageSelectorCommandExecutorFactory.createExecutor(context, spec.getMessageSelector()));
189            }
190            private void doMessagePublishNode(MessagePublishSpec spec) throws MactorException {
191                    Message message = MessageBuilderCommandExecutorFactory.createExecutor(context, ((MessagePublishSpec) spec).getMessageBuilder()).buildMessage(context);
192                    message.getMessageContextInfo().setNode(spec);
193                    context.addOutgoingMessage(spec.getName(), message);
194                    Message response = this.brokerManager.publish(spec.getChannel(), message);
195                    if (response != null) {
196                            context.addReceivedMessage(spec.getName(), response);
197                    }
198            }
199            private void doMessageReceiveNode(MessageReceiveSpec spec) throws MactorException {
200                    MessageWaiter mw = messageWaiters.get(spec.getMessageSubscribeNodeName());
201                    mw.start(spec.getMaxTimeoutSeconds(), spec.getMinMessageCount(), spec.getMaxMessageCount(), spec.isBlockUntilTimeout());
202                    while (true) {
203                            IncomingMessage im = null;
204                            try {
205                                    im = mw.getMessage();
206                                    if (im == null)
207                                            break;
208                                    im.getMessage().getMessageContextInfo().setNode(spec);
209                                    if (spec.hasResponseNode())
210                                            pushPendingMessage(im);
211                                    context.addReceivedMessage(spec.getName(), im.message);
212                                    doNodes(spec.getSpecNodes());
213                                    im.completed();
214                            } catch (Throwable t) {
215                                    if (im != null)
216                                            im.canceled();
217                                    if (t instanceof MactorException)
218                                            throw (MactorException) t;
219                                    else
220                                            throw new MactorException(t);
221                            }
222                    }
223            }
224            private void doLoopNode(LoopSpec spec) throws MactorException {
225                    int count = spec.getCount();
226                    if (count == 0)
227                            count = Integer.MAX_VALUE;
228                    for (int i = 0; i < count; i++) {
229                            doNodes(spec.getSpecNodes());
230                    }
231            }
232            private void doConditionNode(ConditionSpec spec) throws MactorException {
233                    if (Boolean.parseBoolean(context.substitute(spec.getExecute()))) {
234                            doNodes(spec.getSpecNodes());
235                    }
236            }
237            private void doMessageResponseNode(MessageRespondSpec spec) throws MactorException {
238                    Message responseMessage = MessageBuilderCommandExecutorFactory.createExecutor(context, ((MessageRespondSpec) spec).getMessageBuilder()).buildMessage(context);
239                    context.addOutgoingMessage(spec.getName(), responseMessage);
240                    popPendingMessage().setResponseMessage(responseMessage);
241            }
242            private LinkedList<IncomingMessage> pendingMessages = new LinkedList<IncomingMessage>();
243            private IncomingMessage popPendingMessage() {
244                    return pendingMessages.removeLast();
245            }
246            private void pushPendingMessage(IncomingMessage rw) {
247                    pendingMessages.addLast(rw);
248            }
249            private void doValueNode(ValueSpec spec) throws MactorException {
250                    ValueCommandExecutor vce = ValueCommandExecutorFactory.createExecutor(context, spec);
251                    context.setValue(spec.getName(), vce.extractValue(context));
252            }
253            private void doActionNode(ActionSpec spec) throws MactorException {
254                    ActionCommandExecutor vce = ActionCommandExecutorFactory.createExecutor(context, spec);
255                    vce.perform(context);
256            }
257            public static class IncomingMessage {
258                    Message message;
259                    Message responseMessage;
260                    boolean complete = false;
261                    public IncomingMessage(Message message) {
262                            this.message = message;
263                    }
264                    public void setResponseMessage(Message responseMessage) {
265                            this.responseMessage = responseMessage;
266                    }
267                    private Object lock = new Object();
268                    public void completed() {
269                            synchronized (lock) {
270                                    complete = true;
271                                    message.consume();
272                                    lock.notifyAll();
273                            }
274                    }
275                    public void canceled() {
276                            synchronized (lock) {
277                                    complete = true;
278                                    lock.notifyAll();
279                            }
280                    }
281                    Message waitForCompletion() throws InterruptedException {
282                            synchronized (lock) {
283                                    if (!complete)
284                                            lock.wait();
285                            }
286                            return responseMessage;
287                    }
288                    public Message getMessage() {
289                            return message;
290                    }
291            }
292            public static class MessageWaiter implements MessageSubscriber {
293                    private LinkedList<IncomingMessage> messages = new LinkedList<IncomingMessage>();
294                    private int maxMessageCount;
295                    private int minMessageCount;
296                    private int maxTimeoutSeconds;
297                    private boolean blockUntilTimeout;
298                    private Object lock = new Object();
299                    private int count;
300                    private int returnedCount;
301                    private long timeStart;
302                    private String channel;
303                    private boolean stopped = false;
304                    public String getChannel() {
305                            return channel;
306                    }
307                    public MessageWaiter(String channel) {
308                            this.channel = channel;
309                    }
310                    public void stop() {
311                            synchronized (lock) {
312                                    stopped = true;
313                                    lock.notifyAll();
314                            }
315                            for (IncomingMessage m : messages) {
316                                    m.canceled();
317                            }
318                    }
319                    public void start(int maxTimeoutSeconds, int minMessageCount, int maxMessageCount, boolean blockUntilTimeout) {
320                            this.returnedCount = 0;
321                            this.maxTimeoutSeconds = maxTimeoutSeconds;
322                            this.minMessageCount = minMessageCount;
323                            this.maxMessageCount = maxMessageCount;
324                            this.blockUntilTimeout = blockUntilTimeout;
325                            this.timeStart = System.currentTimeMillis();
326                            if (maxTimeoutSeconds == 0)
327                                    this.maxTimeoutSeconds = 60 * 60 * 24 * 365; // a year
328                            if (maxMessageCount == 0)
329                                    this.maxMessageCount = 1000000000; // one billion
330                    }
331                    public Message onMessage(Message message) {
332                            try {
333                                    IncomingMessage im = new IncomingMessage(message);
334                                    synchronized (lock) {
335                                            if (stopped) {
336                                                    lock.notifyAll();
337                                                    return null;
338                                            }
339                                            this.messages.add(im);
340                                            count++;
341                                            lock.notifyAll();
342                                    }
343                                    return im.waitForCompletion();
344                            } catch (Exception e) {
345                                    e.printStackTrace();
346                                    return null;
347                            }
348                    }
349                    public IncomingMessage getMessage() throws MessageTimoutException {
350                            while (!stopped) {
351                                    if (this.returnedCount >= this.maxMessageCount)
352                                            return null;
353                                    if (!blockUntilTimeout && this.returnedCount >= this.minMessageCount)
354                                            return null;
355                                    long timeLeft = maxTimeoutSeconds * 1000 - (System.currentTimeMillis() - timeStart);
356                                    if (timeLeft <= 0) {
357                                            if (this.returnedCount >= this.minMessageCount)
358                                                    return null;
359                                            else
360                                                    throw new MessageTimoutException("Timout reached after receiving " + this.returnedCount + " messages");
361                                    }
362                                    try {
363                                            synchronized (lock) {
364                                                    if (messages.size() > 0) {
365                                                            IncomingMessage m = messages.removeLast();
366                                                            returnedCount++;
367                                                            return m;
368                                                    } else if (timeLeft > 0) {
369                                                            lock.wait(timeLeft);
370                                                    }
371                                                    if (messages.size() > 0) {
372                                                            IncomingMessage m = messages.removeLast();
373                                                            returnedCount++;
374                                                            return m;
375                                                    }
376                                            }
377                                    } catch (InterruptedException ie) {
378                                            throw new RuntimeException(ie);
379                                    }
380                            }
381                            log.debug("Message Worker was stopped");
382                            return null;
383                    }
384            }
385            private void sleep(long millis) {
386                    try {
387                            Thread.sleep(millis);
388                    } catch (InterruptedException ie) {
389                    }
390            }
391    }