processor.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package engine
  11. import (
  12. "fmt"
  13. "os"
  14. "sync"
  15. "devt.de/krotik/ecal/engine/pool"
  16. "devt.de/krotik/ecal/engine/pubsub"
  17. )
  18. /*
  19. Processor is the main object of the event engine. It coordinates the thread pool
  20. and rule index. Rules can only be added if the processor is stopped. Events
  21. can only be added if the processor is not stopped.
  22. */
  23. type Processor interface {
  24. /*
  25. ID returns the processor ID.
  26. */
  27. ID() uint64
  28. /*
  29. ThreadPool returns the thread pool which this processor is using.
  30. */
  31. ThreadPool() *pool.ThreadPool
  32. /*
  33. Workers returns the number of threads of this processor.
  34. */
  35. Workers() int
  36. /*
  37. Reset removes all stored rules from this processor.
  38. */
  39. Reset() error
  40. /*
  41. AddRule adds a new rule to the processor.
  42. */
  43. AddRule(rule *Rule) error
  44. /*
  45. Rules returns all loaded rules.
  46. */
  47. Rules() map[string]*Rule
  48. /*
  49. Start starts this processor.
  50. */
  51. Start()
  52. /*
  53. Finish will finish all remaining tasks and then stop the processor.
  54. */
  55. Finish()
  56. /*
  57. Stopped returns if the processor is stopped.
  58. */
  59. Stopped() bool
  60. /*
  61. Status returns the status of the processor (Running / Stopping / Stopped).
  62. */
  63. Status() string
  64. /*
  65. NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
  66. root events.
  67. */
  68. NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor
  69. /*
  70. SetRootMonitorErrorObserver specifies an observer which is triggered
  71. when a root monitor of this processor has finished and returns errors.
  72. By default this is set to nil (no observer).
  73. */
  74. SetRootMonitorErrorObserver(func(rm *RootMonitor))
  75. /*
  76. SetFailOnFirstErrorInTriggerSequence sets the behavior when rules return errors.
  77. If set to false (default) then all rules in a trigger sequence for a specific event
  78. are executed. If set to true then the first rule which returns an error will stop
  79. the trigger sequence. Events which have been added by the failing rule are still processed.
  80. */
  81. SetFailOnFirstErrorInTriggerSequence(bool)
  82. /*
  83. AddEventAndWait adds a new event to the processor and waits for the resulting event cascade
  84. to finish. If a monitor is passed then it must be a RootMonitor.
  85. */
  86. AddEventAndWait(event *Event, monitor *RootMonitor) (Monitor, error)
  87. /*
  88. AddEvent adds a new event to the processor. Returns the monitor if the event
  89. triggered a rule and nil if the event was skipped.
  90. */
  91. AddEvent(event *Event, parentMonitor Monitor) (Monitor, error)
  92. /*
  93. IsTriggering checks if a given event triggers a loaded rule. This does not the
  94. actual state matching for speed.
  95. */
  96. IsTriggering(event *Event) bool
  97. /*
  98. ProcessEvent processes an event by determining which rules trigger and match
  99. the given event. This function must receive a unique thread ID from the
  100. executing thread.
  101. */
  102. ProcessEvent(tid uint64, event *Event, parent Monitor) map[string]error
  103. /*
  104. String returns a string representation the processor.
  105. */
  106. String() string
  107. }
  108. /*
  109. eventProcessor main implementation of the Processor interface.
  110. Event cycle:
  111. Process -> Triggering -> Matching -> Fire Rule
  112. */
  113. type eventProcessor struct {
  114. id uint64 // Processor ID
  115. pool *pool.ThreadPool // Thread pool of this processor
  116. workerCount int // Number of threads for this processor
  117. failOnFirstError bool // Stop rule execution on first error in an event trigger sequence
  118. ruleIndex RuleIndex // Container for loaded rules
  119. triggeringCache map[string]bool // Cache which remembers which events are triggering
  120. triggeringCacheLock sync.Mutex // Lock for triggeringg cache
  121. messageQueue *pubsub.EventPump // Queue for message passing between components
  122. rmErrorObserver func(rm *RootMonitor) // Error observer for root monitors
  123. }
  124. /*
  125. NewProcessor creates a new event processor with a given number of workers.
  126. */
  127. func NewProcessor(workerCount int) Processor {
  128. ep := pubsub.NewEventPump()
  129. pool := pool.NewThreadPoolWithQueue(NewTaskQueue(ep))
  130. pool.TooManyThreshold = 10
  131. pool.TooManyCallback = func() {
  132. fmt.Fprintf(os.Stderr, "Warning: The thread pool queue is filling up ...")
  133. }
  134. return &eventProcessor{newProcID(), pool,
  135. workerCount, false, NewRuleIndex(), nil, sync.Mutex{}, ep, nil}
  136. }
  137. /*
  138. ID returns the processor ID.
  139. */
  140. func (p *eventProcessor) ID() uint64 {
  141. return p.id
  142. }
  143. /*
  144. ThreadPool returns the thread pool which this processor is using.
  145. */
  146. func (p *eventProcessor) ThreadPool() *pool.ThreadPool {
  147. return p.pool
  148. }
  149. /*
  150. Workers returns the number of threads of this processor.
  151. */
  152. func (p *eventProcessor) Workers() int {
  153. return p.workerCount
  154. }
  155. /*
  156. Reset removes all stored rules from this processor.
  157. */
  158. func (p *eventProcessor) Reset() error {
  159. // Check that the thread pool is stopped
  160. if p.pool.Status() != pool.StatusStopped {
  161. return fmt.Errorf("Cannot reset processor if it has not stopped")
  162. }
  163. // Invalidate triggering cache
  164. p.triggeringCacheLock.Lock()
  165. p.triggeringCache = nil
  166. p.triggeringCacheLock.Unlock()
  167. // Create a new rule index
  168. p.ruleIndex = NewRuleIndex()
  169. return nil
  170. }
  171. /*
  172. AddRule adds a new rule to the processor.
  173. */
  174. func (p *eventProcessor) AddRule(rule *Rule) error {
  175. // Check that the thread pool is stopped
  176. if p.pool.Status() != pool.StatusStopped {
  177. return fmt.Errorf("Cannot add rule if the processor has not stopped")
  178. }
  179. // Invalidate triggering cache
  180. p.triggeringCacheLock.Lock()
  181. p.triggeringCache = nil
  182. p.triggeringCacheLock.Unlock()
  183. return p.ruleIndex.AddRule(rule)
  184. }
  185. /*
  186. Rules returns all loaded rules.
  187. */
  188. func (p *eventProcessor) Rules() map[string]*Rule {
  189. return p.ruleIndex.Rules()
  190. }
  191. /*
  192. Start starts this processor.
  193. */
  194. func (p *eventProcessor) Start() {
  195. p.pool.SetWorkerCount(p.workerCount, false)
  196. }
  197. /*
  198. Finish will finish all remaining tasks and then stop the processor.
  199. */
  200. func (p *eventProcessor) Finish() {
  201. p.pool.JoinAll()
  202. }
  203. /*
  204. Stopped returns if the processor is stopped.
  205. */
  206. func (p *eventProcessor) Stopped() bool {
  207. return p.pool.Status() == pool.StatusStopped
  208. }
  209. /*
  210. Status returns the status of the processor (Running / Stopping / Stopped).
  211. */
  212. func (p *eventProcessor) Status() string {
  213. return p.pool.Status()
  214. }
  215. /*
  216. NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
  217. root events.
  218. */
  219. func (p *eventProcessor) NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor {
  220. if scope == nil {
  221. scope = NewRuleScope(map[string]bool{
  222. "": true, // Default root monitor has global scope
  223. })
  224. }
  225. return newRootMonitor(context, scope, p.messageQueue)
  226. }
  227. /*
  228. SetRootMonitorErrorObserver specifies an observer which is triggered
  229. when a root monitor of this processor has finished and returns errors.
  230. By default this is set to nil (no observer).
  231. */
  232. func (p *eventProcessor) SetRootMonitorErrorObserver(rmErrorObserver func(rm *RootMonitor)) {
  233. p.rmErrorObserver = rmErrorObserver
  234. }
  235. /*
  236. SetFailOnFirstErrorInTriggerSequence sets the behavior when rules return errors.
  237. If set to false (default) then all rules in a trigger sequence for a specific event
  238. are executed. If set to true then the first rule which returns an error will stop
  239. the trigger sequence. Events which have been added by the failing rule are still processed.
  240. */
  241. func (p *eventProcessor) SetFailOnFirstErrorInTriggerSequence(v bool) {
  242. p.failOnFirstError = v
  243. }
  244. /*
  245. Notify the root monitor error observer that an error occurred.
  246. */
  247. func (p *eventProcessor) notifyRootMonitorErrors(rm *RootMonitor) {
  248. if p.rmErrorObserver != nil {
  249. p.rmErrorObserver(rm)
  250. }
  251. }
  252. /*
  253. AddEventAndWait adds a new event to the processor and waits for the resulting event cascade
  254. to finish. If a monitor is passed then it must be a RootMonitor.
  255. */
  256. func (p *eventProcessor) AddEventAndWait(event *Event, monitor *RootMonitor) (Monitor, error) {
  257. var wg sync.WaitGroup
  258. wg.Add(1)
  259. if monitor == nil {
  260. monitor = p.NewRootMonitor(nil, nil)
  261. }
  262. p.messageQueue.AddObserver(MessageRootMonitorFinished, monitor,
  263. func(event string, eventSource interface{}) {
  264. // Everything has finished
  265. wg.Done()
  266. p.messageQueue.RemoveObservers(event, eventSource)
  267. })
  268. resMonitor, err := p.AddEvent(event, monitor)
  269. if resMonitor == nil {
  270. // Event was not added
  271. p.messageQueue.RemoveObservers(MessageRootMonitorFinished, monitor)
  272. } else {
  273. // Event was added now wait for it to finish
  274. wg.Wait()
  275. }
  276. return resMonitor, err
  277. }
  278. /*
  279. AddEvent adds a new event to the processor. Returns the monitor if the event
  280. triggered a rule and nil if the event was skipped.
  281. */
  282. func (p *eventProcessor) AddEvent(event *Event, eventMonitor Monitor) (Monitor, error) {
  283. // Check that the thread pool is running
  284. if s := p.pool.Status(); s == pool.StatusStopped || s == pool.StatusStopping {
  285. return nil, fmt.Errorf("Cannot add event if the processor is stopping or not running")
  286. }
  287. EventTracer.record(event, "eventProcessor.AddEvent", "Event added to the processor")
  288. // First check if the event is triggering any rules at all
  289. if !p.IsTriggering(event) {
  290. EventTracer.record(event, "eventProcessor.AddEvent", "Event was skipped")
  291. if eventMonitor != nil {
  292. eventMonitor.Skip(event)
  293. }
  294. return nil, nil
  295. }
  296. // Check if we need to construct a new root monitor
  297. if eventMonitor == nil {
  298. eventMonitor = p.NewRootMonitor(nil, nil)
  299. }
  300. if rootMonitor, ok := eventMonitor.(*RootMonitor); ok {
  301. p.messageQueue.AddObserver(MessageRootMonitorFinished, rootMonitor,
  302. func(event string, eventSource interface{}) {
  303. // Call finish handler if there is one
  304. if rm := eventSource.(*RootMonitor); rm.finished != nil {
  305. rm.finished(p)
  306. }
  307. p.messageQueue.RemoveObservers(event, eventSource)
  308. })
  309. }
  310. eventMonitor.Activate(event)
  311. EventTracer.record(event, "eventProcessor.AddEvent", "Adding task to thread pool")
  312. // Kick off event processing (see Processor.ProcessEvent)
  313. p.pool.AddTask(&Task{p, eventMonitor, event})
  314. return eventMonitor, nil
  315. }
  316. /*
  317. IsTriggering checks if a given event triggers a loaded rule. This does not the
  318. actual state matching for speed.
  319. */
  320. func (p *eventProcessor) IsTriggering(event *Event) bool {
  321. var res, ok bool
  322. p.triggeringCacheLock.Lock()
  323. defer p.triggeringCacheLock.Unlock()
  324. // Ensure the triggering cache exists
  325. if p.triggeringCache == nil {
  326. p.triggeringCache = make(map[string]bool)
  327. }
  328. name := event.Name()
  329. if res, ok = p.triggeringCache[name]; !ok {
  330. res = p.ruleIndex.IsTriggering(event)
  331. p.triggeringCache[name] = res
  332. }
  333. return res
  334. }
  335. /*
  336. ProcessEvent processes an event by determining which rules trigger and match
  337. the given event.
  338. */
  339. func (p *eventProcessor) ProcessEvent(tid uint64, event *Event, parent Monitor) map[string]error {
  340. var rulesTriggering []*Rule
  341. var rulesExecuting []*Rule
  342. scope := parent.Scope()
  343. ruleCandidates := p.ruleIndex.Match(event)
  344. suppressedRules := make(map[string]bool)
  345. EventTracer.record(event, "eventProcessor.ProcessEvent", "Processing event")
  346. // Remove candidates which are out of scope
  347. for _, ruleCandidate := range ruleCandidates {
  348. if scope.IsAllowedAll(ruleCandidate.ScopeMatch) {
  349. rulesTriggering = append(rulesTriggering, ruleCandidate)
  350. // Build up a suppression list
  351. for _, suppressedRule := range ruleCandidate.SuppressionList {
  352. suppressedRules[suppressedRule] = true
  353. }
  354. }
  355. }
  356. // Remove suppressed rules
  357. for _, ruleTriggers := range rulesTriggering {
  358. if _, ok := suppressedRules[ruleTriggers.Name]; ok {
  359. continue
  360. }
  361. rulesExecuting = append(rulesExecuting, ruleTriggers)
  362. }
  363. // Sort rules according to their priority (0 is the highest)
  364. SortRuleSlice(rulesExecuting)
  365. // Run rules which are not suppressed
  366. errors := make(map[string]error)
  367. EventTracer.record(event, "eventProcessor.ProcessEvent", "Running rules: ", rulesExecuting)
  368. for _, rule := range rulesExecuting {
  369. if err := rule.Action(p, parent, event, tid); err != nil {
  370. errors[rule.Name] = err
  371. }
  372. if p.failOnFirstError && len(errors) > 0 {
  373. break
  374. }
  375. }
  376. return errors
  377. }
  378. /*
  379. String returns a string representation the processor.
  380. */
  381. func (p *eventProcessor) String() string {
  382. return fmt.Sprintf("EventProcessor %v (workers:%v)", p.ID(), p.workerCount)
  383. }
  // Unique id creation
  // ==================

  var (
  	pidcounter     uint64 = 1              // Next processor id to hand out
  	pidcounterLock        = &sync.Mutex{} // Lock for pidcounter
  )

  /*
  newProcID returns a new unique id for processors. Ids start at 1 and are
  handed out in increasing order.
  */
  func newProcID() uint64 {
  	pidcounterLock.Lock()

  	next := pidcounter
  	pidcounter = next + 1

  	pidcounterLock.Unlock()

  	return next
  }