taskqueue.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package engine
  11. import (
  12. "bytes"
  13. "fmt"
  14. "math/rand"
  15. "sort"
  16. "sync"
  17. "devt.de/krotik/common/errorutil"
  18. "devt.de/krotik/common/sortutil"
  19. "devt.de/krotik/common/stringutil"
  20. "devt.de/krotik/ecal/engine/pool"
  21. "devt.de/krotik/ecal/engine/pubsub"
  22. )
  23. /*
  24. TaskError datastructure to collect all rule errors of an event.
  25. */
  26. type TaskError struct {
  27. ErrorMap map[string]error // Rule errors (rule name -> error)
  28. Event *Event // Event which caused the error
  29. Monitor Monitor // Event monitor
  30. }
  31. /*
  32. Error returns a string representation of this error.
  33. */
  34. func (te *TaskError) Error() string {
  35. var ret bytes.Buffer
  36. // Collect all errors and sort them by name
  37. errNames := make([]string, 0, len(te.ErrorMap))
  38. for name := range te.ErrorMap {
  39. errNames = append(errNames, name)
  40. }
  41. sort.Strings(errNames)
  42. ret.WriteString(fmt.Sprintf("Taskerror%v:\n", stringutil.Plural(len(errNames))))
  43. for i, name := range errNames {
  44. ret.WriteString(te.Monitor.EventPathString())
  45. ret.WriteString(fmt.Sprintf(" -> %v : %v", name, te.ErrorMap[name]))
  46. if i < len(errNames)-1 {
  47. ret.WriteString("\n")
  48. }
  49. }
  50. return ret.String()
  51. }
  52. /*
  53. Task models a task which is created and executed by the processor.
  54. */
  55. type Task struct {
  56. p Processor // Processor which created the task
  57. m Monitor // Monitor which observes the task execution
  58. e *Event // Event which caused the task creation
  59. }
  60. /*
  61. Run the task.
  62. */
  63. func (t *Task) Run() error {
  64. EventTracer.record(t.e, "Task.Run", "Running task")
  65. errors := t.p.ProcessEvent(t.e, t.m)
  66. if len(errors) > 0 {
  67. // Monitor is not declared finished until the errors have been handled
  68. EventTracer.record(t.e, "Task.Run", fmt.Sprint("Task had errors:", errors))
  69. return &TaskError{errors, t.e, t.m}
  70. }
  71. t.m.Finish()
  72. return nil
  73. }
  74. /*
  75. Returns a string representation of this task.
  76. */
  77. func (t *Task) String() string {
  78. return fmt.Sprintf("Task: %v %v %v", t.p, t.m, t.e)
  79. }
  80. /*
  81. HandleError handles an error which occurred during the run method.
  82. */
  83. func (t *Task) HandleError(e error) {
  84. t.m.SetErrors(e.(*TaskError))
  85. t.m.Finish()
  86. t.p.(*eventProcessor).notifyRootMonitorErrors(t.m.RootMonitor())
  87. }
  88. /*
  89. TaskQueue models the queue of tasks for a processor.
  90. */
  91. type TaskQueue struct {
  92. lock *sync.Mutex // Lock for queue
  93. queues map[uint64]*sortutil.PriorityQueue // Map from root monitor id to priority queue
  94. messageQueue *pubsub.EventPump // Queue for message passing between components
  95. }
  96. /*
  97. NewTaskQueue creates a new TaskQueue object.
  98. */
  99. func NewTaskQueue(ep *pubsub.EventPump) *TaskQueue {
  100. return &TaskQueue{&sync.Mutex{}, make(map[uint64]*sortutil.PriorityQueue), ep}
  101. }
  102. /*
  103. Clear the queue of all pending tasks.
  104. */
  105. func (tq *TaskQueue) Clear() {
  106. tq.lock.Lock()
  107. defer tq.lock.Unlock()
  108. tq.queues = make(map[uint64]*sortutil.PriorityQueue)
  109. }
  110. /*
  111. Pop returns the next task from the queue.
  112. */
  113. func (tq *TaskQueue) Pop() pool.Task {
  114. tq.lock.Lock()
  115. defer tq.lock.Unlock()
  116. var popQueue *sortutil.PriorityQueue
  117. var idx int
  118. // Pick a random number between 0 and len(tq.queues) - 1
  119. if lq := len(tq.queues); lq > 0 {
  120. idx = rand.Intn(lq)
  121. }
  122. // Go through all queues and pick one - clean up while we are at it
  123. for k, v := range tq.queues {
  124. if v.Size() > 0 {
  125. // Pick a random queue - pick the last if idx does not
  126. // reach 0 before the end of the iteration.
  127. idx--
  128. popQueue = v
  129. if idx <= 0 {
  130. break
  131. }
  132. } else {
  133. // Remove empty queues
  134. delete(tq.queues, k)
  135. }
  136. }
  137. if popQueue != nil {
  138. if res := popQueue.Pop(); res != nil {
  139. return res.(*Task)
  140. }
  141. }
  142. return nil
  143. }
  144. /*
  145. Push adds another task to the queue.
  146. */
  147. func (tq *TaskQueue) Push(t pool.Task) {
  148. tq.lock.Lock()
  149. defer tq.lock.Unlock()
  150. var q *sortutil.PriorityQueue
  151. var ok bool
  152. task := t.(*Task)
  153. rm := task.m.RootMonitor()
  154. id := rm.ID()
  155. if q, ok = tq.queues[id]; !ok {
  156. q = sortutil.NewPriorityQueue()
  157. tq.queues[id] = q
  158. // Add listener for finish
  159. tq.messageQueue.AddObserver(MessageRootMonitorFinished, rm,
  160. func(event string, eventSource interface{}) {
  161. tq.lock.Lock()
  162. defer tq.lock.Unlock()
  163. rm := eventSource.(*RootMonitor)
  164. q := tq.queues[rm.ID()]
  165. // Safeguard that no tasks are ever left over
  166. errorutil.AssertTrue(q == nil || q.Size() == 0,
  167. "Finished monitor left events behind")
  168. tq.messageQueue.RemoveObservers(event, eventSource)
  169. })
  170. }
  171. q.Push(task, task.m.Priority())
  172. }
  173. /*
  174. Size returns the size of the queue.
  175. */
  176. func (tq *TaskQueue) Size() int {
  177. tq.lock.Lock()
  178. defer tq.lock.Unlock()
  179. var ret int
  180. for _, q := range tq.queues {
  181. ret += q.Size()
  182. }
  183. return ret
  184. }