processor.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package engine
  11. import (
  12. "fmt"
  13. "sync"
  14. "devt.de/krotik/ecal/engine/pool"
  15. "devt.de/krotik/ecal/engine/pubsub"
  16. )
  17. /*
  18. Processor is the main object of the event engine. It coordinates the thread pool
  19. and rule index. Rules can only be added if the processor is stopped. Events
  20. can only be added if the processor is not stopped.
  21. */
  22. type Processor interface {
  23. /*
  24. ID returns the processor ID.
  25. */
  26. ID() uint64
  27. /*
  28. ThreadPool returns the thread pool which this processor is using.
  29. */
  30. ThreadPool() *pool.ThreadPool
  31. /*
  32. Workers returns the number of threads of this processor.
  33. */
  34. Workers() int
  35. /*
  36. Reset removes all stored rules from this processor.
  37. */
  38. Reset() error
  39. /*
  40. AddRule adds a new rule to the processor.
  41. */
  42. AddRule(rule *Rule) error
  43. /*
  44. Rules returns all loaded rules.
  45. */
  46. Rules() map[string]*Rule
  47. /*
  48. Start starts this processor.
  49. */
  50. Start()
  51. /*
  52. Finish will finish all remaining tasks and then stop the processor.
  53. */
  54. Finish()
  55. /*
  56. Stopped returns if the processor is stopped.
  57. */
  58. Stopped() bool
  59. /*
  60. Status returns the status of the processor (Running / Stopping / Stopped).
  61. */
  62. Status() string
  63. /*
  64. NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
  65. root events.
  66. */
  67. NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor
  68. /*
  69. SetRootMonitorErrorObserver specifies an observer which is triggered
  70. when a root monitor of this processor has finished and returns errors.
  71. By default this is set to nil (no observer).
  72. */
  73. SetRootMonitorErrorObserver(func(rm *RootMonitor))
  74. /*
  75. SetFailOnFirstErrorInTriggerSequence sets the behavior when rules return errors.
  76. If set to false (default) then all rules in a trigger sequence for a specific event
  77. are executed. If set to true then the first rule which returns an error will stop
  78. the trigger sequence. Events which have been added by the failing rule are still processed.
  79. */
  80. SetFailOnFirstErrorInTriggerSequence(bool)
  81. /*
  82. AddEventAndWait adds a new event to the processor and waits for the resulting event cascade
  83. to finish. If a monitor is passed then it must be a RootMonitor.
  84. */
  85. AddEventAndWait(event *Event, monitor *RootMonitor) (Monitor, error)
  86. /*
  87. AddEvent adds a new event to the processor. Returns the monitor if the event
  88. triggered a rule and nil if the event was skipped.
  89. */
  90. AddEvent(event *Event, parentMonitor Monitor) (Monitor, error)
  91. /*
  92. IsTriggering checks if a given event triggers a loaded rule. This does not the
  93. actual state matching for speed.
  94. */
  95. IsTriggering(event *Event) bool
  96. /*
  97. ProcessEvent processes an event by determining which rules trigger and match
  98. the given event. This function must receive a unique thread ID from the
  99. executing thread.
  100. */
  101. ProcessEvent(tid uint64, event *Event, parent Monitor) map[string]error
  102. /*
  103. String returns a string representation the processor.
  104. */
  105. String() string
  106. }
  107. /*
  108. eventProcessor main implementation of the Processor interface.
  109. Event cycle:
  110. Process -> Triggering -> Matching -> Fire Rule
  111. */
  112. type eventProcessor struct {
  113. id uint64 // Processor ID
  114. pool *pool.ThreadPool // Thread pool of this processor
  115. workerCount int // Number of threads for this processor
  116. failOnFirstError bool // Stop rule execution on first error in an event trigger sequence
  117. ruleIndex RuleIndex // Container for loaded rules
  118. triggeringCache map[string]bool // Cache which remembers which events are triggering
  119. triggeringCacheLock sync.Mutex // Lock for triggeringg cache
  120. messageQueue *pubsub.EventPump // Queue for message passing between components
  121. rmErrorObserver func(rm *RootMonitor) // Error observer for root monitors
  122. }
  123. /*
  124. NewProcessor creates a new event processor with a given number of workers.
  125. */
  126. func NewProcessor(workerCount int) Processor {
  127. ep := pubsub.NewEventPump()
  128. return &eventProcessor{newProcID(), pool.NewThreadPoolWithQueue(NewTaskQueue(ep)),
  129. workerCount, false, NewRuleIndex(), nil, sync.Mutex{}, ep, nil}
  130. }
  131. /*
  132. ID returns the processor ID.
  133. */
  134. func (p *eventProcessor) ID() uint64 {
  135. return p.id
  136. }
  137. /*
  138. ThreadPool returns the thread pool which this processor is using.
  139. */
  140. func (p *eventProcessor) ThreadPool() *pool.ThreadPool {
  141. return p.pool
  142. }
  143. /*
  144. Workers returns the number of threads of this processor.
  145. */
  146. func (p *eventProcessor) Workers() int {
  147. return p.workerCount
  148. }
  149. /*
  150. Reset removes all stored rules from this processor.
  151. */
  152. func (p *eventProcessor) Reset() error {
  153. // Check that the thread pool is stopped
  154. if p.pool.Status() != pool.StatusStopped {
  155. return fmt.Errorf("Cannot reset processor if it has not stopped")
  156. }
  157. // Invalidate triggering cache
  158. p.triggeringCacheLock.Lock()
  159. p.triggeringCache = nil
  160. p.triggeringCacheLock.Unlock()
  161. // Create a new rule index
  162. p.ruleIndex = NewRuleIndex()
  163. return nil
  164. }
  165. /*
  166. AddRule adds a new rule to the processor.
  167. */
  168. func (p *eventProcessor) AddRule(rule *Rule) error {
  169. // Check that the thread pool is stopped
  170. if p.pool.Status() != pool.StatusStopped {
  171. return fmt.Errorf("Cannot add rule if the processor has not stopped")
  172. }
  173. // Invalidate triggering cache
  174. p.triggeringCacheLock.Lock()
  175. p.triggeringCache = nil
  176. p.triggeringCacheLock.Unlock()
  177. return p.ruleIndex.AddRule(rule)
  178. }
  179. /*
  180. Rules returns all loaded rules.
  181. */
  182. func (p *eventProcessor) Rules() map[string]*Rule {
  183. return p.ruleIndex.Rules()
  184. }
  185. /*
  186. Start starts this processor.
  187. */
  188. func (p *eventProcessor) Start() {
  189. p.pool.SetWorkerCount(p.workerCount, false)
  190. }
  191. /*
  192. Finish will finish all remaining tasks and then stop the processor.
  193. */
  194. func (p *eventProcessor) Finish() {
  195. p.pool.JoinAll()
  196. }
  197. /*
  198. Stopped returns if the processor is stopped.
  199. */
  200. func (p *eventProcessor) Stopped() bool {
  201. return p.pool.Status() == pool.StatusStopped
  202. }
  203. /*
  204. Status returns the status of the processor (Running / Stopping / Stopped).
  205. */
  206. func (p *eventProcessor) Status() string {
  207. return p.pool.Status()
  208. }
  209. /*
  210. NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
  211. root events.
  212. */
  213. func (p *eventProcessor) NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor {
  214. if scope == nil {
  215. scope = NewRuleScope(map[string]bool{
  216. "": true, // Default root monitor has global scope
  217. })
  218. }
  219. return newRootMonitor(context, scope, p.messageQueue)
  220. }
  221. /*
  222. SetRootMonitorErrorObserver specifies an observer which is triggered
  223. when a root monitor of this processor has finished and returns errors.
  224. By default this is set to nil (no observer).
  225. */
  226. func (p *eventProcessor) SetRootMonitorErrorObserver(rmErrorObserver func(rm *RootMonitor)) {
  227. p.rmErrorObserver = rmErrorObserver
  228. }
  229. /*
  230. SetFailOnFirstErrorInTriggerSequence sets the behavior when rules return errors.
  231. If set to false (default) then all rules in a trigger sequence for a specific event
  232. are executed. If set to true then the first rule which returns an error will stop
  233. the trigger sequence. Events which have been added by the failing rule are still processed.
  234. */
  235. func (p *eventProcessor) SetFailOnFirstErrorInTriggerSequence(v bool) {
  236. p.failOnFirstError = v
  237. }
  238. /*
  239. Notify the root monitor error observer that an error occurred.
  240. */
  241. func (p *eventProcessor) notifyRootMonitorErrors(rm *RootMonitor) {
  242. if p.rmErrorObserver != nil {
  243. p.rmErrorObserver(rm)
  244. }
  245. }
  246. /*
  247. AddEventAndWait adds a new event to the processor and waits for the resulting event cascade
  248. to finish. If a monitor is passed then it must be a RootMonitor.
  249. */
  250. func (p *eventProcessor) AddEventAndWait(event *Event, monitor *RootMonitor) (Monitor, error) {
  251. var wg sync.WaitGroup
  252. wg.Add(1)
  253. if monitor == nil {
  254. monitor = p.NewRootMonitor(nil, nil)
  255. }
  256. p.messageQueue.AddObserver(MessageRootMonitorFinished, monitor,
  257. func(event string, eventSource interface{}) {
  258. // Everything has finished
  259. wg.Done()
  260. p.messageQueue.RemoveObservers(event, eventSource)
  261. })
  262. resMonitor, err := p.AddEvent(event, monitor)
  263. if resMonitor == nil {
  264. // Event was not added
  265. p.messageQueue.RemoveObservers(MessageRootMonitorFinished, monitor)
  266. } else {
  267. // Event was added now wait for it to finish
  268. wg.Wait()
  269. }
  270. return resMonitor, err
  271. }
  272. /*
  273. AddEvent adds a new event to the processor. Returns the monitor if the event
  274. triggered a rule and nil if the event was skipped.
  275. */
  276. func (p *eventProcessor) AddEvent(event *Event, eventMonitor Monitor) (Monitor, error) {
  277. // Check that the thread pool is running
  278. if s := p.pool.Status(); s == pool.StatusStopped || s == pool.StatusStopping {
  279. return nil, fmt.Errorf("Cannot add event if the processor is stopping or not running")
  280. }
  281. EventTracer.record(event, "eventProcessor.AddEvent", "Event added to the processor")
  282. // First check if the event is triggering any rules at all
  283. if !p.IsTriggering(event) {
  284. EventTracer.record(event, "eventProcessor.AddEvent", "Event was skipped")
  285. if eventMonitor != nil {
  286. eventMonitor.Skip(event)
  287. }
  288. return nil, nil
  289. }
  290. // Check if we need to construct a new root monitor
  291. if eventMonitor == nil {
  292. eventMonitor = p.NewRootMonitor(nil, nil)
  293. }
  294. if rootMonitor, ok := eventMonitor.(*RootMonitor); ok {
  295. p.messageQueue.AddObserver(MessageRootMonitorFinished, rootMonitor,
  296. func(event string, eventSource interface{}) {
  297. // Call finish handler if there is one
  298. if rm := eventSource.(*RootMonitor); rm.finished != nil {
  299. rm.finished(p)
  300. }
  301. p.messageQueue.RemoveObservers(event, eventSource)
  302. })
  303. }
  304. eventMonitor.Activate(event)
  305. EventTracer.record(event, "eventProcessor.AddEvent", "Adding task to thread pool")
  306. // Kick off event processing (see Processor.ProcessEvent)
  307. p.pool.AddTask(&Task{p, eventMonitor, event})
  308. return eventMonitor, nil
  309. }
  310. /*
  311. IsTriggering checks if a given event triggers a loaded rule. This does not the
  312. actual state matching for speed.
  313. */
  314. func (p *eventProcessor) IsTriggering(event *Event) bool {
  315. var res, ok bool
  316. p.triggeringCacheLock.Lock()
  317. defer p.triggeringCacheLock.Unlock()
  318. // Ensure the triggering cache exists
  319. if p.triggeringCache == nil {
  320. p.triggeringCache = make(map[string]bool)
  321. }
  322. name := event.Name()
  323. if res, ok = p.triggeringCache[name]; !ok {
  324. res = p.ruleIndex.IsTriggering(event)
  325. p.triggeringCache[name] = res
  326. }
  327. return res
  328. }
  329. /*
  330. ProcessEvent processes an event by determining which rules trigger and match
  331. the given event.
  332. */
  333. func (p *eventProcessor) ProcessEvent(tid uint64, event *Event, parent Monitor) map[string]error {
  334. var rulesTriggering []*Rule
  335. var rulesExecuting []*Rule
  336. scope := parent.Scope()
  337. ruleCandidates := p.ruleIndex.Match(event)
  338. suppressedRules := make(map[string]bool)
  339. EventTracer.record(event, "eventProcessor.ProcessEvent", "Processing event")
  340. // Remove candidates which are out of scope
  341. for _, ruleCandidate := range ruleCandidates {
  342. if scope.IsAllowedAll(ruleCandidate.ScopeMatch) {
  343. rulesTriggering = append(rulesTriggering, ruleCandidate)
  344. // Build up a suppression list
  345. for _, suppressedRule := range ruleCandidate.SuppressionList {
  346. suppressedRules[suppressedRule] = true
  347. }
  348. }
  349. }
  350. // Remove suppressed rules
  351. for _, ruleTriggers := range rulesTriggering {
  352. if _, ok := suppressedRules[ruleTriggers.Name]; ok {
  353. continue
  354. }
  355. rulesExecuting = append(rulesExecuting, ruleTriggers)
  356. }
  357. // Sort rules according to their priority (0 is the highest)
  358. SortRuleSlice(rulesExecuting)
  359. // Run rules which are not suppressed
  360. errors := make(map[string]error)
  361. EventTracer.record(event, "eventProcessor.ProcessEvent", "Running rules: ", rulesExecuting)
  362. for _, rule := range rulesExecuting {
  363. if err := rule.Action(p, parent, event, tid); err != nil {
  364. errors[rule.Name] = err
  365. }
  366. if p.failOnFirstError && len(errors) > 0 {
  367. break
  368. }
  369. }
  370. return errors
  371. }
  372. /*
  373. String returns a string representation the processor.
  374. */
  375. func (p *eventProcessor) String() string {
  376. return fmt.Sprintf("RumbleProcessor %v (workers:%v)", p.ID(), p.workerCount)
  377. }
  378. // Unique id creation
  379. // ==================
  380. var pidcounter uint64 = 1
  381. var pidcounterLock = &sync.Mutex{}
  382. /*
  383. newProcId returns a new unique id or processors.
  384. */
  385. func newProcID() uint64 {
  386. pidcounterLock.Lock()
  387. defer pidcounterLock.Unlock()
  388. ret := pidcounter
  389. pidcounter++
  390. return ret
  391. }