001 /****************************************************************************** 002 * Copyright (C) MActor Developers. All rights reserved. * 003 * ---------------------------------------------------------------------------* 004 * This file is part of MActor. * 005 * * 006 * MActor is free software; you can redistribute it and/or modify * 007 * it under the terms of the GNU General Public License as published by * 008 * the Free Software Foundation; either version 2 of the License, or * 009 * (at your option) any later version. * 010 * * 011 * MActor is distributed in the hope that it will be useful, * 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 014 * GNU General Public License for more details. * 015 * * 016 * You should have received a copy of the GNU General Public License * 017 * along with MActor; if not, write to the Free Software * 018 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * 019 ******************************************************************************/ 020 package org.mactor.framework; 021 022 import java.util.LinkedList; 023 024 import org.apache.log4j.Logger; 025 026 // This primitive pool will grow to the max allowed threads and stay there 027 public class TestEngineThreadPool { 028 static Logger log = Logger.getLogger(TestEngineThreadPool.class); 029 private int maxNumberOfThreads; 030 private boolean stopped = false; 031 public TestEngineThreadPool(int maxNumberOfThreads) { 032 this.maxNumberOfThreads = maxNumberOfThreads; 033 } 034 LinkedList<Worker> idleWorkers = new LinkedList<Worker>(); 035 LinkedList<Worker> activeWorkers = new LinkedList<Worker>(); 036 private Object lock = new Object(); 037 public void terminate() { 038 log.debug("Stopping test-engine thread pool..."); 039 stopped = true; 040 synchronized (lock) { 041 for (Worker w : activeWorkers) { 042 w.stop(); 043 } 044 } 045 log.debug("Thread pool stopped"); 046 } 047 private Worker getIdleWorker() throws InterruptedException { 048 synchronized (lock) { 049 for (; !stopped;) { 050 if (idleWorkers.size() > 0) { 051 Worker w = idleWorkers.removeLast(); 052 activeWorkers.add(w); 053 return w; 054 } else if ((activeWorkers.size() + idleWorkers.size()) < maxNumberOfThreads) { 055 Worker w = new Worker(); 056 activeWorkers.add(w); 057 new Thread(w).start(); 058 return w; 059 } 060 lock.wait(); 061 } 062 } 063 return null; 064 } 065 /** 066 * Block until thread is available 067 * 068 * @param runnable 069 */ 070 public void addJob(TestRunnable runnable) { 071 try { 072 synchronized (lock) { 073 Worker w = getIdleWorker(); 074 if (w == null) 075 throw new RuntimeException("The thread pool has been stopped. No more jobs can be added"); 076 w.assignJob(runnable); 077 } 078 } catch (InterruptedException ie) { 079 ie.printStackTrace(); 080 } 081 } 082 public void join() { 083 try { 084 synchronized (lock) { 085 for (;;) { 086 if (activeWorkers.size() == 0) 087 break; 088 lock.wait(); 089 } 090 stopped = true; 091 } 092 for (Worker w : idleWorkers) { 093 w.stop(); 094 } 095 } catch (InterruptedException ie) { 096 } 097 } 098 private void completed(Worker w) { 099 synchronized (lock) { 100 if (!activeWorkers.remove(w)) 101 throw new RuntimeException("Unexpected state"); 102 idleWorkers.add(w); 103 lock.notifyAll(); 104 } 105 log.debug("active:" + activeWorkers.size()); 106 } 107 private class Worker implements Runnable { 108 TestRunnable job; 109 TestRunnable runningJob; 110 private Object lock = new Object(); 111 public void run() { 112 while (!stopped) { 113 try { 114 synchronized (lock) { 115 if (job != null) 116 runningJob = job; 117 else 118 lock.wait(); 119 if (job != null) 120 runningJob = job; 121 job = null; 122 } 123 if (runningJob != null) { 124 try { 125 runningJob.run(); 126 } finally { 127 runningJob = null; 128 completed(this); 129 } 130 } 131 } catch (Exception e) { 132 e.printStackTrace(); 133 } 134 } 135 } 136 public void stop() { 137 log.debug("Stopping test-engine worker"); 138 TestRunnable tmp = runningJob; 139 if (tmp != null) 140 tmp.stop(); 141 } 142 public void assignJob(TestRunnable job) { 143 synchronized (this.lock) { 144 this.job = job; 145 this.lock.notifyAll(); 146 } 147 } 148 } 149 public static class TestRunnable implements Runnable { 150 boolean succes = false; 151 int index; 152 TestEngine testEngine; 153 public TestRunnable(int index, TestEngine testEngine) { 154 this.index = index; 155 this.testEngine = testEngine; 156 } 157 public boolean isSucces() { 158 return succes; 159 } 160 public void stop() { 161 testEngine.stop(); 162 } 163 public void run() { 164 try { 165 testEngine.runTest(index); 166 this.succes = true; 167 } catch (MactorException ce) { 168 ce.printStackTrace(); 169 } 170 } 171 } 172 }