threadpool.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. /*
  2. * Public Domain Software
  3. *
  4. * I (Matthias Ladkau) am the author of the source code in this file.
  5. * I have placed the source code in this file in the public domain.
  6. *
  7. * For further information see: http://creativecommons.org/publicdomain/zero/1.0/
  8. */
  9. package pools
  10. import (
  11. "math"
  12. "sync"
  13. "time"
  14. )
  15. /*
  16. Different states of a thread pool.
  17. */
  18. const (
  19. StatusRunning = "Running"
  20. StatusStopping = "Stopping"
  21. StatusStopped = "Stopped"
  22. )
  23. /*
  24. Task is a task which should be run in a thread.
  25. */
  26. type Task interface {
  27. /*
  28. Run the task. The function gets the unique thread ID of the worker
  29. which executes the task.
  30. */
  31. Run(tid uint64) error
  32. /*
  33. HandleError handles an error which occurred during the run method.
  34. */
  35. HandleError(e error)
  36. }
  37. /*
  38. TaskQueue is a queue of tasks for a thread pool.
  39. */
  40. type TaskQueue interface {
  41. /*
  42. Clear the queue of all pending tasks
  43. */
  44. Clear()
  45. /*
  46. Pop returns the next task from the queue.
  47. */
  48. Pop() Task
  49. /*
  50. Push adds another task to the queue.
  51. */
  52. Push(t Task)
  53. /*
  54. Size returns the size of the queue.
  55. */
  56. Size() int
  57. }
  58. /*
  59. DefaultTaskQueue implements a simple (FIFO) task queue for a thread pool.
  60. */
  61. type DefaultTaskQueue struct {
  62. queue []Task
  63. }
  64. /*
  65. Clear the queue of all pending tasks
  66. */
  67. func (tq *DefaultTaskQueue) Clear() {
  68. tq.queue = make([]Task, 0)
  69. }
  70. /*
  71. Pop returns the next task from the queue.
  72. */
  73. func (tq *DefaultTaskQueue) Pop() Task {
  74. var task Task
  75. if len(tq.queue) > 0 {
  76. task = tq.queue[0]
  77. tq.queue = tq.queue[1:]
  78. }
  79. return task
  80. }
  81. /*
  82. Push adds another task to the queue.
  83. */
  84. func (tq *DefaultTaskQueue) Push(t Task) {
  85. tq.queue = append(tq.queue, t)
  86. }
  87. /*
  88. Size returns the size of the queue.
  89. */
  90. func (tq *DefaultTaskQueue) Size() int {
  91. return len(tq.queue)
  92. }
  93. /*
  94. ThreadPool creates a pool of threads which process tasks according to a given
  95. task queue. The threads are kept in an idle state if no more tasks are available.
  96. They resume immediately once a new task is added.
  97. */
  98. type ThreadPool struct {
  99. // Task regulation
  100. queue TaskQueue // Task queue for thread pool
  101. queueLock *sync.Mutex // Lock for queue
  102. // Worker regulation
  103. workerIDCount uint64 // Id counter for worker tasks
  104. workerIDLock *sync.Mutex // Lock for ID generation
  105. workerMap map[uint64]*ThreadPoolWorker // Map of all workers
  106. workerIdleMap map[uint64]*ThreadPoolWorker // Map of all idle workers
  107. workerMapLock *sync.Mutex // Lock for worker map
  108. workerKill int // Count of workers which should die
  109. newTaskCond *sync.Cond // Waiting condition for new tasks
  110. // Callbacks to regulate load
  111. RegulationLock *sync.Mutex // Lock for regulation variables
  112. TooManyThreshold int // Threshold for too many tasks
  113. TooManyCallback func() // Callback for too many tasks
  114. tooManyTriggered bool // Flag if too many tasks threshold was passed
  115. TooFewThreshold int // Threshold for too few tasks
  116. TooFewCallback func() // Callback for too few tasks
  117. tooFewTriggered bool // Flag if too many tasks threshold was passed
  118. }
  119. /*
  120. NewThreadPool creates a new thread pool.
  121. */
  122. func NewThreadPool() *ThreadPool {
  123. return NewThreadPoolWithQueue(&DefaultTaskQueue{})
  124. }
  125. /*
  126. NewThreadPoolWithQueue creates a new thread pool with a specific task queue.
  127. */
  128. func NewThreadPoolWithQueue(q TaskQueue) *ThreadPool {
  129. return &ThreadPool{q, &sync.Mutex{},
  130. 1, &sync.Mutex{}, make(map[uint64]*ThreadPoolWorker),
  131. make(map[uint64]*ThreadPoolWorker), &sync.Mutex{},
  132. 0, sync.NewCond(&sync.Mutex{}), &sync.Mutex{},
  133. math.MaxInt32, func() {}, false, 0, func() {}, false}
  134. }
  135. /*
  136. AddTask adds a task to the thread pool.
  137. */
  138. func (tp *ThreadPool) AddTask(t Task) {
  139. tp.queueLock.Lock()
  140. defer tp.queueLock.Unlock()
  141. tp.queue.Push(t)
  142. // Reset too few flag
  143. tp.RegulationLock.Lock()
  144. if tp.tooFewTriggered && tp.TooFewThreshold < tp.queue.Size() {
  145. tp.tooFewTriggered = false
  146. }
  147. // Check too many
  148. if !tp.tooManyTriggered && tp.TooManyThreshold <= tp.queue.Size() {
  149. tp.tooManyTriggered = true
  150. tp.TooManyCallback()
  151. }
  152. tp.RegulationLock.Unlock()
  153. // Wake up a waiting worker
  154. tp.newTaskCond.Signal()
  155. }
  156. /*
  157. getTask is called by a worker to request a new task. The worker is expected to finish
  158. if this function returns nil.
  159. */
  160. func (tp *ThreadPool) getTask() Task {
  161. var returnIdleTask = true
  162. // Check if tasks should be stopped
  163. tp.workerMapLock.Lock()
  164. if tp.workerKill > 0 {
  165. tp.workerKill--
  166. tp.workerMapLock.Unlock()
  167. return nil
  168. } else if tp.workerKill == -1 {
  169. // Check for special worker kill value which is used when workers should
  170. // be killed when no more tasks are available.
  171. returnIdleTask = false
  172. }
  173. tp.workerMapLock.Unlock()
  174. // Check if there is a task available
  175. tp.queueLock.Lock()
  176. task := tp.queue.Pop()
  177. tp.queueLock.Unlock()
  178. if task != nil {
  179. return task
  180. }
  181. tp.RegulationLock.Lock()
  182. // Reset too many flag
  183. if tp.tooManyTriggered && tp.TooManyThreshold > tp.queue.Size() {
  184. tp.tooManyTriggered = false
  185. }
  186. // Check too few
  187. if !tp.tooFewTriggered && tp.TooFewThreshold >= tp.queue.Size() {
  188. tp.tooFewTriggered = true
  189. tp.TooFewCallback()
  190. }
  191. tp.RegulationLock.Unlock()
  192. if returnIdleTask {
  193. // No new task available return idle task
  194. return &idleTask{tp}
  195. }
  196. return nil
  197. }
  198. /*
  199. NewThreadID creates a new thread ID unique to this pool.
  200. */
  201. func (tp *ThreadPool) NewThreadID() uint64 {
  202. tp.workerIDLock.Lock()
  203. res := tp.workerIDCount
  204. tp.workerIDCount++
  205. tp.workerIDLock.Unlock()
  206. return res
  207. }
  208. /*
  209. SetWorkerCount sets the worker count of this pool. If the wait flag is true then
  210. this call will return after the pool has reached the requested worker count.
  211. */
  212. func (tp *ThreadPool) SetWorkerCount(count int, wait bool) {
  213. tp.workerMapLock.Lock()
  214. workerCount := len(tp.workerMap)
  215. tp.workerMapLock.Unlock()
  216. if count < 0 {
  217. count = 0
  218. }
  219. if workerCount < count {
  220. // More workers are needed
  221. tp.workerMapLock.Lock()
  222. // Make sure no more workers are killed
  223. tp.workerKill = 0
  224. for len(tp.workerMap) != count {
  225. tid := tp.NewThreadID()
  226. worker := &ThreadPoolWorker{tid, tp}
  227. go worker.run()
  228. tp.workerMap[tid] = worker
  229. }
  230. tp.workerMapLock.Unlock()
  231. } else if workerCount > count {
  232. // Fewer workers are needed
  233. tp.workerMapLock.Lock()
  234. tp.workerKill = workerCount - count
  235. tp.workerMapLock.Unlock()
  236. tp.newTaskCond.Broadcast()
  237. if wait {
  238. for true {
  239. tp.workerMapLock.Lock()
  240. workerCount = len(tp.workerMap)
  241. tp.workerMapLock.Unlock()
  242. if workerCount == count {
  243. break
  244. }
  245. time.Sleep(5 * time.Nanosecond)
  246. // Broadcast again since sine workers might be now waiting
  247. tp.newTaskCond.Broadcast()
  248. }
  249. }
  250. }
  251. // If a count was set wait until at least one worker is idle
  252. for count > 0 && len(tp.workerIdleMap) == 0 {
  253. time.Sleep(5 * time.Nanosecond)
  254. }
  255. }
  256. /*
  257. Status returns the current status of the thread pool.
  258. */
  259. func (tp *ThreadPool) Status() string {
  260. var status string
  261. tp.workerMapLock.Lock()
  262. workerCount := len(tp.workerMap)
  263. workerKill := tp.workerKill
  264. tp.workerMapLock.Unlock()
  265. if workerCount > 0 {
  266. if workerKill == -1 {
  267. status = StatusStopping
  268. } else {
  269. status = StatusRunning
  270. }
  271. } else {
  272. status = StatusStopped
  273. }
  274. return status
  275. }
  276. /*
  277. WorkerCount returns the current count of workers.
  278. */
  279. func (tp *ThreadPool) WorkerCount() int {
  280. tp.workerMapLock.Lock()
  281. defer tp.workerMapLock.Unlock()
  282. return len(tp.workerMap)
  283. }
  284. /*
  285. WaitAll waits for all workers to become idle.
  286. */
  287. func (tp *ThreadPool) WaitAll() {
  288. // Wake up all workers
  289. tp.newTaskCond.Broadcast()
  290. time.Sleep(5 * time.Nanosecond)
  291. for true {
  292. tp.workerMapLock.Lock()
  293. tp.queueLock.Lock()
  294. // Get total number of workers and idle workers
  295. workerCount := len(tp.workerMap)
  296. workerIdleCount := len(tp.workerIdleMap)
  297. // Get number of pending tasks
  298. tasks := tp.queue.Size()
  299. tp.queueLock.Unlock()
  300. tp.workerMapLock.Unlock()
  301. // Only leave this loop if either no workers are left or if all
  302. // tasks are done and all workers are idle
  303. if workerCount == 0 || (workerCount == workerIdleCount && tasks == 0) {
  304. break
  305. }
  306. time.Sleep(5 * time.Nanosecond)
  307. // Broadcast again and again until all workers are idle
  308. tp.newTaskCond.Broadcast()
  309. }
  310. }
  311. /*
  312. JoinAll processes all remaining tasks and kills off all workers afterwards.
  313. */
  314. func (tp *ThreadPool) JoinAll() {
  315. // Tell all workers to die
  316. tp.workerMapLock.Lock()
  317. tp.workerKill = -1
  318. tp.workerMapLock.Unlock()
  319. tp.newTaskCond.Broadcast()
  320. for true {
  321. tp.workerMapLock.Lock()
  322. tp.queueLock.Lock()
  323. // Get total number of workers
  324. workerCount := len(tp.workerMap)
  325. // Get number of pending tasks
  326. tasks := tp.queue.Size()
  327. tp.queueLock.Unlock()
  328. tp.workerMapLock.Unlock()
  329. // Only leave this loop if no workers are existing and all tasks are done
  330. if workerCount == 0 && tasks == 0 {
  331. break
  332. }
  333. time.Sleep(5 * time.Nanosecond)
  334. // Broadcast again and again until all workers are dead
  335. tp.newTaskCond.Broadcast()
  336. }
  337. }
  338. /*
  339. ThreadPoolWorker models a worker in the thread pool.
  340. */
  341. type ThreadPoolWorker struct {
  342. id uint64 // ID of the thread pool worker
  343. pool *ThreadPool // Thread pool of this worker
  344. }
  345. /*
  346. run lets this worker run tasks.
  347. */
  348. func (w *ThreadPoolWorker) run() {
  349. defer func() {
  350. // Remove worker from workerMap
  351. w.pool.workerMapLock.Lock()
  352. delete(w.pool.workerMap, w.id)
  353. w.pool.workerMapLock.Unlock()
  354. }()
  355. for true {
  356. // Try to get the next task
  357. task := w.pool.getTask()
  358. // Exit if there is not new task
  359. if task == nil {
  360. break
  361. }
  362. _, isIdleTask := task.(*idleTask)
  363. if isIdleTask {
  364. // Register this worker as idle
  365. w.pool.workerMapLock.Lock()
  366. w.pool.workerIdleMap[w.id] = w
  367. w.pool.workerMapLock.Unlock()
  368. }
  369. // Run the task
  370. if err := task.Run(w.id); err != nil {
  371. task.HandleError(err)
  372. }
  373. if isIdleTask {
  374. w.pool.workerMapLock.Lock()
  375. delete(w.pool.workerIdleMap, w.id)
  376. w.pool.workerMapLock.Unlock()
  377. }
  378. }
  379. }
  380. /*
  381. idleTask is the internal idle task.
  382. */
  383. type idleTask struct {
  384. tp *ThreadPool
  385. }
  386. /*
  387. Run the idle task.
  388. */
  389. func (t *idleTask) Run(tid uint64) error {
  390. t.tp.newTaskCond.L.Lock()
  391. defer t.tp.newTaskCond.L.Unlock()
  392. t.tp.newTaskCond.Wait()
  393. return nil
  394. }
  395. func (t *idleTask) HandleError(e error) {
  396. panic(e.Error())
  397. }