monitor.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package engine
  11. import (
  12. "bytes"
  13. "container/heap"
  14. "fmt"
  15. "sync"
  16. "devt.de/krotik/common/errorutil"
  17. "devt.de/krotik/common/flowutil"
  18. "devt.de/krotik/common/sortutil"
  19. )
/*
Monitor monitors events as they are cascading. Event cascades will produce tree
structures.
*/
type Monitor interface {

	/*
		ID returns the monitor ID.
	*/
	ID() uint64

	/*
		NewChildMonitor creates a new child monitor of this monitor.
	*/
	NewChildMonitor(priority int) Monitor

	/*
		Scope returns the rule scope of this monitor.
	*/
	Scope() *RuleScope

	/*
		Priority returns the priority of this monitor.
	*/
	Priority() int

	/*
		IsActivated returns if this monitor has been activated.
	*/
	IsActivated() bool

	/*
		Activate activates this monitor with a given event.
	*/
	Activate(e *Event)

	/*
		Skip finishes this monitor without activation.
	*/
	Skip(e *Event)

	/*
		Finish finishes this monitor.
	*/
	Finish()

	/*
		RootMonitor returns the root monitor of this monitor.
	*/
	RootMonitor() *RootMonitor

	/*
		Errors returns the error object of this monitor.
	*/
	Errors() *TaskError

	/*
		SetErrors sets the error object of this monitor.
	*/
	SetErrors(e *TaskError)

	/*
		EventPath returns the chain of events which created this monitor.
	*/
	EventPath() []*Event

	/*
		EventPathString returns the event path as a string.
	*/
	EventPathString() string

	/*
		String returns a string representation of this monitor.
	*/
	String() string
}
/*
monitorBase provides the basic functions and fields for any monitor. It is
embedded by RootMonitor and ChildMonitor.
*/
type monitorBase struct {
	id          uint64                 // Monitor ID (assigned from newMonID)
	Parent      *monitorBase           // Parent monitor (nil if there is no parent)
	Context     map[string]interface{} // Context object of the monitor
	Err         *TaskError             // Errors which occurred during event processing
	priority    int                    // Priority of the monitor
	rootMonitor *RootMonitor           // Root monitor
	event       *Event                 // Event which activated (or skipped) this monitor
	activated   bool                   // Flag indicating if the monitor was activated
	finished    bool                   // Flag indicating if the monitor has finished
}
  96. /*
  97. NewMonitor creates a new monitor.
  98. */
  99. func newMonitorBase(priority int, parent *monitorBase, context map[string]interface{}) *monitorBase {
  100. var ret *monitorBase
  101. if parent != nil {
  102. ret = &monitorBase{newMonID(), parent, context, nil, priority, parent.rootMonitor, nil, false, false}
  103. } else {
  104. ret = &monitorBase{newMonID(), nil, context, nil, priority, nil, nil, false, false}
  105. }
  106. return ret
  107. }
  108. /*
  109. NewChildMonitor creates a new child monitor of this monitor.
  110. */
  111. func (mb *monitorBase) NewChildMonitor(priority int) Monitor {
  112. child := &ChildMonitor{newMonitorBase(priority, mb, mb.Context)}
  113. mb.rootMonitor.descendantCreated(child)
  114. return child
  115. }
  116. /*
  117. ID returns the monitor ID.
  118. */
  119. func (mb *monitorBase) ID() uint64 {
  120. return mb.id
  121. }
  122. /*
  123. RootMonitor returns the root monitor of this monitor.
  124. */
  125. func (mb *monitorBase) RootMonitor() *RootMonitor {
  126. return mb.rootMonitor
  127. }
  128. /*
  129. Scope returns the rule scope of this monitor.
  130. */
  131. func (mb *monitorBase) Scope() *RuleScope {
  132. return mb.rootMonitor.ruleScope
  133. }
  134. /*
  135. Priority returns the priority of this monitor.
  136. */
  137. func (mb *monitorBase) Priority() int {
  138. return mb.priority
  139. }
  140. /*
  141. IsActivated returns if this monitor has been activated.
  142. */
  143. func (mb *monitorBase) IsActivated() bool {
  144. return mb.activated
  145. }
  146. /*
  147. IsFinished returns if this monitor has finished.
  148. */
  149. func (mb *monitorBase) IsFinished() bool {
  150. return mb.finished
  151. }
  152. /*
  153. Activate activates this monitor.
  154. */
  155. func (mb *monitorBase) Activate(e *Event) {
  156. errorutil.AssertTrue(!mb.finished, "Cannot activate a finished monitor")
  157. errorutil.AssertTrue(!mb.activated, "Cannot activate an active monitor")
  158. errorutil.AssertTrue(e != nil, "Monitor can only be activated with an event")
  159. mb.event = e
  160. mb.rootMonitor.descendantActivated(mb.priority)
  161. mb.activated = true
  162. }
  163. /*
  164. Skip finishes this monitor without activation.
  165. */
  166. func (mb *monitorBase) Skip(e *Event) {
  167. errorutil.AssertTrue(!mb.finished, "Cannot skip a finished monitor")
  168. errorutil.AssertTrue(!mb.activated, "Cannot skip an active monitor")
  169. mb.event = e
  170. mb.activated = true
  171. mb.finished = true
  172. }
  173. /*
  174. Finish finishes this monitor.
  175. */
  176. func (mb *monitorBase) Finish() {
  177. errorutil.AssertTrue(mb.activated, "Cannot finish a not active monitor")
  178. errorutil.AssertTrue(!mb.finished, "Cannot finish a finished monitor")
  179. mb.finished = true
  180. mb.rootMonitor.descendantFinished(mb)
  181. }
  182. /*
  183. Errors returns the error object of this monitor.
  184. */
  185. func (mb *monitorBase) Errors() *TaskError {
  186. errorutil.AssertTrue(mb.finished, "Cannot get errors on an unfinished monitor")
  187. return mb.Err
  188. }
  189. /*
  190. SetErrors adds an error object to this monitor.
  191. */
  192. func (mb *monitorBase) SetErrors(e *TaskError) {
  193. errorutil.AssertTrue(mb.finished, "Cannot set errors on an unfinished monitor")
  194. mb.Err = e
  195. mb.rootMonitor.descendantFailed(mb)
  196. }
  197. /*
  198. EventPath returns the chain of events which created this monitor.
  199. */
  200. func (mb *monitorBase) EventPath() []*Event {
  201. errorutil.AssertTrue(mb.finished, "Cannot get event path on an unfinished monitor")
  202. path := []*Event{mb.event}
  203. child := mb.Parent
  204. for child != nil {
  205. path = append(path, child.event)
  206. child = child.Parent
  207. }
  208. // Reverse path
  209. for i, j := 0, len(path)-1; i < j; i, j = i+1, j-1 {
  210. path[i], path[j] = path[j], path[i]
  211. }
  212. return path
  213. }
  214. /*
  215. EventPathString returns the event path as a string.
  216. */
  217. func (mb *monitorBase) EventPathString() string {
  218. var buf bytes.Buffer
  219. ep := mb.EventPath()
  220. last := len(ep) - 1
  221. for i, e := range mb.EventPath() {
  222. buf.WriteString(e.name)
  223. if i < last {
  224. buf.WriteString(" -> ")
  225. }
  226. }
  227. return buf.String()
  228. }
  229. /*
  230. String returns a string representation of this monitor.
  231. */
  232. func (mb *monitorBase) String() string {
  233. return fmt.Sprintf("Monitor %v (parent: %v priority: %v activated: %v finished: %v)",
  234. mb.ID(), mb.Parent, mb.priority, mb.activated, mb.finished)
  235. }
  236. // Root Monitor
  237. // ============
/*
RootMonitor is a monitor which is at a beginning of an event cascade. It keeps
the bookkeeping (counters, priorities and errors) for all its descendants.
*/
type RootMonitor struct {
	*monitorBase
	lock         *sync.Mutex             // Lock for datastructures
	incomplete   map[int]int             // Priority -> Counters of activated but unfinished monitors
	priorities   *sortutil.IntHeap       // Heap of handled priorities
	ruleScope    *RuleScope              // Rule scope definitions
	unfinished   int                     // Counter of all unfinished monitors (including this one)
	messageQueue *flowutil.EventPump     // Message passing queue of the processor
	errors       map[uint64]*monitorBase // Monitor ID -> Monitors which got errors
	finished     func(Processor)         // Finish handler (can be used externally)
}
  252. /*
  253. NewRootMonitor creates a new root monitor.
  254. */
  255. func newRootMonitor(context map[string]interface{}, scope *RuleScope,
  256. messageQueue *flowutil.EventPump) *RootMonitor {
  257. ret := &RootMonitor{newMonitorBase(0, nil, context), &sync.Mutex{},
  258. make(map[int]int), &sortutil.IntHeap{}, scope, 1, messageQueue,
  259. make(map[uint64]*monitorBase), nil}
  260. // A root monitor is its own parent
  261. ret.rootMonitor = ret
  262. heap.Init(ret.priorities)
  263. return ret
  264. }
  265. /*
  266. SetFinishHandler adds a handler function to this monitor which is called once
  267. this monitor has finished.
  268. */
  269. func (rm *RootMonitor) SetFinishHandler(fh func(Processor)) {
  270. rm.finished = fh
  271. }
  272. /*
  273. HighestPriority returns the highest priority which is handled by this monitor.
  274. */
  275. func (rm *RootMonitor) HighestPriority() int {
  276. rm.lock.Lock()
  277. defer rm.lock.Unlock()
  278. if len(*rm.priorities) > 0 {
  279. return (*rm.priorities)[0]
  280. }
  281. return -1
  282. }
  283. /*
  284. AllErrors returns all error which have been collected in this root monitor.
  285. */
  286. func (rm *RootMonitor) AllErrors() []*TaskError {
  287. rm.lock.Lock()
  288. defer rm.lock.Unlock()
  289. ret := make([]*TaskError, 0, len(rm.errors))
  290. // Sort by monitor id - this should give the corrent order timewise
  291. var ids []uint64
  292. for id := range rm.errors {
  293. ids = append(ids, id)
  294. }
  295. sortutil.UInt64s(ids)
  296. for _, id := range ids {
  297. m := rm.errors[id]
  298. ret = append(ret, m.Errors())
  299. }
  300. return ret
  301. }
  302. /*
  303. descendantCreated notifies this root monitor that a descendant has been created.
  304. */
  305. func (rm *RootMonitor) descendantCreated(monitor Monitor) {
  306. rm.lock.Lock()
  307. defer rm.lock.Unlock()
  308. rm.unfinished++
  309. }
  310. /*
  311. descendantActivated notifies this root monitor that a descendant has been activated.
  312. */
  313. func (rm *RootMonitor) descendantActivated(priority int) {
  314. rm.lock.Lock()
  315. defer rm.lock.Unlock()
  316. val, ok := rm.incomplete[priority]
  317. if !ok {
  318. val = 0
  319. heap.Push(rm.priorities, priority)
  320. }
  321. rm.incomplete[priority] = val + 1
  322. }
  323. /*
  324. descendantFailed notifies this root monitor that a descendant has failed.
  325. */
  326. func (rm *RootMonitor) descendantFailed(monitor *monitorBase) {
  327. rm.lock.Lock()
  328. defer rm.lock.Unlock()
  329. rm.errors[monitor.ID()] = monitor
  330. }
  331. /*
  332. descendantFinished records that this monitor has finished. If it is the last
  333. active monitor in the event tree then send a notification.
  334. */
  335. func (rm *RootMonitor) descendantFinished(m Monitor) {
  336. rm.lock.Lock()
  337. rm.unfinished--
  338. finished := rm.unfinished == 0
  339. if m.IsActivated() {
  340. priority := m.Priority()
  341. rm.incomplete[priority]--
  342. if rm.incomplete[priority] == 0 {
  343. rm.priorities.RemoveFirst(priority)
  344. delete(rm.incomplete, priority)
  345. }
  346. }
  347. rm.lock.Unlock()
  348. // Post notification
  349. if finished {
  350. rm.messageQueue.PostEvent(MessageRootMonitorFinished, rm)
  351. }
  352. }
  353. // Child Monitor
  354. // =============
/*
ChildMonitor is a monitor which is a descendant of a root monitor. All its
functionality comes from the embedded monitorBase.
*/
type ChildMonitor struct {
	*monitorBase
}
  361. // Unique id creation
  362. // ==================
  363. var midcounter uint64 = 1
  364. var midcounterLock = &sync.Mutex{}
  365. /*
  366. newId returns a new unique id.
  367. */
  368. func newMonID() uint64 {
  369. midcounterLock.Lock()
  370. defer midcounterLock.Unlock()
  371. ret := midcounter
  372. midcounter++
  373. return ret
  374. }