taskqueue.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package engine
  11. import (
  12. "bytes"
  13. "fmt"
  14. "math/rand"
  15. "sort"
  16. "sync"
  17. "devt.de/krotik/common/errorutil"
  18. "devt.de/krotik/common/sortutil"
  19. "devt.de/krotik/common/stringutil"
  20. "devt.de/krotik/ecal/engine/pool"
  21. "devt.de/krotik/ecal/engine/pubsub"
  22. )
  23. /*
  24. TaskError datastructure to collect all rule errors of an event.
  25. */
  26. type TaskError struct {
  27. ErrorMap map[string]error // Rule errors (rule name -> error)
  28. Event *Event // Event which caused the error
  29. Monitor Monitor // Event monitor
  30. }
  31. /*
  32. Error returns a string representation of this error.
  33. */
  34. func (te *TaskError) Error() string {
  35. var ret bytes.Buffer
  36. // Collect all errors and sort them by name
  37. errNames := make([]string, 0, len(te.ErrorMap))
  38. for name := range te.ErrorMap {
  39. errNames = append(errNames, name)
  40. }
  41. sort.Strings(errNames)
  42. ret.WriteString(fmt.Sprintf("Taskerror%v:\n", stringutil.Plural(len(errNames))))
  43. for i, name := range errNames {
  44. ret.WriteString(te.Monitor.EventPathString())
  45. ret.WriteString(fmt.Sprintf(" -> %v : %v", name, te.ErrorMap[name]))
  46. if i < len(errNames)-1 {
  47. ret.WriteString("\n")
  48. }
  49. }
  50. return ret.String()
  51. }
  52. /*
  53. Task models a task which is created and executed by the processor.
  54. */
  55. type Task struct {
  56. p Processor // Processor which created the task
  57. m Monitor // Monitor which observes the task execution
  58. e *Event // Event which caused the task creation
  59. }
  60. /*
  61. Run the task.
  62. */
  63. func (t *Task) Run() error {
  64. EventTracer.record(t.e, "Task.Run", "Running task")
  65. errors := t.p.ProcessEvent(t.e, t.m)
  66. if len(errors) > 0 {
  67. EventTracer.record(t.e, "Task.Run", fmt.Sprint("Task had errors:", errors))
  68. return &TaskError{errors, t.e, t.m}
  69. }
  70. return nil
  71. }
  72. /*
  73. Returns a string representation of this task.
  74. */
  75. func (t *Task) String() string {
  76. return fmt.Sprintf("Task: %v %v %v", t.p, t.m, t.e)
  77. }
  78. /*
  79. HandleError handles an error which occurred during the run method.
  80. */
  81. func (t *Task) HandleError(e error) {
  82. t.m.SetErrors(e.(*TaskError))
  83. t.p.(*eventProcessor).notifyRootMonitorErrors(t.m.RootMonitor())
  84. }
  85. /*
  86. TaskQueue models the queue of tasks for a processor.
  87. */
  88. type TaskQueue struct {
  89. lock *sync.Mutex // Lock for queue
  90. queues map[uint64]*sortutil.PriorityQueue // Map from root monitor id to priority queue
  91. messageQueue *pubsub.EventPump // Queue for message passing between components
  92. }
  93. /*
  94. NewTaskQueue creates a new TaskQueue object.
  95. */
  96. func NewTaskQueue(ep *pubsub.EventPump) *TaskQueue {
  97. return &TaskQueue{&sync.Mutex{}, make(map[uint64]*sortutil.PriorityQueue), ep}
  98. }
  99. /*
  100. Clear the queue of all pending tasks.
  101. */
  102. func (tq *TaskQueue) Clear() {
  103. tq.lock.Lock()
  104. defer tq.lock.Unlock()
  105. tq.queues = make(map[uint64]*sortutil.PriorityQueue)
  106. }
  107. /*
  108. Pop returns the next task from the queue.
  109. */
  110. func (tq *TaskQueue) Pop() pool.Task {
  111. tq.lock.Lock()
  112. defer tq.lock.Unlock()
  113. var popQueue *sortutil.PriorityQueue
  114. var idx int
  115. // Pick a random number between 0 and len(tq.queues) - 1
  116. if lq := len(tq.queues); lq > 0 {
  117. idx = rand.Intn(lq)
  118. }
  119. // Go through all queues and pick one - clean up while we are at it
  120. for k, v := range tq.queues {
  121. if v.Size() > 0 {
  122. // Pick a random queue - pick the last if idx does not
  123. // reach 0 before the end of the iteration.
  124. idx--
  125. popQueue = v
  126. if idx <= 0 {
  127. break
  128. }
  129. } else {
  130. // Remove empty queues
  131. delete(tq.queues, k)
  132. }
  133. }
  134. if popQueue != nil {
  135. if res := popQueue.Pop(); res != nil {
  136. return res.(*Task)
  137. }
  138. }
  139. return nil
  140. }
  141. /*
  142. Push adds another task to the queue.
  143. */
  144. func (tq *TaskQueue) Push(t pool.Task) {
  145. tq.lock.Lock()
  146. defer tq.lock.Unlock()
  147. var q *sortutil.PriorityQueue
  148. var ok bool
  149. task := t.(*Task)
  150. rm := task.m.RootMonitor()
  151. id := rm.ID()
  152. if q, ok = tq.queues[id]; !ok {
  153. q = sortutil.NewPriorityQueue()
  154. tq.queues[id] = q
  155. // Add listener for finish
  156. tq.messageQueue.AddObserver(MessageRootMonitorFinished, rm,
  157. func(event string, eventSource interface{}) {
  158. tq.lock.Lock()
  159. defer tq.lock.Unlock()
  160. rm := eventSource.(*RootMonitor)
  161. q := tq.queues[rm.ID()]
  162. // Safeguard that no tasks are ever left over
  163. errorutil.AssertTrue(q == nil || q.Size() == 0,
  164. "Finished monitor left events behind")
  165. tq.messageQueue.RemoveObservers(event, eventSource)
  166. })
  167. }
  168. q.Push(task, task.m.Priority())
  169. }
  170. /*
  171. Size returns the size of the queue.
  172. */
  173. func (tq *TaskQueue) Size() int {
  174. tq.lock.Lock()
  175. defer tq.lock.Unlock()
  176. var ret int
  177. for _, q := range tq.queues {
  178. ret += q.Size()
  179. }
  180. return ret
  181. }