threadpool.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. /*
  2. * Public Domain Software
  3. *
  4. * I (Matthias Ladkau) am the author of the source code in this file.
  5. * I have placed the source code in this file in the public domain.
  6. *
  7. * For further information see: http://creativecommons.org/publicdomain/zero/1.0/
  8. */
  9. /*
  10. Package pool contains a thread pool implementation.
  11. */
  12. package pool
  13. import (
  14. "math"
  15. "sync"
  16. "time"
  17. )
  18. /*
  19. Different states of a thread pool.
  20. */
  21. const (
  22. StatusRunning = "Running"
  23. StatusStopping = "Stopping"
  24. StatusStopped = "Stopped"
  25. )
  26. /*
  27. Task is a task which should be run in a thread.
  28. */
  29. type Task interface {
  30. /*
  31. Run the task. The function gets the unique thread ID of the worker
  32. which executes the task.
  33. */
  34. Run(tid uint64) error
  35. /*
  36. HandleError handles an error which occurred during the run method.
  37. */
  38. HandleError(e error)
  39. }
  40. /*
  41. TaskQueue is a queue of tasks for a thread pool.
  42. */
  43. type TaskQueue interface {
  44. /*
  45. Clear the queue of all pending tasks
  46. */
  47. Clear()
  48. /*
  49. Pop returns the next task from the queue.
  50. */
  51. Pop() Task
  52. /*
  53. Push adds another task to the queue.
  54. */
  55. Push(t Task)
  56. /*
  57. Size returns the size of the queue.
  58. */
  59. Size() int
  60. }
  61. /*
  62. DefaultTaskQueue implements a simple (FIFO) task queue for a thread pool.
  63. */
  64. type DefaultTaskQueue struct {
  65. queue []Task
  66. }
  67. /*
  68. Clear the queue of all pending tasks
  69. */
  70. func (tq *DefaultTaskQueue) Clear() {
  71. tq.queue = make([]Task, 0)
  72. }
  73. /*
  74. Pop returns the next task from the queue.
  75. */
  76. func (tq *DefaultTaskQueue) Pop() Task {
  77. var task Task
  78. if len(tq.queue) > 0 {
  79. task = tq.queue[0]
  80. tq.queue = tq.queue[1:]
  81. }
  82. return task
  83. }
  84. /*
  85. Push adds another task to the queue.
  86. */
  87. func (tq *DefaultTaskQueue) Push(t Task) {
  88. tq.queue = append(tq.queue, t)
  89. }
  90. /*
  91. Size returns the size of the queue.
  92. */
  93. func (tq *DefaultTaskQueue) Size() int {
  94. return len(tq.queue)
  95. }
  96. /*
  97. ThreadPool creates a pool of threads which process tasks according to a given
  98. task queue. The threads are kept in an idle state if no more tasks are available.
  99. They resume immediately once a new task is added.
  100. */
  101. type ThreadPool struct {
  102. // Task regulation
  103. queue TaskQueue // Task queue for thread pool
  104. queueLock *sync.Mutex // Lock for queue
  105. // Worker regulation
  106. workerIDCount uint64 // Id counter for worker tasks
  107. workerIDLock *sync.Mutex // Lock for ID generation
  108. workerMap map[uint64]*ThreadPoolWorker // Map of all workers
  109. workerIdleMap map[uint64]*ThreadPoolWorker // Map of all idle workers
  110. workerMapLock *sync.Mutex // Lock for worker map
  111. workerKill int // Count of workers which should die
  112. newTaskCond *sync.Cond // Waiting condition for new tasks
  113. // Callbacks to regulate load
  114. RegulationLock *sync.Mutex // Lock for regulation variables
  115. TooManyThreshold int // Threshold for too many tasks
  116. TooManyCallback func() // Callback for too many tasks
  117. tooManyTriggered bool // Flag if too many tasks threshold was passed
  118. TooFewThreshold int // Threshold for too few tasks
  119. TooFewCallback func() // Callback for too few tasks
  120. tooFewTriggered bool // Flag if too many tasks threshold was passed
  121. }
  122. /*
  123. NewThreadPool creates a new thread pool.
  124. */
  125. func NewThreadPool() *ThreadPool {
  126. return NewThreadPoolWithQueue(&DefaultTaskQueue{})
  127. }
  128. /*
  129. NewThreadPoolWithQueue creates a new thread pool with a specific task queue.
  130. */
  131. func NewThreadPoolWithQueue(q TaskQueue) *ThreadPool {
  132. return &ThreadPool{q, &sync.Mutex{},
  133. 1, &sync.Mutex{}, make(map[uint64]*ThreadPoolWorker),
  134. make(map[uint64]*ThreadPoolWorker), &sync.Mutex{},
  135. 0, sync.NewCond(&sync.Mutex{}), &sync.Mutex{},
  136. math.MaxInt32, func() {}, false, 0, func() {}, false}
  137. }
  138. /*
  139. AddTask adds a task to the thread pool.
  140. */
  141. func (tp *ThreadPool) AddTask(t Task) {
  142. tp.queueLock.Lock()
  143. defer tp.queueLock.Unlock()
  144. tp.queue.Push(t)
  145. // Reset too few flag
  146. tp.RegulationLock.Lock()
  147. if tp.tooFewTriggered && tp.TooFewThreshold < tp.queue.Size() {
  148. tp.tooFewTriggered = false
  149. }
  150. // Check too many
  151. if !tp.tooManyTriggered && tp.TooManyThreshold <= tp.queue.Size() {
  152. tp.tooManyTriggered = true
  153. tp.TooManyCallback()
  154. }
  155. tp.RegulationLock.Unlock()
  156. // Wake up a waiting worker
  157. tp.newTaskCond.Signal()
  158. }
  159. /*
  160. getTask is called by a worker to request a new task. The worker is expected to finish
  161. if this function returns nil.
  162. */
  163. func (tp *ThreadPool) getTask() Task {
  164. var returnIdleTask = true
  165. // Check if tasks should be stopped
  166. tp.workerMapLock.Lock()
  167. if tp.workerKill > 0 {
  168. tp.workerKill--
  169. tp.workerMapLock.Unlock()
  170. return nil
  171. } else if tp.workerKill == -1 {
  172. // Check for special worker kill value which is used when workers should
  173. // be killed when no more tasks are available.
  174. returnIdleTask = false
  175. }
  176. tp.workerMapLock.Unlock()
  177. // Check if there is a task available
  178. tp.queueLock.Lock()
  179. task := tp.queue.Pop()
  180. tp.queueLock.Unlock()
  181. if task != nil {
  182. return task
  183. }
  184. tp.RegulationLock.Lock()
  185. // Reset too many flag
  186. if tp.tooManyTriggered && tp.TooManyThreshold > tp.queue.Size() {
  187. tp.tooManyTriggered = false
  188. }
  189. // Check too few
  190. if !tp.tooFewTriggered && tp.TooFewThreshold >= tp.queue.Size() {
  191. tp.tooFewTriggered = true
  192. tp.TooFewCallback()
  193. }
  194. tp.RegulationLock.Unlock()
  195. if returnIdleTask {
  196. // No new task available return idle task
  197. return &idleTask{tp}
  198. }
  199. return nil
  200. }
  201. /*
  202. NewThreadID creates a new thread ID unique to this pool.
  203. */
  204. func (tp *ThreadPool) NewThreadID() uint64 {
  205. tp.workerIDLock.Lock()
  206. res := tp.workerIDCount
  207. tp.workerIDCount++
  208. tp.workerIDLock.Unlock()
  209. return res
  210. }
  211. /*
  212. SetWorkerCount sets the worker count of this pool. If the wait flag is true then
  213. this call will return after the pool has reached the requested worker count.
  214. */
  215. func (tp *ThreadPool) SetWorkerCount(count int, wait bool) {
  216. tp.workerMapLock.Lock()
  217. workerCount := len(tp.workerMap)
  218. tp.workerMapLock.Unlock()
  219. if count < 0 {
  220. count = 0
  221. }
  222. if workerCount < count {
  223. // More workers are needed
  224. tp.workerMapLock.Lock()
  225. // Make sure no more workers are killed
  226. tp.workerKill = 0
  227. for len(tp.workerMap) != count {
  228. tid := tp.NewThreadID()
  229. worker := &ThreadPoolWorker{tid, tp}
  230. go worker.run()
  231. tp.workerMap[tid] = worker
  232. }
  233. tp.workerMapLock.Unlock()
  234. } else if workerCount > count {
  235. // Fewer workers are needed
  236. tp.workerMapLock.Lock()
  237. tp.workerKill = workerCount - count
  238. tp.workerMapLock.Unlock()
  239. tp.newTaskCond.Broadcast()
  240. if wait {
  241. for true {
  242. tp.workerMapLock.Lock()
  243. workerCount = len(tp.workerMap)
  244. tp.workerMapLock.Unlock()
  245. if workerCount == count {
  246. break
  247. }
  248. time.Sleep(5 * time.Nanosecond)
  249. // Broadcast again since sine workers might be now waiting
  250. tp.newTaskCond.Broadcast()
  251. }
  252. }
  253. }
  254. // If a count was set wait until at least one worker is idle
  255. for count > 0 && len(tp.workerIdleMap) == 0 {
  256. time.Sleep(5 * time.Nanosecond)
  257. }
  258. }
  259. /*
  260. Status returns the current status of the thread pool.
  261. */
  262. func (tp *ThreadPool) Status() string {
  263. var status string
  264. tp.workerMapLock.Lock()
  265. workerCount := len(tp.workerMap)
  266. workerKill := tp.workerKill
  267. tp.workerMapLock.Unlock()
  268. if workerCount > 0 {
  269. if workerKill == -1 {
  270. status = StatusStopping
  271. } else {
  272. status = StatusRunning
  273. }
  274. } else {
  275. status = StatusStopped
  276. }
  277. return status
  278. }
  279. /*
  280. WorkerCount returns the current count of workers.
  281. */
  282. func (tp *ThreadPool) WorkerCount() int {
  283. tp.workerMapLock.Lock()
  284. defer tp.workerMapLock.Unlock()
  285. return len(tp.workerMap)
  286. }
  287. /*
  288. WaitAll waits for all workers to become idle.
  289. */
  290. func (tp *ThreadPool) WaitAll() {
  291. // Wake up all workers
  292. tp.newTaskCond.Broadcast()
  293. time.Sleep(5 * time.Nanosecond)
  294. for true {
  295. tp.workerMapLock.Lock()
  296. tp.queueLock.Lock()
  297. // Get total number of workers and idle workers
  298. workerCount := len(tp.workerMap)
  299. workerIdleCount := len(tp.workerIdleMap)
  300. // Get number of pending tasks
  301. tasks := tp.queue.Size()
  302. tp.queueLock.Unlock()
  303. tp.workerMapLock.Unlock()
  304. // Only leave this loop if either no workers are left or if all
  305. // tasks are done and all workers are idle
  306. if workerCount == 0 || (workerCount == workerIdleCount && tasks == 0) {
  307. break
  308. }
  309. time.Sleep(5 * time.Nanosecond)
  310. // Broadcast again and again until all workers are idle
  311. tp.newTaskCond.Broadcast()
  312. }
  313. }
  314. /*
  315. JoinAll processes all remaining tasks and kills off all workers afterwards.
  316. */
  317. func (tp *ThreadPool) JoinAll() {
  318. // Tell all workers to die
  319. tp.workerMapLock.Lock()
  320. tp.workerKill = -1
  321. tp.workerMapLock.Unlock()
  322. tp.newTaskCond.Broadcast()
  323. for true {
  324. tp.workerMapLock.Lock()
  325. tp.queueLock.Lock()
  326. // Get total number of workers
  327. workerCount := len(tp.workerMap)
  328. // Get number of pending tasks
  329. tasks := tp.queue.Size()
  330. tp.queueLock.Unlock()
  331. tp.workerMapLock.Unlock()
  332. // Only leave this loop if no workers are existing and all tasks are done
  333. if workerCount == 0 && tasks == 0 {
  334. break
  335. }
  336. time.Sleep(5 * time.Nanosecond)
  337. // Broadcast again and again until all workers are dead
  338. tp.newTaskCond.Broadcast()
  339. }
  340. }
  341. /*
  342. ThreadPoolWorker models a worker in the thread pool.
  343. */
  344. type ThreadPoolWorker struct {
  345. id uint64 // ID of the thread pool worker
  346. pool *ThreadPool // Thread pool of this worker
  347. }
  348. /*
  349. run lets this worker run tasks.
  350. */
  351. func (w *ThreadPoolWorker) run() {
  352. defer func() {
  353. // Remove worker from workerMap
  354. w.pool.workerMapLock.Lock()
  355. delete(w.pool.workerMap, w.id)
  356. w.pool.workerMapLock.Unlock()
  357. }()
  358. for true {
  359. // Try to get the next task
  360. task := w.pool.getTask()
  361. // Exit if there is not new task
  362. if task == nil {
  363. break
  364. }
  365. _, isIdleTask := task.(*idleTask)
  366. if isIdleTask {
  367. // Register this worker as idle
  368. w.pool.workerMapLock.Lock()
  369. w.pool.workerIdleMap[w.id] = w
  370. w.pool.workerMapLock.Unlock()
  371. }
  372. // Run the task
  373. if err := task.Run(w.id); err != nil {
  374. task.HandleError(err)
  375. }
  376. if isIdleTask {
  377. w.pool.workerMapLock.Lock()
  378. delete(w.pool.workerIdleMap, w.id)
  379. w.pool.workerMapLock.Unlock()
  380. }
  381. }
  382. }
  383. /*
  384. idleTask is the internal idle task.
  385. */
  386. type idleTask struct {
  387. tp *ThreadPool
  388. }
  389. /*
  390. Run the idle task.
  391. */
  392. func (t *idleTask) Run(tid uint64) error {
  393. t.tp.newTaskCond.L.Lock()
  394. defer t.tp.newTaskCond.L.Unlock()
  395. t.tp.newTaskCond.Wait()
  396. return nil
  397. }
  398. func (t *idleTask) HandleError(e error) {
  399. panic(e.Error())
  400. }