123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- /*
- * Public Domain Software
- *
- * I (Matthias Ladkau) am the author of the source code in this file.
- * I have placed the source code in this file in the public domain.
- *
- * For further information see: http://creativecommons.org/publicdomain/zero/1.0/
- */
- package pools
- import (
- "math"
- "sync"
- "time"
- )
/*
Status values reported by ThreadPool.Status.
*/
const (
	// StatusRunning is reported while the pool has workers and is not
	// being joined.
	StatusRunning = "Running"

	// StatusStopping is reported while the pool still has workers but
	// JoinAll has asked them to finish.
	StatusStopping = "Stopping"

	// StatusStopped is reported when the pool has no workers.
	StatusStopped = "Stopped"
)
/*
Task is a unit of work which a thread pool worker can execute.
*/
type Task interface {

	/*
		Run executes the task. The parameter tid is the unique thread ID of
		the worker which executes it. A non-nil return value is passed on
		to HandleError.
	*/
	Run(tid uint64) error

	/*
		HandleError is given an error which was returned by Run.
	*/
	HandleError(e error)
}
- /*
- TaskQueue is a queue of tasks for a thread pool.
- */
- type TaskQueue interface {
- /*
- Clear the queue of all pending tasks
- */
- Clear()
- /*
- Pop returns the next task from the queue.
- */
- Pop() Task
- /*
- Push adds another task to the queue.
- */
- Push(t Task)
- /*
- Size returns the size of the queue.
- */
- Size() int
- }
- /*
- DefaultTaskQueue implements a simple (FIFO) task queue for a thread pool.
- */
- type DefaultTaskQueue struct {
- queue []Task
- }
- /*
- Clear the queue of all pending tasks
- */
- func (tq *DefaultTaskQueue) Clear() {
- tq.queue = make([]Task, 0)
- }
- /*
- Pop returns the next task from the queue.
- */
- func (tq *DefaultTaskQueue) Pop() Task {
- var task Task
- if len(tq.queue) > 0 {
- task = tq.queue[0]
- tq.queue = tq.queue[1:]
- }
- return task
- }
- /*
- Push adds another task to the queue.
- */
- func (tq *DefaultTaskQueue) Push(t Task) {
- tq.queue = append(tq.queue, t)
- }
- /*
- Size returns the size of the queue.
- */
- func (tq *DefaultTaskQueue) Size() int {
- return len(tq.queue)
- }
- /*
- ThreadPool creates a pool of threads which process tasks according to a given
- task queue. The threads are kept in an idle state if no more tasks are available.
- They resume immediately once a new task is added.
- */
- type ThreadPool struct {
- // Task regulation
- queue TaskQueue // Task queue for thread pool
- queueLock *sync.Mutex // Lock for queue
- // Worker regulation
- workerIDCount uint64 // Id counter for worker tasks
- workerIDLock *sync.Mutex // Lock for ID generation
- workerMap map[uint64]*ThreadPoolWorker // Map of all workers
- workerIdleMap map[uint64]*ThreadPoolWorker // Map of all idle workers
- workerMapLock *sync.Mutex // Lock for worker map
- workerKill int // Count of workers which should die
- newTaskCond *sync.Cond // Waiting condition for new tasks
- // Callbacks to regulate load
- RegulationLock *sync.Mutex // Lock for regulation variables
- TooManyThreshold int // Threshold for too many tasks
- TooManyCallback func() // Callback for too many tasks
- tooManyTriggered bool // Flag if too many tasks threshold was passed
- TooFewThreshold int // Threshold for too few tasks
- TooFewCallback func() // Callback for too few tasks
- tooFewTriggered bool // Flag if too many tasks threshold was passed
- }
- /*
- NewThreadPool creates a new thread pool.
- */
- func NewThreadPool() *ThreadPool {
- return NewThreadPoolWithQueue(&DefaultTaskQueue{})
- }
- /*
- NewThreadPoolWithQueue creates a new thread pool with a specific task queue.
- */
- func NewThreadPoolWithQueue(q TaskQueue) *ThreadPool {
- return &ThreadPool{q, &sync.Mutex{},
- 1, &sync.Mutex{}, make(map[uint64]*ThreadPoolWorker),
- make(map[uint64]*ThreadPoolWorker), &sync.Mutex{},
- 0, sync.NewCond(&sync.Mutex{}), &sync.Mutex{},
- math.MaxInt32, func() {}, false, 0, func() {}, false}
- }
- /*
- AddTask adds a task to the thread pool.
- */
- func (tp *ThreadPool) AddTask(t Task) {
- tp.queueLock.Lock()
- defer tp.queueLock.Unlock()
- tp.queue.Push(t)
- // Reset too few flag
- tp.RegulationLock.Lock()
- if tp.tooFewTriggered && tp.TooFewThreshold < tp.queue.Size() {
- tp.tooFewTriggered = false
- }
- // Check too many
- if !tp.tooManyTriggered && tp.TooManyThreshold <= tp.queue.Size() {
- tp.tooManyTriggered = true
- tp.TooManyCallback()
- }
- tp.RegulationLock.Unlock()
- // Wake up a waiting worker
- tp.newTaskCond.Signal()
- }
- /*
- getTask is called by a worker to request a new task. The worker is expected to finish
- if this function returns nil.
- */
- func (tp *ThreadPool) getTask() Task {
- var returnIdleTask = true
- // Check if tasks should be stopped
- tp.workerMapLock.Lock()
- if tp.workerKill > 0 {
- tp.workerKill--
- tp.workerMapLock.Unlock()
- return nil
- } else if tp.workerKill == -1 {
- // Check for special worker kill value which is used when workers should
- // be killed when no more tasks are available.
- returnIdleTask = false
- }
- tp.workerMapLock.Unlock()
- // Check if there is a task available
- tp.queueLock.Lock()
- task := tp.queue.Pop()
- tp.queueLock.Unlock()
- if task != nil {
- return task
- }
- tp.RegulationLock.Lock()
- // Reset too many flag
- if tp.tooManyTriggered && tp.TooManyThreshold > tp.queue.Size() {
- tp.tooManyTriggered = false
- }
- // Check too few
- if !tp.tooFewTriggered && tp.TooFewThreshold >= tp.queue.Size() {
- tp.tooFewTriggered = true
- tp.TooFewCallback()
- }
- tp.RegulationLock.Unlock()
- if returnIdleTask {
- // No new task available return idle task
- return &idleTask{tp}
- }
- return nil
- }
- /*
- NewThreadID creates a new thread ID unique to this pool.
- */
- func (tp *ThreadPool) NewThreadID() uint64 {
- tp.workerIDLock.Lock()
- res := tp.workerIDCount
- tp.workerIDCount++
- tp.workerIDLock.Unlock()
- return res
- }
- /*
- SetWorkerCount sets the worker count of this pool. If the wait flag is true then
- this call will return after the pool has reached the requested worker count.
- */
- func (tp *ThreadPool) SetWorkerCount(count int, wait bool) {
- tp.workerMapLock.Lock()
- workerCount := len(tp.workerMap)
- tp.workerMapLock.Unlock()
- if count < 0 {
- count = 0
- }
- if workerCount < count {
- // More workers are needed
- tp.workerMapLock.Lock()
- // Make sure no more workers are killed
- tp.workerKill = 0
- for len(tp.workerMap) != count {
- tid := tp.NewThreadID()
- worker := &ThreadPoolWorker{tid, tp}
- go worker.run()
- tp.workerMap[tid] = worker
- }
- tp.workerMapLock.Unlock()
- } else if workerCount > count {
- // Fewer workers are needed
- tp.workerMapLock.Lock()
- tp.workerKill = workerCount - count
- tp.workerMapLock.Unlock()
- tp.newTaskCond.Broadcast()
- if wait {
- for true {
- tp.workerMapLock.Lock()
- workerCount = len(tp.workerMap)
- tp.workerMapLock.Unlock()
- if workerCount == count {
- break
- }
- time.Sleep(5 * time.Nanosecond)
- // Broadcast again since sine workers might be now waiting
- tp.newTaskCond.Broadcast()
- }
- }
- }
- // If a count was set wait until at least one worker is idle
- for count > 0 && len(tp.workerIdleMap) == 0 {
- time.Sleep(5 * time.Nanosecond)
- }
- }
- /*
- Status returns the current status of the thread pool.
- */
- func (tp *ThreadPool) Status() string {
- var status string
- tp.workerMapLock.Lock()
- workerCount := len(tp.workerMap)
- workerKill := tp.workerKill
- tp.workerMapLock.Unlock()
- if workerCount > 0 {
- if workerKill == -1 {
- status = StatusStopping
- } else {
- status = StatusRunning
- }
- } else {
- status = StatusStopped
- }
- return status
- }
- /*
- WorkerCount returns the current count of workers.
- */
- func (tp *ThreadPool) WorkerCount() int {
- tp.workerMapLock.Lock()
- defer tp.workerMapLock.Unlock()
- return len(tp.workerMap)
- }
- /*
- WaitAll waits for all workers to become idle.
- */
- func (tp *ThreadPool) WaitAll() {
- // Wake up all workers
- tp.newTaskCond.Broadcast()
- time.Sleep(5 * time.Nanosecond)
- for true {
- tp.workerMapLock.Lock()
- tp.queueLock.Lock()
- // Get total number of workers and idle workers
- workerCount := len(tp.workerMap)
- workerIdleCount := len(tp.workerIdleMap)
- // Get number of pending tasks
- tasks := tp.queue.Size()
- tp.queueLock.Unlock()
- tp.workerMapLock.Unlock()
- // Only leave this loop if either no workers are left or if all
- // tasks are done and all workers are idle
- if workerCount == 0 || (workerCount == workerIdleCount && tasks == 0) {
- break
- }
- time.Sleep(5 * time.Nanosecond)
- // Broadcast again and again until all workers are idle
- tp.newTaskCond.Broadcast()
- }
- }
- /*
- JoinAll processes all remaining tasks and kills off all workers afterwards.
- */
- func (tp *ThreadPool) JoinAll() {
- // Tell all workers to die
- tp.workerMapLock.Lock()
- tp.workerKill = -1
- tp.workerMapLock.Unlock()
- tp.newTaskCond.Broadcast()
- for true {
- tp.workerMapLock.Lock()
- tp.queueLock.Lock()
- // Get total number of workers
- workerCount := len(tp.workerMap)
- // Get number of pending tasks
- tasks := tp.queue.Size()
- tp.queueLock.Unlock()
- tp.workerMapLock.Unlock()
- // Only leave this loop if no workers are existing and all tasks are done
- if workerCount == 0 && tasks == 0 {
- break
- }
- time.Sleep(5 * time.Nanosecond)
- // Broadcast again and again until all workers are dead
- tp.newTaskCond.Broadcast()
- }
- }
- /*
- ThreadPoolWorker models a worker in the thread pool.
- */
- type ThreadPoolWorker struct {
- id uint64 // ID of the thread pool worker
- pool *ThreadPool // Thread pool of this worker
- }
- /*
- run lets this worker run tasks.
- */
- func (w *ThreadPoolWorker) run() {
- defer func() {
- // Remove worker from workerMap
- w.pool.workerMapLock.Lock()
- delete(w.pool.workerMap, w.id)
- w.pool.workerMapLock.Unlock()
- }()
- for true {
- // Try to get the next task
- task := w.pool.getTask()
- // Exit if there is not new task
- if task == nil {
- break
- }
- _, isIdleTask := task.(*idleTask)
- if isIdleTask {
- // Register this worker as idle
- w.pool.workerMapLock.Lock()
- w.pool.workerIdleMap[w.id] = w
- w.pool.workerMapLock.Unlock()
- }
- // Run the task
- if err := task.Run(w.id); err != nil {
- task.HandleError(err)
- }
- if isIdleTask {
- w.pool.workerMapLock.Lock()
- delete(w.pool.workerIdleMap, w.id)
- w.pool.workerMapLock.Unlock()
- }
- }
- }
- /*
- idleTask is the internal idle task.
- */
- type idleTask struct {
- tp *ThreadPool
- }
- /*
- Run the idle task.
- */
- func (t *idleTask) Run(tid uint64) error {
- t.tp.newTaskCond.L.Lock()
- defer t.tp.newTaskCond.L.Unlock()
- t.tp.newTaskCond.Wait()
- return nil
- }
- func (t *idleTask) HandleError(e error) {
- panic(e.Error())
- }
|