monitor.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package engine
  11. import (
  12. "bytes"
  13. "container/heap"
  14. "fmt"
  15. "sync"
  16. "devt.de/krotik/common/errorutil"
  17. "devt.de/krotik/common/sortutil"
  18. "devt.de/krotik/ecal/engine/pubsub"
  19. )
/*
Monitor monitors events as they are cascading. Event cascades will produce tree
structures: every monitor can create child monitors and knows the root monitor
of the tree it belongs to.
*/
type Monitor interface {

	/*
		ID returns the monitor ID.
	*/
	ID() uint64

	/*
		NewChildMonitor creates a new child monitor of this monitor with
		the given priority.
	*/
	NewChildMonitor(priority int) Monitor

	/*
		Scope returns the rule scope of this monitor.
	*/
	Scope() *RuleScope

	/*
		Priority returns the priority of this monitor.
	*/
	Priority() int

	/*
		IsActivated returns if this monitor has been activated.
	*/
	IsActivated() bool

	/*
		Activate activates this monitor with a given event.
	*/
	Activate(e *Event)

	/*
		Skip finishes this monitor without activation.
	*/
	Skip(e *Event)

	/*
		Finish finishes this monitor.
	*/
	Finish()

	/*
		RootMonitor returns the root monitor of this monitor.
	*/
	RootMonitor() *RootMonitor

	/*
		Errors returns the error object of this monitor.
	*/
	Errors() *TaskError

	/*
		SetErrors adds an error object to this monitor.
	*/
	SetErrors(e *TaskError)

	/*
		EventPath returns the chain of events which created this monitor.
	*/
	EventPath() []*Event

	/*
		EventPathString returns the event path as a string.
	*/
	EventPathString() string

	/*
		String returns a string representation of this monitor.
	*/
	String() string
}
/*
monitorBase provides the basic functions and fields for any monitor.

The order of the fields is significant: newMonitorBase fills the struct with
an unkeyed composite literal.
*/
type monitorBase struct {
	id          uint64                 // Monitor ID
	Parent      *monitorBase           // Parent monitor (nil if there is no parent)
	Context     map[string]interface{} // Context object of the monitor (handed on to child monitors)
	Err         *TaskError             // Errors which occurred during event processing
	priority    int                    // Priority of the monitor
	rootMonitor *RootMonitor           // Root monitor
	event       *Event                 // Event which activated (or skipped) this monitor
	activated   bool                   // Flag indicating if the monitor was activated
	finished    bool                   // Flag indicating if the monitor has finished
}
  96. /*
  97. NewMonitor creates a new monitor.
  98. */
  99. func newMonitorBase(priority int, parent *monitorBase, context map[string]interface{}) *monitorBase {
  100. var ret *monitorBase
  101. if parent != nil {
  102. ret = &monitorBase{newMonID(), parent, context, nil, priority, parent.rootMonitor, nil, false, false}
  103. } else {
  104. ret = &monitorBase{newMonID(), nil, context, nil, priority, nil, nil, false, false}
  105. }
  106. return ret
  107. }
  108. /*
  109. NewChildMonitor creates a new child monitor of this monitor.
  110. */
  111. func (mb *monitorBase) NewChildMonitor(priority int) Monitor {
  112. child := &ChildMonitor{newMonitorBase(priority, mb, mb.Context)}
  113. mb.rootMonitor.descendantCreated(child)
  114. return child
  115. }
  116. /*
  117. ID returns the monitor ID.
  118. */
  119. func (mb *monitorBase) ID() uint64 {
  120. return mb.id
  121. }
  122. /*
  123. RootMonitor returns the root monitor of this monitor.
  124. */
  125. func (mb *monitorBase) RootMonitor() *RootMonitor {
  126. return mb.rootMonitor
  127. }
  128. /*
  129. Scope returns the rule scope of this monitor.
  130. */
  131. func (mb *monitorBase) Scope() *RuleScope {
  132. return mb.rootMonitor.ruleScope
  133. }
  134. /*
  135. Priority returns the priority of this monitor.
  136. */
  137. func (mb *monitorBase) Priority() int {
  138. return mb.priority
  139. }
  140. /*
  141. IsActivated returns if this monitor has been activated.
  142. */
  143. func (mb *monitorBase) IsActivated() bool {
  144. return mb.activated
  145. }
  146. /*
  147. IsFinished returns if this monitor has finished.
  148. */
  149. func (mb *monitorBase) IsFinished() bool {
  150. return mb.finished
  151. }
  152. /*
  153. Activate activates this monitor.
  154. */
  155. func (mb *monitorBase) Activate(e *Event) {
  156. errorutil.AssertTrue(!mb.finished, "Cannot activate a finished monitor")
  157. errorutil.AssertTrue(!mb.activated, "Cannot activate an active monitor")
  158. errorutil.AssertTrue(e != nil, "Monitor can only be activated with an event")
  159. mb.event = e
  160. mb.rootMonitor.descendantActivated(mb.priority)
  161. mb.activated = true
  162. }
  163. /*
  164. Skip finishes this monitor without activation.
  165. */
  166. func (mb *monitorBase) Skip(e *Event) {
  167. errorutil.AssertTrue(!mb.finished, "Cannot skip a finished monitor")
  168. errorutil.AssertTrue(!mb.activated, "Cannot skip an active monitor")
  169. mb.event = e
  170. mb.activated = true
  171. mb.Finish()
  172. }
  173. /*
  174. Finish finishes this monitor.
  175. */
  176. func (mb *monitorBase) Finish() {
  177. errorutil.AssertTrue(mb.activated, "Cannot finish a not active monitor")
  178. errorutil.AssertTrue(!mb.finished, "Cannot finish a finished monitor")
  179. mb.finished = true
  180. mb.rootMonitor.descendantFinished(mb)
  181. }
  182. /*
  183. Errors returns the error object of this monitor.
  184. */
  185. func (mb *monitorBase) Errors() *TaskError {
  186. errorutil.AssertTrue(mb.finished, "Cannot get errors on an unfinished monitor")
  187. return mb.Err
  188. }
  189. /*
  190. SetErrors adds an error object to this monitor.
  191. */
  192. func (mb *monitorBase) SetErrors(e *TaskError) {
  193. mb.Err = e
  194. mb.rootMonitor.descendantFailed(mb)
  195. }
  196. /*
  197. EventPath returns the chain of events which created this monitor.
  198. */
  199. func (mb *monitorBase) EventPath() []*Event {
  200. errorutil.AssertTrue(mb.finished, "Cannot get event path on an unfinished monitor")
  201. path := []*Event{mb.event}
  202. child := mb.Parent
  203. for child != nil {
  204. path = append(path, child.event)
  205. child = child.Parent
  206. }
  207. // Reverse path
  208. for i, j := 0, len(path)-1; i < j; i, j = i+1, j-1 {
  209. path[i], path[j] = path[j], path[i]
  210. }
  211. return path
  212. }
  213. /*
  214. EventPathString returns the event path as a string.
  215. */
  216. func (mb *monitorBase) EventPathString() string {
  217. var buf bytes.Buffer
  218. ep := mb.EventPath()
  219. last := len(ep) - 1
  220. for i, e := range mb.EventPath() {
  221. buf.WriteString(e.name)
  222. if i < last {
  223. buf.WriteString(" -> ")
  224. }
  225. }
  226. return buf.String()
  227. }
  228. /*
  229. String returns a string representation of this monitor.
  230. */
  231. func (mb *monitorBase) String() string {
  232. return fmt.Sprintf("Monitor %v (parent: %v priority: %v activated: %v finished: %v)",
  233. mb.ID(), mb.Parent, mb.priority, mb.activated, mb.finished)
  234. }
  235. // Root Monitor
  236. // ============
/*
RootMonitor is a monitor which is at a beginning of an event cascade. It keeps
the bookkeeping for all its descendants: counters of unfinished and active
monitors, the priorities in use and the monitors which reported errors.

The order of the fields is significant: newRootMonitor fills the struct with
an unkeyed composite literal.
*/
type RootMonitor struct {
	*monitorBase
	lock         *sync.Mutex             // Lock for datastructures
	incomplete   map[int]int             // Priority -> Counters of incomplete trackers
	priorities   *sortutil.IntHeap       // List of handled priorities
	ruleScope    *RuleScope              // Rule scope definitions
	unfinished   int                     // Counter of all unfinished trackers
	messageQueue *pubsub.EventPump       // Message passing queue of the processor
	errors       map[uint64]*monitorBase // Monitors which got errors (monitor ID -> monitor)
	finished     func(Processor)         // Finish handler (can be used externally) - not the finished flag of monitorBase
}
  251. /*
  252. NewRootMonitor creates a new root monitor.
  253. */
  254. func newRootMonitor(context map[string]interface{}, scope *RuleScope,
  255. messageQueue *pubsub.EventPump) *RootMonitor {
  256. ret := &RootMonitor{newMonitorBase(0, nil, context), &sync.Mutex{},
  257. make(map[int]int), &sortutil.IntHeap{}, scope, 1, messageQueue,
  258. make(map[uint64]*monitorBase), nil}
  259. // A root monitor is its own parent
  260. ret.rootMonitor = ret
  261. heap.Init(ret.priorities)
  262. return ret
  263. }
  264. /*
  265. SetFinishHandler adds a handler function to this monitor which is called once
  266. this monitor has finished.
  267. */
  268. func (rm *RootMonitor) SetFinishHandler(fh func(Processor)) {
  269. rm.finished = fh
  270. }
  271. /*
  272. HighestPriority returns the highest priority which is handled by this monitor.
  273. */
  274. func (rm *RootMonitor) HighestPriority() int {
  275. rm.lock.Lock()
  276. defer rm.lock.Unlock()
  277. if len(*rm.priorities) > 0 {
  278. return (*rm.priorities)[0]
  279. }
  280. return -1
  281. }
  282. /*
  283. AllErrors returns all error which have been collected in this root monitor.
  284. */
  285. func (rm *RootMonitor) AllErrors() []*TaskError {
  286. rm.lock.Lock()
  287. defer rm.lock.Unlock()
  288. ret := make([]*TaskError, 0, len(rm.errors))
  289. // Sort by monitor id - this should give the corrent order timewise
  290. var ids []uint64
  291. for id := range rm.errors {
  292. ids = append(ids, id)
  293. }
  294. sortutil.UInt64s(ids)
  295. for _, id := range ids {
  296. m := rm.errors[id]
  297. ret = append(ret, m.Errors())
  298. }
  299. return ret
  300. }
  301. /*
  302. descendantCreated notifies this root monitor that a descendant has been created.
  303. */
  304. func (rm *RootMonitor) descendantCreated(monitor Monitor) {
  305. rm.lock.Lock()
  306. defer rm.lock.Unlock()
  307. rm.unfinished++
  308. }
  309. /*
  310. descendantActivated notifies this root monitor that a descendant has been activated.
  311. */
  312. func (rm *RootMonitor) descendantActivated(priority int) {
  313. rm.lock.Lock()
  314. defer rm.lock.Unlock()
  315. val, ok := rm.incomplete[priority]
  316. if !ok {
  317. val = 0
  318. heap.Push(rm.priorities, priority)
  319. }
  320. rm.incomplete[priority] = val + 1
  321. }
  322. /*
  323. descendantFailed notifies this root monitor that a descendant has failed.
  324. */
  325. func (rm *RootMonitor) descendantFailed(monitor *monitorBase) {
  326. rm.lock.Lock()
  327. defer rm.lock.Unlock()
  328. rm.errors[monitor.ID()] = monitor
  329. }
  330. /*
  331. descendantFinished records that this monitor has finished. If it is the last
  332. active monitor in the event tree then send a notification.
  333. */
  334. func (rm *RootMonitor) descendantFinished(m Monitor) {
  335. rm.lock.Lock()
  336. rm.unfinished--
  337. finished := rm.unfinished == 0
  338. if m.IsActivated() {
  339. priority := m.Priority()
  340. rm.incomplete[priority]--
  341. if rm.incomplete[priority] == 0 {
  342. rm.priorities.RemoveFirst(priority)
  343. delete(rm.incomplete, priority)
  344. }
  345. }
  346. rm.lock.Unlock()
  347. // Post notification
  348. if finished {
  349. rm.messageQueue.PostEvent(MessageRootMonitorFinished, rm)
  350. }
  351. }
  352. // Child Monitor
  353. // =============
/*
ChildMonitor is a monitor which is a descendant of a root monitor. It has no
fields of its own - all state and behaviour comes from the embedded
monitorBase.
*/
type ChildMonitor struct {
	*monitorBase
}
  360. // Unique id creation
  361. // ==================
  362. var midcounter uint64 = 1
  363. var midcounterLock = &sync.Mutex{}
  364. /*
  365. newId returns a new unique id.
  366. */
  367. func newMonID() uint64 {
  368. midcounterLock.Lock()
  369. defer midcounterLock.Unlock()
  370. ret := midcounter
  371. midcounter++
  372. return ret
  373. }