processor.go
/*
 * ECAL
 *
 * Copyright 2020 Matthias Ladkau. All rights reserved.
 *
 * This Source Code Form is subject to the terms of the MIT
 * License. If a copy of the MIT License was not distributed with this
 * file, You can obtain one at https://opensource.org/licenses/MIT.
 */

package engine

import (
	"fmt"
	"sync"

	"devt.de/krotik/ecal/engine/pool"
	"devt.de/krotik/ecal/engine/pubsub"
)
/*
Processor is the main object of the event engine. It coordinates the thread pool
and rule index. Rules can only be added if the processor is stopped. Events
can only be added if the processor is not stopped.
*/
type Processor interface {

	/*
		ID returns the processor ID.
	*/
	ID() uint64

	/*
		Workers returns the number of threads of this processor.
	*/
	Workers() int

	/*
		Reset removes all stored rules from this processor.
	*/
	Reset() error

	/*
		AddRule adds a new rule to the processor.
	*/
	AddRule(rule *Rule) error

	/*
		Rules returns all loaded rules.
	*/
	Rules() map[string]*Rule

	/*
		Start starts this processor.
	*/
	Start()

	/*
		Finish will finish all remaining tasks and then stop the processor.
	*/
	Finish()

	/*
		Stopped returns if the processor is stopped.
	*/
	Stopped() bool

	/*
		Status returns the status of the processor (Running / Stopping / Stopped).
	*/
	Status() string

	/*
		NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
		root events.
	*/
	NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor

	/*
		SetRootMonitorErrorObserver specifies an observer which is triggered
		when a root monitor of this processor has finished and returns errors.
		By default this is set to nil (no observer).
	*/
	SetRootMonitorErrorObserver(func(rm *RootMonitor))

	/*
		SetFailOnFirstErrorInTriggerSequence sets the behavior when rules return errors.
		If set to false (default) then all rules in a trigger sequence for a specific event
		are executed. If set to true then the first rule which returns an error will stop
		the trigger sequence. Events which have been added by the failing rule are still processed.
	*/
	SetFailOnFirstErrorInTriggerSequence(bool)

	/*
		AddEventAndWait adds a new event to the processor and waits for the resulting event cascade
		to finish. If a monitor is passed then it must be a RootMonitor.
	*/
	AddEventAndWait(event *Event, monitor *RootMonitor) (Monitor, error)

	/*
		AddEvent adds a new event to the processor. Returns the monitor if the event
		triggered a rule and nil if the event was skipped.
	*/
	AddEvent(event *Event, parentMonitor Monitor) (Monitor, error)

	/*
		IsTriggering checks if a given event triggers a loaded rule. For speed, this
		check does not do the actual state matching.
	*/
	IsTriggering(event *Event) bool

	/*
		ProcessEvent processes an event by determining which rules trigger and match
		the given event.
	*/
	ProcessEvent(event *Event, parent Monitor) map[string]error

	/*
		String returns a string representation of the processor.
	*/
	String() string
}
/*
eventProcessor is the main implementation of the Processor interface.

Event cycle:

	Process -> Triggering -> Matching -> Fire Rule
*/
type eventProcessor struct {
	id                  uint64                // Processor ID
	pool                *pool.ThreadPool      // Thread pool of this processor
	workerCount         int                   // Number of threads for this processor
	failOnFirstError    bool                  // Stop rule execution on first error in an event trigger sequence
	ruleIndex           RuleIndex             // Container for loaded rules
	triggeringCache     map[string]bool       // Cache which remembers which events are triggering
	triggeringCacheLock sync.Mutex            // Lock for triggering cache
	messageQueue        *pubsub.EventPump     // Queue for message passing between components
	rmErrorObserver     func(rm *RootMonitor) // Error observer for root monitors
}

/*
NewProcessor creates a new event processor with a given number of workers.
*/
func NewProcessor(workerCount int) Processor {
	ep := pubsub.NewEventPump()
	return &eventProcessor{newProcID(), pool.NewThreadPoolWithQueue(NewTaskQueue(ep)),
		workerCount, false, NewRuleIndex(), nil, sync.Mutex{}, ep, nil}
}
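
// Illustrative sketch (not part of the original file): the typical lifecycle of
// a processor. The rule and event values are assumed to be built elsewhere in
// this package; only the Processor methods defined in this file are used.
//
//	proc := NewProcessor(4)               // 4 worker threads
//	_ = proc.AddRule(rule)                // rules can only be added while stopped
//	proc.Start()                          // spin up the thread pool
//	_, _ = proc.AddEventAndWait(evt, nil) // inject a root event and wait for the cascade
//	proc.Finish()                         // drain remaining tasks and stop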
/*
ID returns the processor ID.
*/
func (p *eventProcessor) ID() uint64 {
	return p.id
}

/*
Workers returns the number of threads of this processor.
*/
func (p *eventProcessor) Workers() int {
	return p.workerCount
}

/*
Reset removes all stored rules from this processor.
*/
func (p *eventProcessor) Reset() error {

	// Check that the thread pool is stopped

	if p.pool.Status() != pool.StatusStopped {
		return fmt.Errorf("Cannot reset processor if it has not stopped")
	}

	// Invalidate triggering cache

	p.triggeringCacheLock.Lock()
	p.triggeringCache = nil
	p.triggeringCacheLock.Unlock()

	// Create a new rule index

	p.ruleIndex = NewRuleIndex()

	return nil
}
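
// Illustrative sketch (not part of the original file): reconfiguring a running
// processor. Rules can only be changed while the thread pool is stopped, so the
// processor has to be drained first; newRule is a made-up placeholder.
//
//	proc.Finish()             // wait for remaining tasks, pool becomes stopped
//	_ = proc.Reset()          // drop all rules and the triggering cache
//	_ = proc.AddRule(newRule) // load the new rule set
//	proc.Start()              // resume processing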
/*
AddRule adds a new rule to the processor.
*/
func (p *eventProcessor) AddRule(rule *Rule) error {

	// Check that the thread pool is stopped

	if p.pool.Status() != pool.StatusStopped {
		return fmt.Errorf("Cannot add rule if the processor has not stopped")
	}

	// Invalidate triggering cache

	p.triggeringCacheLock.Lock()
	p.triggeringCache = nil
	p.triggeringCacheLock.Unlock()

	return p.ruleIndex.AddRule(rule)
}

/*
Rules returns all loaded rules.
*/
func (p *eventProcessor) Rules() map[string]*Rule {
	return p.ruleIndex.Rules()
}

/*
Start starts this processor.
*/
func (p *eventProcessor) Start() {
	p.pool.SetWorkerCount(p.workerCount, false)
}

/*
Finish will finish all remaining tasks and then stop the processor.
*/
func (p *eventProcessor) Finish() {
	p.pool.JoinAll()
}

/*
Stopped returns if the processor is stopped.
*/
func (p *eventProcessor) Stopped() bool {
	return p.pool.Status() == pool.StatusStopped
}

/*
Status returns the status of the processor (Running / Stopping / Stopped).
*/
func (p *eventProcessor) Status() string {
	return p.pool.Status()
}

/*
NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
root events.
*/
func (p *eventProcessor) NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor {

	if scope == nil {
		scope = NewRuleScope(map[string]bool{
			"": true, // Default root monitor has global scope
		})
	}

	return newRootMonitor(context, scope, p.messageQueue)
}
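
// Illustrative sketch (not part of the original file): creating a root monitor
// with a restricted scope before adding an event. The scope string "core" and
// the context map are made-up values; NewRuleScope is used as in the default
// branch above.
//
//	scope := NewRuleScope(map[string]bool{"core": true})
//	rm := proc.NewRootMonitor(map[string]interface{}{"source": "api"}, scope)
//	_, err := proc.AddEventAndWait(evt, rm)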
/*
SetRootMonitorErrorObserver specifies an observer which is triggered
when a root monitor of this processor has finished and returns errors.
By default this is set to nil (no observer).
*/
func (p *eventProcessor) SetRootMonitorErrorObserver(rmErrorObserver func(rm *RootMonitor)) {
	p.rmErrorObserver = rmErrorObserver
}

/*
SetFailOnFirstErrorInTriggerSequence sets the behavior when rules return errors.
If set to false (default) then all rules in a trigger sequence for a specific event
are executed. If set to true then the first rule which returns an error will stop
the trigger sequence. Events which have been added by the failing rule are still processed.
*/
func (p *eventProcessor) SetFailOnFirstErrorInTriggerSequence(v bool) {
	p.failOnFirstError = v
}
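
// Illustrative sketch (not part of the original file): configuring error
// handling before starting the processor. Only the two setters defined above
// are used; how errors are read from the RootMonitor is left open here.
//
//	proc.SetFailOnFirstErrorInTriggerSequence(true)
//	proc.SetRootMonitorErrorObserver(func(rm *RootMonitor) {
//		// Inspect rm here, e.g. log the errors collected by the root monitor.
//	})
//	proc.Start()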
/*
Notify the root monitor error observer that an error occurred.
*/
func (p *eventProcessor) notifyRootMonitorErrors(rm *RootMonitor) {
	if p.rmErrorObserver != nil {
		p.rmErrorObserver(rm)
	}
}

/*
AddEventAndWait adds a new event to the processor and waits for the resulting event cascade
to finish. If a monitor is passed then it must be a RootMonitor.
*/
func (p *eventProcessor) AddEventAndWait(event *Event, monitor *RootMonitor) (Monitor, error) {
	var wg sync.WaitGroup
	wg.Add(1)

	if monitor == nil {
		monitor = p.NewRootMonitor(nil, nil)
	}

	p.messageQueue.AddObserver(MessageRootMonitorFinished, monitor,
		func(event string, eventSource interface{}) {

			// Everything has finished

			wg.Done()

			p.messageQueue.RemoveObservers(event, eventSource)
		})

	resMonitor, err := p.AddEvent(event, monitor)

	if resMonitor == nil {

		// Event was not added

		p.messageQueue.RemoveObservers(MessageRootMonitorFinished, monitor)

	} else {

		// Event was added now wait for it to finish

		wg.Wait()
	}

	return resMonitor, err
}
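
// Illustrative sketch (not part of the original file): synchronous event
// injection. The event value evt is assumed to be constructed elsewhere in
// this package; passing nil as the monitor lets AddEventAndWait create a
// default root monitor with global scope.
//
//	mon, err := proc.AddEventAndWait(evt, nil) // blocks until the cascade ends
//	if mon == nil && err == nil {
//		// evt did not trigger any loaded rule and was skipped
//	}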
/*
AddEvent adds a new event to the processor. Returns the monitor if the event
triggered a rule and nil if the event was skipped.
*/
func (p *eventProcessor) AddEvent(event *Event, eventMonitor Monitor) (Monitor, error) {

	// Check that the thread pool is running

	if p.pool.Status() == pool.StatusStopped {
		return nil, fmt.Errorf("Cannot add event if the processor is not running")
	}

	EventTracer.record(event, "eventProcessor.AddEvent", "Event added to the processor")

	// First check if the event is triggering any rules at all

	if !p.IsTriggering(event) {

		EventTracer.record(event, "eventProcessor.AddEvent", "Event was skipped")

		if eventMonitor != nil {
			eventMonitor.Skip(event)
		}

		return nil, nil
	}

	// Check if we need to construct a new root monitor

	if eventMonitor == nil {
		eventMonitor = p.NewRootMonitor(nil, nil)
	}

	if rootMonitor, ok := eventMonitor.(*RootMonitor); ok {
		p.messageQueue.AddObserver(MessageRootMonitorFinished, rootMonitor,
			func(event string, eventSource interface{}) {

				// Call finish handler if there is one

				if rm := eventSource.(*RootMonitor); rm.finished != nil {
					rm.finished(p)
				}

				p.messageQueue.RemoveObservers(event, eventSource)
			})
	}

	eventMonitor.Activate(event)

	EventTracer.record(event, "eventProcessor.AddEvent", "Adding task to thread pool")

	// Kick off event processing (see Processor.ProcessEvent)

	p.pool.AddTask(&Task{p, eventMonitor, event})

	return eventMonitor, nil
}
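
// Illustrative sketch (not part of the original file): adding a follow-up event
// from inside a rule action so it becomes part of the same cascade. The action
// signature is inferred from its use in ProcessEvent below and childEvt is a
// made-up placeholder.
//
//	rule.Action = func(p Processor, m Monitor, e *Event) error {
//		_, err := p.AddEvent(childEvt, m) // child event inherits the parent monitor
//		return err
//	}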
/*
IsTriggering checks if a given event triggers a loaded rule. For speed, this
check does not do the actual state matching.
*/
func (p *eventProcessor) IsTriggering(event *Event) bool {
	var res, ok bool

	p.triggeringCacheLock.Lock()
	defer p.triggeringCacheLock.Unlock()

	// Ensure the triggering cache exists

	if p.triggeringCache == nil {
		p.triggeringCache = make(map[string]bool)
	}

	name := event.Name()

	if res, ok = p.triggeringCache[name]; !ok {
		res = p.ruleIndex.IsTriggering(event)
		p.triggeringCache[name] = res
	}

	return res
}
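
// Illustrative note (not part of the original file): IsTriggering is a cheap
// pre-filter. A positive answer only means some loaded rule listens to the
// event; the actual matching happens later in ProcessEvent. The result is
// cached per event name, and the cache is invalidated by AddRule and Reset.
//
//	if proc.IsTriggering(evt) {
//		// evt will be queued; the matching rules are determined in ProcessEvent
//	}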
/*
ProcessEvent processes an event by determining which rules trigger and match
the given event.
*/
func (p *eventProcessor) ProcessEvent(event *Event, parent Monitor) map[string]error {
	var rulesTriggering []*Rule
	var rulesExecuting []*Rule

	scope := parent.Scope()
	ruleCandidates := p.ruleIndex.Match(event)
	suppressedRules := make(map[string]bool)

	EventTracer.record(event, "eventProcessor.ProcessEvent", "Processing event")

	// Remove candidates which are out of scope

	for _, ruleCandidate := range ruleCandidates {

		if scope.IsAllowedAll(ruleCandidate.ScopeMatch) {
			rulesTriggering = append(rulesTriggering, ruleCandidate)

			// Build up a suppression list

			for _, suppressedRule := range ruleCandidate.SuppressionList {
				suppressedRules[suppressedRule] = true
			}
		}
	}

	// Remove suppressed rules

	for _, ruleTriggers := range rulesTriggering {

		if _, ok := suppressedRules[ruleTriggers.Name]; ok {
			continue
		}

		rulesExecuting = append(rulesExecuting, ruleTriggers)
	}

	// Sort rules according to their priority (0 is the highest)

	SortRuleSlice(rulesExecuting)

	// Run rules which are not suppressed

	errors := make(map[string]error)

	EventTracer.record(event, "eventProcessor.ProcessEvent", "Running rules: ", rulesExecuting)

	for _, rule := range rulesExecuting {

		if err := rule.Action(p, parent, event); err != nil {
			errors[rule.Name] = err
		}

		if p.failOnFirstError && len(errors) > 0 {
			break
		}
	}

	return errors
}
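
// Illustrative sketch (not part of the original file): how suppression and
// priority interact. The field names Name and SuppressionList are taken from
// their use in ProcessEvent above; Priority is an assumed field name based on
// the sort comment, and all other rule details are omitted.
//
//	block := &Rule{Name: "block", Priority: 0, SuppressionList: []string{"audit"}}
//	audit := &Rule{Name: "audit", Priority: 10}
//
// If both rules trigger for the same event, "audit" is removed from the
// execution list because "block" suppresses it, and the remaining rules run in
// priority order (0 first).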
/*
String returns a string representation of the processor.
*/
func (p *eventProcessor) String() string {
	return fmt.Sprintf("RumbleProcessor %v (workers:%v)", p.ID(), p.workerCount)
}

// Unique id creation
// ==================

var pidcounter uint64 = 1
var pidcounterLock = &sync.Mutex{}

/*
newProcID returns a new unique id for processors.
*/
func newProcID() uint64 {
	pidcounterLock.Lock()
	defer pidcounterLock.Unlock()

	ret := pidcounter
	pidcounter++

	return ret
}