001 /******************************************************************************
002 * Copyright (C) MActor Developers. All rights reserved. *
003 * ---------------------------------------------------------------------------*
004 * This file is part of MActor. *
005 * *
006 * MActor is free software; you can redistribute it and/or modify *
007 * it under the terms of the GNU General Public License as published by *
008 * the Free Software Foundation; either version 2 of the License, or *
009 * (at your option) any later version. *
010 * *
011 * MActor is distributed in the hope that it will be useful, *
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014 * GNU General Public License for more details. *
015 * *
016 * You should have received a copy of the GNU General Public License *
017 * along with MActor; if not, write to the Free Software *
018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019 ******************************************************************************/
020 package org.mactor.framework;
021
022 import java.util.LinkedList;
023
024 import org.apache.log4j.Logger;
025
026 // This primitive pool will grow to the max allowed threads and stay there
027 public class TestEngineThreadPool {
028 static Logger log = Logger.getLogger(TestEngineThreadPool.class);
029 private int maxNumberOfThreads;
030 private boolean stopped = false;
031 public TestEngineThreadPool(int maxNumberOfThreads) {
032 this.maxNumberOfThreads = maxNumberOfThreads;
033 }
034 LinkedList<Worker> idleWorkers = new LinkedList<Worker>();
035 LinkedList<Worker> activeWorkers = new LinkedList<Worker>();
036 private Object lock = new Object();
037 public void terminate() {
038 log.debug("Stopping test-engine thread pool...");
039 stopped = true;
040 synchronized (lock) {
041 for (Worker w : activeWorkers) {
042 w.stop();
043 }
044 }
045 log.debug("Thread pool stopped");
046 }
047 private Worker getIdleWorker() throws InterruptedException {
048 synchronized (lock) {
049 for (; !stopped;) {
050 if (idleWorkers.size() > 0) {
051 Worker w = idleWorkers.removeLast();
052 activeWorkers.add(w);
053 return w;
054 } else if ((activeWorkers.size() + idleWorkers.size()) < maxNumberOfThreads) {
055 Worker w = new Worker();
056 activeWorkers.add(w);
057 new Thread(w).start();
058 return w;
059 }
060 lock.wait();
061 }
062 }
063 return null;
064 }
065 /**
066 * Block until thread is available
067 *
068 * @param runnable
069 */
070 public void addJob(TestRunnable runnable) {
071 try {
072 synchronized (lock) {
073 Worker w = getIdleWorker();
074 if (w == null)
075 throw new RuntimeException("The thread pool has been stopped. No more jobs can be added");
076 w.assignJob(runnable);
077 }
078 } catch (InterruptedException ie) {
079 ie.printStackTrace();
080 }
081 }
082 public void join() {
083 try {
084 synchronized (lock) {
085 for (;;) {
086 if (activeWorkers.size() == 0)
087 break;
088 lock.wait();
089 }
090 stopped = true;
091 }
092 for (Worker w : idleWorkers) {
093 w.stop();
094 }
095 } catch (InterruptedException ie) {
096 }
097 }
098 private void completed(Worker w) {
099 synchronized (lock) {
100 if (!activeWorkers.remove(w))
101 throw new RuntimeException("Unexpected state");
102 idleWorkers.add(w);
103 lock.notifyAll();
104 }
105 log.debug("active:" + activeWorkers.size());
106 }
107 private class Worker implements Runnable {
108 TestRunnable job;
109 TestRunnable runningJob;
110 private Object lock = new Object();
111 public void run() {
112 while (!stopped) {
113 try {
114 synchronized (lock) {
115 if (job != null)
116 runningJob = job;
117 else
118 lock.wait();
119 if (job != null)
120 runningJob = job;
121 job = null;
122 }
123 if (runningJob != null) {
124 try {
125 runningJob.run();
126 } finally {
127 runningJob = null;
128 completed(this);
129 }
130 }
131 } catch (Exception e) {
132 e.printStackTrace();
133 }
134 }
135 }
136 public void stop() {
137 log.debug("Stopping test-engine worker");
138 TestRunnable tmp = runningJob;
139 if (tmp != null)
140 tmp.stop();
141 }
142 public void assignJob(TestRunnable job) {
143 synchronized (this.lock) {
144 this.job = job;
145 this.lock.notifyAll();
146 }
147 }
148 }
149 public static class TestRunnable implements Runnable {
150 boolean succes = false;
151 int index;
152 TestEngine testEngine;
153 public TestRunnable(int index, TestEngine testEngine) {
154 this.index = index;
155 this.testEngine = testEngine;
156 }
157 public boolean isSucces() {
158 return succes;
159 }
160 public void stop() {
161 testEngine.stop();
162 }
163 public void run() {
164 try {
165 testEngine.runTest(index);
166 this.succes = true;
167 } catch (MactorException ce) {
168 ce.printStackTrace();
169 }
170 }
171 }
172 }