threadpool.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. /*
  2. * Public Domain Software
  3. *
  4. * I (Matthias Ladkau) am the author of the source code in this file.
  5. * I have placed the source code in this file in the public domain.
  6. *
  7. * For further information see: http://creativecommons.org/publicdomain/zero/1.0/
  8. */
  9. /*
  10. Package pools contains object pooling utilities.
  11. */
  12. package pools
  13. import (
  14. "math"
  15. "sync"
  16. "time"
  17. )
  /*
Status values which a thread pool reports through its Status method.
*/
const (
	// StatusRunning is reported while the pool has workers and was not asked
	// to stop once the queue is empty.
	StatusRunning = "Running"

	// StatusStopping is reported while the pool still has workers which were
	// asked to exit once no more tasks are available.
	StatusStopping = "Stopping"

	// StatusStopped is reported when the pool has no workers.
	StatusStopped = "Stopped"
)
  /*
Task is a unit of work which is executed by a worker of a thread pool.
*/
type Task interface {

	/*
		Run executes the task. A non-nil result is passed on to HandleError
		by the worker which ran the task.
	*/
	Run() error

	/*
		HandleError handles an error which occurred during the Run method.
	*/
	HandleError(e error)
}
  39. /*
  40. TaskQueue is a queue of tasks for a thread pool.
  41. */
  42. type TaskQueue interface {
  43. /*
  44. Clear the queue of all pending tasks
  45. */
  46. Clear()
  47. /*
  48. Pop returns the next task from the queue.
  49. */
  50. Pop() Task
  51. /*
  52. Push adds another task to the queue.
  53. */
  54. Push(t Task)
  55. /*
  56. Size returns the size of the queue.
  57. */
  58. Size() int
  59. }
  60. /*
  61. DefaultTaskQueue implements a simple (FIFO) task queue for a thread pool.
  62. */
  63. type DefaultTaskQueue struct {
  64. queue []Task
  65. }
  66. /*
  67. Clear the queue of all pending tasks
  68. */
  69. func (tq *DefaultTaskQueue) Clear() {
  70. tq.queue = make([]Task, 0)
  71. }
  72. /*
  73. Pop returns the next task from the queue.
  74. */
  75. func (tq *DefaultTaskQueue) Pop() Task {
  76. var task Task
  77. if len(tq.queue) > 0 {
  78. task = tq.queue[0]
  79. tq.queue = tq.queue[1:]
  80. }
  81. return task
  82. }
  83. /*
  84. Push adds another task to the queue.
  85. */
  86. func (tq *DefaultTaskQueue) Push(t Task) {
  87. tq.queue = append(tq.queue, t)
  88. }
  89. /*
  90. Size returns the size of the queue.
  91. */
  92. func (tq *DefaultTaskQueue) Size() int {
  93. return len(tq.queue)
  94. }
  95. /*
  96. ThreadPool creates a pool of threads which process tasks according to a given
  97. task queue. The threads are kept in an idle state if no more tasks are available.
  98. They resume immediately once a new task is added.
  99. */
  100. type ThreadPool struct {
  101. // Task regulation
  102. queue TaskQueue // Task queue for thread pool
  103. queueLock *sync.Mutex // Lock for queue
  104. // Worker regulation
  105. workerIDCount uint64 // Id counter for worker tasks
  106. workerMap map[uint64]*ThreadPoolWorker // Map of all workers
  107. workerIdleMap map[uint64]*ThreadPoolWorker // Map of all idle workers
  108. workerMapLock *sync.Mutex // Lock for worker map
  109. workerKill int // Count of workers which should die
  110. newTaskCond *sync.Cond // Waiting condition for new tasks
  111. // Callbacks to regulate load
  112. RegulationLock *sync.Mutex // Lock for regulation variables
  113. TooManyThreshold int // Threshold for too many tasks
  114. TooManyCallback func() // Callback for too many tasks
  115. tooManyTriggered bool // Flag if too many tasks threshold was passed
  116. TooFewThreshold int // Threshold for too few tasks
  117. TooFewCallback func() // Callback for too few tasks
  118. tooFewTriggered bool // Flag if too many tasks threshold was passed
  119. }
  120. /*
  121. NewThreadPool creates a new thread pool.
  122. */
  123. func NewThreadPool() *ThreadPool {
  124. return NewThreadPoolWithQueue(&DefaultTaskQueue{})
  125. }
  126. /*
  127. NewThreadPoolWithQueue creates a new thread pool with a specific task queue.
  128. */
  129. func NewThreadPoolWithQueue(q TaskQueue) *ThreadPool {
  130. return &ThreadPool{q, &sync.Mutex{},
  131. 0, make(map[uint64]*ThreadPoolWorker),
  132. make(map[uint64]*ThreadPoolWorker), &sync.Mutex{},
  133. 0, sync.NewCond(&sync.Mutex{}), &sync.Mutex{},
  134. math.MaxInt32, func() {}, false, 0, func() {}, false}
  135. }
  136. /*
  137. AddTask adds a task to the thread pool.
  138. */
  139. func (tp *ThreadPool) AddTask(t Task) {
  140. tp.queueLock.Lock()
  141. defer tp.queueLock.Unlock()
  142. tp.queue.Push(t)
  143. // Reset too few flag
  144. tp.RegulationLock.Lock()
  145. if tp.tooFewTriggered && tp.TooFewThreshold < tp.queue.Size() {
  146. tp.tooFewTriggered = false
  147. }
  148. // Check too many
  149. if !tp.tooManyTriggered && tp.TooManyThreshold <= tp.queue.Size() {
  150. tp.tooManyTriggered = true
  151. tp.TooManyCallback()
  152. }
  153. tp.RegulationLock.Unlock()
  154. // Wake up a waiting worker
  155. tp.newTaskCond.Signal()
  156. }
  157. /*
  158. getTask is called by a worker to request a new task. The worker is expected to finish
  159. if this function returns nil.
  160. */
  161. func (tp *ThreadPool) getTask() Task {
  162. var returnIdleTask = true
  163. // Check if tasks should be stopped
  164. tp.workerMapLock.Lock()
  165. if tp.workerKill > 0 {
  166. tp.workerKill--
  167. tp.workerMapLock.Unlock()
  168. return nil
  169. } else if tp.workerKill == -1 {
  170. // Check for special worker kill value which is used when workers should
  171. // be killed when no more tasks are available.
  172. returnIdleTask = false
  173. }
  174. tp.workerMapLock.Unlock()
  175. // Check if there is a task available
  176. tp.queueLock.Lock()
  177. task := tp.queue.Pop()
  178. tp.queueLock.Unlock()
  179. if task != nil {
  180. return task
  181. }
  182. tp.RegulationLock.Lock()
  183. // Reset too many flag
  184. if tp.tooManyTriggered && tp.TooManyThreshold > tp.queue.Size() {
  185. tp.tooManyTriggered = false
  186. }
  187. // Check too few
  188. if !tp.tooFewTriggered && tp.TooFewThreshold >= tp.queue.Size() {
  189. tp.tooFewTriggered = true
  190. tp.TooFewCallback()
  191. }
  192. tp.RegulationLock.Unlock()
  193. if returnIdleTask {
  194. // No new task available return idle task
  195. return &idleTask{tp}
  196. }
  197. return nil
  198. }
  199. /*
  200. SetWorkerCount sets the worker count of this pool. If the wait flag is true then
  201. this call will return after the pool has reached the requested worker count.
  202. */
  203. func (tp *ThreadPool) SetWorkerCount(count int, wait bool) {
  204. tp.workerMapLock.Lock()
  205. workerCount := len(tp.workerMap)
  206. tp.workerMapLock.Unlock()
  207. if count < 0 {
  208. count = 0
  209. }
  210. if workerCount < count {
  211. // More workers are needed
  212. tp.workerMapLock.Lock()
  213. // Make sure no more workers are killed
  214. tp.workerKill = 0
  215. for len(tp.workerMap) != count {
  216. worker := &ThreadPoolWorker{tp.workerIDCount, tp}
  217. go worker.run()
  218. tp.workerMap[tp.workerIDCount] = worker
  219. tp.workerIDCount++
  220. }
  221. tp.workerMapLock.Unlock()
  222. } else if workerCount > count {
  223. // Fewer workers are needed
  224. tp.workerMapLock.Lock()
  225. tp.workerKill = workerCount - count
  226. tp.workerMapLock.Unlock()
  227. tp.newTaskCond.Broadcast()
  228. if wait {
  229. for true {
  230. tp.workerMapLock.Lock()
  231. workerCount = len(tp.workerMap)
  232. tp.workerMapLock.Unlock()
  233. if workerCount == count {
  234. break
  235. }
  236. time.Sleep(5 * time.Nanosecond)
  237. // Broadcast again since sine workers might be now waiting
  238. tp.newTaskCond.Broadcast()
  239. }
  240. }
  241. }
  242. }
  243. /*
  244. Status returns the current status of the thread pool.
  245. */
  246. func (tp *ThreadPool) Status() string {
  247. var status string
  248. tp.workerMapLock.Lock()
  249. workerCount := len(tp.workerMap)
  250. workerKill := tp.workerKill
  251. tp.workerMapLock.Unlock()
  252. if workerCount > 0 {
  253. if workerKill == -1 {
  254. status = StatusStopping
  255. } else {
  256. status = StatusRunning
  257. }
  258. } else {
  259. status = StatusStopped
  260. }
  261. return status
  262. }
  263. /*
  264. WorkerCount returns the current count of workers.
  265. */
  266. func (tp *ThreadPool) WorkerCount() int {
  267. tp.workerMapLock.Lock()
  268. defer tp.workerMapLock.Unlock()
  269. return len(tp.workerMap)
  270. }
  271. /*
  272. WaitAll waits for all workers to become idle.
  273. */
  274. func (tp *ThreadPool) WaitAll() {
  275. // Wake up all workers
  276. tp.newTaskCond.Broadcast()
  277. time.Sleep(5 * time.Nanosecond)
  278. for true {
  279. tp.workerMapLock.Lock()
  280. tp.queueLock.Lock()
  281. // Get total number of workers and idle workers
  282. workerCount := len(tp.workerMap)
  283. workerIdleCount := len(tp.workerIdleMap)
  284. // Get number of pending tasks
  285. tasks := tp.queue.Size()
  286. tp.queueLock.Unlock()
  287. tp.workerMapLock.Unlock()
  288. // Only leave this loop if either no workers are left or if all
  289. // tasks are done and all workers are idle
  290. if workerCount == 0 || (workerCount == workerIdleCount && tasks == 0) {
  291. break
  292. }
  293. time.Sleep(5 * time.Nanosecond)
  294. // Broadcast again and again until all workers are idle
  295. tp.newTaskCond.Broadcast()
  296. }
  297. }
  298. /*
  299. JoinAll processes all remaining tasks and kills off all workers afterwards.
  300. */
  301. func (tp *ThreadPool) JoinAll() {
  302. // Tell all workers to die
  303. tp.workerMapLock.Lock()
  304. tp.workerKill = -1
  305. tp.workerMapLock.Unlock()
  306. tp.newTaskCond.Broadcast()
  307. for true {
  308. tp.workerMapLock.Lock()
  309. tp.queueLock.Lock()
  310. // Get total number of workers
  311. workerCount := len(tp.workerMap)
  312. // Get number of pending tasks
  313. tasks := tp.queue.Size()
  314. tp.queueLock.Unlock()
  315. tp.workerMapLock.Unlock()
  316. // Only leave this loop if no workers are existing and all tasks are done
  317. if workerCount == 0 && tasks == 0 {
  318. break
  319. }
  320. time.Sleep(5 * time.Nanosecond)
  321. // Broadcast again and again until all workers are dead
  322. tp.newTaskCond.Broadcast()
  323. }
  324. }
  325. /*
  326. ThreadPoolWorker models a worker in the thread pool.
  327. */
  328. type ThreadPoolWorker struct {
  329. id uint64 // ID of the thread pool worker
  330. pool *ThreadPool // Thread pool of this worker
  331. }
  332. /*
  333. run lets this worker run tasks.
  334. */
  335. func (w *ThreadPoolWorker) run() {
  336. for true {
  337. // Try to get the next task
  338. task := w.pool.getTask()
  339. // Exit if there is not new task
  340. if task == nil {
  341. break
  342. }
  343. _, isIdleTask := task.(*idleTask)
  344. if isIdleTask {
  345. // Register this worker as idle
  346. w.pool.workerMapLock.Lock()
  347. w.pool.workerIdleMap[w.id] = w
  348. w.pool.workerMapLock.Unlock()
  349. }
  350. // Run the task
  351. if err := task.Run(); err != nil {
  352. task.HandleError(err)
  353. }
  354. if isIdleTask {
  355. w.pool.workerMapLock.Lock()
  356. delete(w.pool.workerIdleMap, w.id)
  357. w.pool.workerMapLock.Unlock()
  358. }
  359. }
  360. // Remove worker from workerMap
  361. w.pool.workerMapLock.Lock()
  362. delete(w.pool.workerMap, w.id)
  363. w.pool.workerMapLock.Unlock()
  364. }
  365. /*
  366. idleTask is the internal idle task.
  367. */
  368. type idleTask struct {
  369. tp *ThreadPool
  370. }
  371. /*
  372. Run the idle task.
  373. */
  374. func (t *idleTask) Run() error {
  375. t.tp.newTaskCond.L.Lock()
  376. t.tp.newTaskCond.Wait()
  377. t.tp.newTaskCond.L.Unlock()
  378. return nil
  379. }
  380. func (t *idleTask) HandleError(e error) {
  381. panic(e.Error())
  382. }