threadpool.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. /*
  2. * Public Domain Software
  3. *
  4. * I (Matthias Ladkau) am the author of the source code in this file.
  5. * I have placed the source code in this file in the public domain.
  6. *
  7. * For further information see: http://creativecommons.org/publicdomain/zero/1.0/
  8. */
  9. /*
  10. Package pool contains a thread pool implementation.
  11. */
  12. package pool
  13. import (
  14. "math"
  15. "sync"
  16. "time"
  17. )
  18. /*
  19. Different states of a thread pool.
  20. */
  21. const (
  22. StatusRunning = "Running"
  23. StatusStopping = "Stopping"
  24. StatusStopped = "Stopped"
  25. )
  26. /*
  27. Task is a task which should be run in a thread.
  28. */
  29. type Task interface {
  30. /*
  31. Run the task. The function gets the unique thread ID of the worker
  32. which executes the task.
  33. */
  34. Run(tid uint64) error
  35. /*
  36. HandleError handles an error which occurred during the run method.
  37. */
  38. HandleError(e error)
  39. }
  40. /*
  41. TaskQueue is a queue of tasks for a thread pool.
  42. */
  43. type TaskQueue interface {
  44. /*
  45. Clear the queue of all pending tasks
  46. */
  47. Clear()
  48. /*
  49. Pop returns the next task from the queue.
  50. */
  51. Pop() Task
  52. /*
  53. Push adds another task to the queue.
  54. */
  55. Push(t Task)
  56. /*
  57. Size returns the size of the queue.
  58. */
  59. Size() int
  60. }
  61. /*
  62. DefaultTaskQueue implements a simple (FIFO) task queue for a thread pool.
  63. */
  64. type DefaultTaskQueue struct {
  65. queue []Task
  66. }
  67. /*
  68. Clear the queue of all pending tasks
  69. */
  70. func (tq *DefaultTaskQueue) Clear() {
  71. tq.queue = make([]Task, 0)
  72. }
  73. /*
  74. Pop returns the next task from the queue.
  75. */
  76. func (tq *DefaultTaskQueue) Pop() Task {
  77. var task Task
  78. if len(tq.queue) > 0 {
  79. task = tq.queue[0]
  80. tq.queue = tq.queue[1:]
  81. }
  82. return task
  83. }
  84. /*
  85. Push adds another task to the queue.
  86. */
  87. func (tq *DefaultTaskQueue) Push(t Task) {
  88. tq.queue = append(tq.queue, t)
  89. }
  90. /*
  91. Size returns the size of the queue.
  92. */
  93. func (tq *DefaultTaskQueue) Size() int {
  94. return len(tq.queue)
  95. }
  96. /*
  97. ThreadPool creates a pool of threads which process tasks according to a given
  98. task queue. The threads are kept in an idle state if no more tasks are available.
  99. They resume immediately once a new task is added.
  100. */
  101. type ThreadPool struct {
  102. // Task regulation
  103. queue TaskQueue // Task queue for thread pool
  104. queueLock *sync.Mutex // Lock for queue
  105. // Worker regulation
  106. workerIDCount uint64 // Id counter for worker tasks
  107. workerIDLock *sync.Mutex // Lock for ID generation
  108. workerMap map[uint64]*ThreadPoolWorker // Map of all workers
  109. workerIdleMap map[uint64]*ThreadPoolWorker // Map of all idle workers
  110. workerMapLock *sync.Mutex // Lock for worker map
  111. workerKill int // Count of workers which should die
  112. newTaskCond *sync.Cond // Waiting condition for new tasks
  113. // Callbacks to regulate load
  114. RegulationLock *sync.Mutex // Lock for regulation variables
  115. TooManyThreshold int // Threshold for too many tasks
  116. TooManyCallback func() // Callback for too many tasks
  117. tooManyTriggered bool // Flag if too many tasks threshold was passed
  118. TooFewThreshold int // Threshold for too few tasks
  119. TooFewCallback func() // Callback for too few tasks
  120. tooFewTriggered bool // Flag if too many tasks threshold was passed
  121. }
  122. /*
  123. State returns the current state of the ThreadPool.
  124. */
  125. func (tp *ThreadPool) State() map[string]interface{} {
  126. getIdsFromWorkerMap := func(m map[uint64]*ThreadPoolWorker) []uint64 {
  127. var ret []uint64
  128. for k := range m {
  129. ret = append(ret, k)
  130. }
  131. return ret
  132. }
  133. tp.workerMapLock.Lock()
  134. defer tp.workerMapLock.Unlock()
  135. return map[string]interface{}{
  136. "TaskQueueSize": tp.queue.Size(),
  137. "TotalWorkerThreads": getIdsFromWorkerMap(tp.workerMap),
  138. "IdleWorkerThreads": getIdsFromWorkerMap(tp.workerIdleMap),
  139. }
  140. }
  141. /*
  142. NewThreadPool creates a new thread pool.
  143. */
  144. func NewThreadPool() *ThreadPool {
  145. return NewThreadPoolWithQueue(&DefaultTaskQueue{})
  146. }
  147. /*
  148. NewThreadPoolWithQueue creates a new thread pool with a specific task queue.
  149. */
  150. func NewThreadPoolWithQueue(q TaskQueue) *ThreadPool {
  151. return &ThreadPool{q, &sync.Mutex{},
  152. 1, &sync.Mutex{}, make(map[uint64]*ThreadPoolWorker),
  153. make(map[uint64]*ThreadPoolWorker), &sync.Mutex{},
  154. 0, sync.NewCond(&sync.Mutex{}), &sync.Mutex{},
  155. math.MaxInt32, func() {}, false, 0, func() {}, false}
  156. }
  157. /*
  158. AddTask adds a task to the thread pool.
  159. */
  160. func (tp *ThreadPool) AddTask(t Task) {
  161. tp.queueLock.Lock()
  162. defer tp.queueLock.Unlock()
  163. tp.queue.Push(t)
  164. // Reset too few flag
  165. tp.RegulationLock.Lock()
  166. if tp.tooFewTriggered && tp.TooFewThreshold < tp.queue.Size() {
  167. tp.tooFewTriggered = false
  168. }
  169. // Check too many
  170. if !tp.tooManyTriggered && tp.TooManyThreshold <= tp.queue.Size() {
  171. tp.tooManyTriggered = true
  172. tp.TooManyCallback()
  173. }
  174. tp.RegulationLock.Unlock()
  175. // Wake up a waiting worker
  176. tp.newTaskCond.Signal()
  177. }
  178. /*
  179. getTask is called by a worker to request a new task. The worker is expected to finish
  180. if this function returns nil.
  181. */
  182. func (tp *ThreadPool) getTask() Task {
  183. var returnIdleTask = true
  184. // Check if tasks should be stopped
  185. tp.workerMapLock.Lock()
  186. if tp.workerKill > 0 {
  187. tp.workerKill--
  188. tp.workerMapLock.Unlock()
  189. return nil
  190. } else if tp.workerKill == -1 {
  191. // Check for special worker kill value which is used when workers should
  192. // be killed when no more tasks are available.
  193. returnIdleTask = false
  194. }
  195. tp.workerMapLock.Unlock()
  196. // Check if there is a task available
  197. tp.queueLock.Lock()
  198. task := tp.queue.Pop()
  199. tp.queueLock.Unlock()
  200. if task != nil {
  201. return task
  202. }
  203. tp.RegulationLock.Lock()
  204. // Reset too many flag
  205. if tp.tooManyTriggered && tp.TooManyThreshold > tp.queue.Size() {
  206. tp.tooManyTriggered = false
  207. }
  208. // Check too few
  209. if !tp.tooFewTriggered && tp.TooFewThreshold >= tp.queue.Size() {
  210. tp.tooFewTriggered = true
  211. tp.TooFewCallback()
  212. }
  213. tp.RegulationLock.Unlock()
  214. if returnIdleTask {
  215. // No new task available return idle task
  216. return &idleTask{tp}
  217. }
  218. return nil
  219. }
  220. /*
  221. NewThreadID creates a new thread ID unique to this pool.
  222. */
  223. func (tp *ThreadPool) NewThreadID() uint64 {
  224. tp.workerIDLock.Lock()
  225. res := tp.workerIDCount
  226. tp.workerIDCount++
  227. tp.workerIDLock.Unlock()
  228. return res
  229. }
  230. /*
  231. SetWorkerCount sets the worker count of this pool. If the wait flag is true then
  232. this call will return after the pool has reached the requested worker count.
  233. */
  234. func (tp *ThreadPool) SetWorkerCount(count int, wait bool) {
  235. tp.workerMapLock.Lock()
  236. workerCount := len(tp.workerMap)
  237. tp.workerMapLock.Unlock()
  238. if count < 0 {
  239. count = 0
  240. }
  241. if workerCount < count {
  242. // More workers are needed
  243. tp.workerMapLock.Lock()
  244. // Make sure no more workers are killed
  245. tp.workerKill = 0
  246. for len(tp.workerMap) != count {
  247. tid := tp.NewThreadID()
  248. worker := &ThreadPoolWorker{tid, tp}
  249. go worker.run()
  250. tp.workerMap[tid] = worker
  251. }
  252. tp.workerMapLock.Unlock()
  253. } else if workerCount > count {
  254. // Fewer workers are needed
  255. tp.workerMapLock.Lock()
  256. tp.workerKill = workerCount - count
  257. tp.workerMapLock.Unlock()
  258. tp.newTaskCond.Broadcast()
  259. if wait {
  260. for true {
  261. tp.workerMapLock.Lock()
  262. workerCount = len(tp.workerMap)
  263. tp.workerMapLock.Unlock()
  264. if workerCount == count {
  265. break
  266. }
  267. time.Sleep(5 * time.Nanosecond)
  268. // Broadcast again since sine workers might be now waiting
  269. tp.newTaskCond.Broadcast()
  270. }
  271. }
  272. }
  273. // If a count was set wait until at least one worker is idle
  274. for count > 0 && len(tp.workerIdleMap) == 0 {
  275. time.Sleep(5 * time.Nanosecond)
  276. }
  277. }
  278. /*
  279. Status returns the current status of the thread pool.
  280. */
  281. func (tp *ThreadPool) Status() string {
  282. var status string
  283. tp.workerMapLock.Lock()
  284. workerCount := len(tp.workerMap)
  285. workerKill := tp.workerKill
  286. tp.workerMapLock.Unlock()
  287. if workerCount > 0 {
  288. if workerKill == -1 {
  289. status = StatusStopping
  290. } else {
  291. status = StatusRunning
  292. }
  293. } else {
  294. status = StatusStopped
  295. }
  296. return status
  297. }
  298. /*
  299. WorkerCount returns the current count of workers.
  300. */
  301. func (tp *ThreadPool) WorkerCount() int {
  302. tp.workerMapLock.Lock()
  303. defer tp.workerMapLock.Unlock()
  304. return len(tp.workerMap)
  305. }
  306. /*
  307. WaitAll waits for all workers to become idle.
  308. */
  309. func (tp *ThreadPool) WaitAll() {
  310. // Wake up all workers
  311. tp.newTaskCond.Broadcast()
  312. time.Sleep(5 * time.Nanosecond)
  313. for true {
  314. tp.workerMapLock.Lock()
  315. tp.queueLock.Lock()
  316. // Get total number of workers and idle workers
  317. workerCount := len(tp.workerMap)
  318. workerIdleCount := len(tp.workerIdleMap)
  319. // Get number of pending tasks
  320. tasks := tp.queue.Size()
  321. tp.queueLock.Unlock()
  322. tp.workerMapLock.Unlock()
  323. // Only leave this loop if either no workers are left or if all
  324. // tasks are done and all workers are idle
  325. if workerCount == 0 || (workerCount == workerIdleCount && tasks == 0) {
  326. break
  327. }
  328. time.Sleep(5 * time.Nanosecond)
  329. // Broadcast again and again until all workers are idle
  330. tp.newTaskCond.Broadcast()
  331. }
  332. }
  333. /*
  334. JoinAll processes all remaining tasks and kills off all workers afterwards.
  335. */
  336. func (tp *ThreadPool) JoinAll() {
  337. // Tell all workers to die
  338. tp.workerMapLock.Lock()
  339. tp.workerKill = -1
  340. tp.workerMapLock.Unlock()
  341. tp.newTaskCond.Broadcast()
  342. for true {
  343. tp.workerMapLock.Lock()
  344. tp.queueLock.Lock()
  345. // Get total number of workers
  346. workerCount := len(tp.workerMap)
  347. // Get number of pending tasks
  348. tasks := tp.queue.Size()
  349. tp.queueLock.Unlock()
  350. tp.workerMapLock.Unlock()
  351. // Only leave this loop if no workers are existing and all tasks are done
  352. if workerCount == 0 && tasks == 0 {
  353. break
  354. }
  355. time.Sleep(5 * time.Nanosecond)
  356. // Broadcast again and again until all workers are dead
  357. tp.newTaskCond.Broadcast()
  358. }
  359. }
  360. /*
  361. ThreadPoolWorker models a worker in the thread pool.
  362. */
  363. type ThreadPoolWorker struct {
  364. id uint64 // ID of the thread pool worker
  365. pool *ThreadPool // Thread pool of this worker
  366. }
  367. /*
  368. run lets this worker run tasks.
  369. */
  370. func (w *ThreadPoolWorker) run() {
  371. defer func() {
  372. // Remove worker from workerMap
  373. w.pool.workerMapLock.Lock()
  374. delete(w.pool.workerMap, w.id)
  375. w.pool.workerMapLock.Unlock()
  376. }()
  377. for true {
  378. // Try to get the next task
  379. task := w.pool.getTask()
  380. // Exit if there is not new task
  381. if task == nil {
  382. break
  383. }
  384. _, isIdleTask := task.(*idleTask)
  385. if isIdleTask {
  386. // Register this worker as idle
  387. w.pool.workerMapLock.Lock()
  388. w.pool.workerIdleMap[w.id] = w
  389. w.pool.workerMapLock.Unlock()
  390. }
  391. // Run the task
  392. if err := task.Run(w.id); err != nil {
  393. task.HandleError(err)
  394. }
  395. if isIdleTask {
  396. w.pool.workerMapLock.Lock()
  397. delete(w.pool.workerIdleMap, w.id)
  398. w.pool.workerMapLock.Unlock()
  399. }
  400. }
  401. }
  402. /*
  403. idleTask is the internal idle task.
  404. */
  405. type idleTask struct {
  406. tp *ThreadPool
  407. }
  408. /*
  409. Run the idle task.
  410. */
  411. func (t *idleTask) Run(tid uint64) error {
  412. t.tp.newTaskCond.L.Lock()
  413. defer t.tp.newTaskCond.L.Unlock()
  414. t.tp.newTaskCond.Wait()
  415. return nil
  416. }
  417. func (t *idleTask) HandleError(e error) {
  418. panic(e.Error())
  419. }