001    /******************************************************************************
002     * Copyright (C) MActor Developers. All rights reserved.                        *
003     * ---------------------------------------------------------------------------*
004     * This file is part of MActor.                                               *
005     *                                                                            *
006     * MActor is free software; you can redistribute it and/or modify             *
007     * it under the terms of the GNU General Public License as published by       *
008     * the Free Software Foundation; either version 2 of the License, or          *
009     * (at your option) any later version.                                        *
010     *                                                                            *
011     * MActor is distributed in the hope that it will be useful,                  *
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
014     * GNU General Public License for more details.                               *
015     *                                                                            *
016     * You should have received a copy of the GNU General Public License          *
017     * along with MActor; if not, write to the Free Software                      *
018     * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA *
019     ******************************************************************************/
020    package org.mactor.framework;
021    
022    import java.util.LinkedList;
023    
024    import org.apache.log4j.Logger;
025    
026    // This primitive pool will grow to the max allowed threads and stay there
027    public class TestEngineThreadPool {
028            static Logger log = Logger.getLogger(TestEngineThreadPool.class);
029            private int maxNumberOfThreads;
030            private boolean stopped = false;
031            public TestEngineThreadPool(int maxNumberOfThreads) {
032                    this.maxNumberOfThreads = maxNumberOfThreads;
033            }
034            LinkedList<Worker> idleWorkers = new LinkedList<Worker>();
035            LinkedList<Worker> activeWorkers = new LinkedList<Worker>();
036            private Object lock = new Object();
037            public void terminate() {
038                    log.debug("Stopping test-engine thread pool...");
039                    stopped = true;
040                    synchronized (lock) {
041                            for (Worker w : activeWorkers) {
042                                    w.stop();
043                            }
044                    }
045                    log.debug("Thread pool stopped");
046            }
047            private Worker getIdleWorker() throws InterruptedException {
048                    synchronized (lock) {
049                            for (; !stopped;) {
050                                    if (idleWorkers.size() > 0) {
051                                            Worker w = idleWorkers.removeLast();
052                                            activeWorkers.add(w);
053                                            return w;
054                                    } else if ((activeWorkers.size() + idleWorkers.size()) < maxNumberOfThreads) {
055                                            Worker w = new Worker();
056                                            activeWorkers.add(w);
057                                            new Thread(w).start();
058                                            return w;
059                                    }
060                                    lock.wait();
061                            }
062                    }
063                    return null;
064            }
065            /**
066             * Block until thread is available
067             * 
068             * @param runnable
069             */
070            public void addJob(TestRunnable runnable) {
071                    try {
072                            synchronized (lock) {
073                                    Worker w = getIdleWorker();
074                                    if (w == null)
075                                            throw new RuntimeException("The thread pool has been stopped. No more jobs can be added");
076                                    w.assignJob(runnable);
077                            }
078                    } catch (InterruptedException ie) {
079                            ie.printStackTrace();
080                    }
081            }
082            public void join() {
083                    try {
084                            synchronized (lock) {
085                                    for (;;) {
086                                            if (activeWorkers.size() == 0)
087                                                    break;
088                                            lock.wait();
089                                    }
090                                    stopped = true;
091                            }
092                            for (Worker w : idleWorkers) {
093                                    w.stop();
094                            }
095                    } catch (InterruptedException ie) {
096                    }
097            }
098            private void completed(Worker w) {
099                    synchronized (lock) {
100                            if (!activeWorkers.remove(w))
101                                    throw new RuntimeException("Unexpected state");
102                            idleWorkers.add(w);
103                            lock.notifyAll();
104                    }
105                    log.debug("active:" + activeWorkers.size());
106            }
107            private class Worker implements Runnable {
108                    TestRunnable job;
109                    TestRunnable runningJob;
110                    private Object lock = new Object();
111                    public void run() {
112                            while (!stopped) {
113                                    try {
114                                            synchronized (lock) {
115                                                    if (job != null)
116                                                            runningJob = job;
117                                                    else
118                                                            lock.wait();
119                                                    if (job != null)
120                                                            runningJob = job;
121                                                    job = null;
122                                            }
123                                            if (runningJob != null) {
124                                                    try {
125                                                            runningJob.run();
126                                                    } finally {
127                                                            runningJob = null;
128                                                            completed(this);
129                                                    }
130                                            }
131                                    } catch (Exception e) {
132                                            e.printStackTrace();
133                                    }
134                            }
135                    }
136                    public void stop() {
137                            log.debug("Stopping test-engine worker");
138                            TestRunnable tmp = runningJob;
139                            if (tmp != null)
140                                    tmp.stop();
141                    }
142                    public void assignJob(TestRunnable job) {
143                            synchronized (this.lock) {
144                                    this.job = job;
145                                    this.lock.notifyAll();
146                            }
147                    }
148            }
149            public static class TestRunnable implements Runnable {
150                    boolean succes = false;
151                    int index;
152                    TestEngine testEngine;
153                    public TestRunnable(int index, TestEngine testEngine) {
154                            this.index = index;
155                            this.testEngine = testEngine;
156                    }
157                    public boolean isSucces() {
158                            return succes;
159                    }
160                    public void stop() {
161                            testEngine.stop();
162                    }
163                    public void run() {
164                            try {
165                                    testEngine.runTest(index);
166                                    this.succes = true;
167                            } catch (MactorException ce) {
168                                    ce.printStackTrace();
169                            }
170                    }
171            }
172    }