processor.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package engine
  11. import (
  12. "fmt"
  13. "sync"
  14. "devt.de/krotik/common/flowutil"
  15. "devt.de/krotik/common/pools"
  16. )
/*
Processor is the main object of the event engine. It coordinates the thread pool
and the rule index. Rules can only be added while the processor is stopped.
Events can only be added while the processor is not stopped.
*/
type Processor interface {

	/*
		ID returns the processor ID.
	*/
	ID() uint64

	/*
		Workers returns the number of threads of this processor.
	*/
	Workers() int

	/*
		Reset removes all stored rules from this processor.
	*/
	Reset() error

	/*
		AddRule adds a new rule to the processor.
	*/
	AddRule(rule *Rule) error

	/*
		Rules returns all loaded rules.
	*/
	Rules() map[string]*Rule

	/*
		Start starts this processor.
	*/
	Start()

	/*
		Finish will finish all remaining tasks and then stop the processor.
	*/
	Finish()

	/*
		Stopped returns true if the processor is stopped.
	*/
	Stopped() bool

	/*
		Status returns the status of the processor (Running / Stopping / Stopped).
	*/
	Status() string

	/*
		NewRootMonitor creates a new root monitor for this processor. This monitor
		is used to add initial root events.
	*/
	NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor

	/*
		SetRootMonitorErrorObserver specifies an observer which is triggered
		when a root monitor of this processor has finished and returns errors.
		By default this is set to nil (no observer).
	*/
	SetRootMonitorErrorObserver(func(rm *RootMonitor))

	/*
		AddEvent adds a new event to the processor. Returns the monitor if the event
		triggered a rule and nil if the event was skipped.
	*/
	AddEvent(event *Event, parentMonitor Monitor) (Monitor, error)

	/*
		IsTriggering checks if a given event triggers a loaded rule. For speed,
		this does not do the actual state matching.
	*/
	IsTriggering(event *Event) bool

	/*
		ProcessEvent processes an event by determining which rules trigger and match
		the given event. The returned map holds the errors of failed rules, keyed
		by rule name.
	*/
	ProcessEvent(event *Event, parent Monitor) map[string]error

	/*
		String returns a string representation of the processor.
	*/
	String() string
}
/*
eventProcessor is the main implementation of the Processor interface.

Event cycle:

	Process -> Triggering -> Matching -> Fire Rule
*/
type eventProcessor struct {
	id                  uint64                // Processor ID
	pool                *pools.ThreadPool     // Thread pool of this processor
	workerCount         int                   // Number of threads for this processor
	ruleIndex           RuleIndex             // Container for loaded rules
	triggeringCache     map[string]bool       // Cache which remembers, per event name, whether the event is triggering
	triggeringCacheLock sync.Mutex            // Lock for the triggering cache
	messageQueue        *flowutil.EventPump   // Queue for message passing between components
	rmErrorObserver     func(rm *RootMonitor) // Error observer for root monitors (nil means no observer)
}
  105. /*
  106. NewProcessor creates a new event processor with a given number of workers.
  107. */
  108. func NewProcessor(workerCount int) Processor {
  109. ep := flowutil.NewEventPump()
  110. return &eventProcessor{newProcID(), pools.NewThreadPoolWithQueue(NewTaskQueue(ep)),
  111. workerCount, NewRuleIndex(), nil, sync.Mutex{}, ep, nil}
  112. }
/*
ID returns the processor ID which was stored in the processor on creation.
*/
func (p *eventProcessor) ID() uint64 {
	return p.id
}
/*
Workers returns the number of threads of this processor. This is the
configured worker count, not a live reading from the thread pool.
*/
func (p *eventProcessor) Workers() int {
	return p.workerCount
}
  125. /*
  126. Reset removes all stored rules from this processor.
  127. */
  128. func (p *eventProcessor) Reset() error {
  129. // Check that the thread pool is stopped
  130. if p.pool.Status() != pools.StatusStopped {
  131. return fmt.Errorf("Cannot reset processor if it has not stopped")
  132. }
  133. // Invalidate triggering cache
  134. p.triggeringCacheLock.Lock()
  135. p.triggeringCache = nil
  136. p.triggeringCacheLock.Unlock()
  137. // Create a new rule index
  138. p.ruleIndex = NewRuleIndex()
  139. return nil
  140. }
  141. /*
  142. AddRule adds a new rule to the processor.
  143. */
  144. func (p *eventProcessor) AddRule(rule *Rule) error {
  145. // Check that the thread pool is stopped
  146. if p.pool.Status() != pools.StatusStopped {
  147. return fmt.Errorf("Cannot add rule if the processor has not stopped")
  148. }
  149. // Invalidate triggering cache
  150. p.triggeringCacheLock.Lock()
  151. p.triggeringCache = nil
  152. p.triggeringCacheLock.Unlock()
  153. return p.ruleIndex.AddRule(rule)
  154. }
/*
Rules returns all loaded rules as reported by the rule index.
*/
func (p *eventProcessor) Rules() map[string]*Rule {

	// NOTE(review): the map is presumably keyed by rule name - confirm
	// against RuleIndex.Rules.

	return p.ruleIndex.Rules()
}
/*
Start starts this processor by setting the worker count of the thread pool
to the configured number of workers.
*/
func (p *eventProcessor) Start() {

	// NOTE(review): the second argument is presumably a "wait" flag -
	// confirm against pools.ThreadPool.SetWorkerCount.

	p.pool.SetWorkerCount(p.workerCount, false)
}
/*
Finish will finish all remaining tasks and then stop the processor. The work
is delegated to JoinAll of the thread pool.
*/
func (p *eventProcessor) Finish() {
	p.pool.JoinAll()
}
  173. /*
  174. Stopped returns if the processor is stopped.
  175. */
  176. func (p *eventProcessor) Stopped() bool {
  177. return p.pool.Status() == pools.StatusStopped
  178. }
/*
Status returns the status of the processor (Running / Stopping / Stopped).
The value is the status string of the underlying thread pool.
*/
func (p *eventProcessor) Status() string {
	return p.pool.Status()
}
  185. /*
  186. NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
  187. root events.
  188. */
  189. func (p *eventProcessor) NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor {
  190. if scope == nil {
  191. scope = NewRuleScope(map[string]bool{
  192. "": true, // Default root monitor has global scope
  193. })
  194. }
  195. return newRootMonitor(context, scope, p.messageQueue)
  196. }
/*
SetRootMonitorErrorObserver specifies an observer which is triggered
when a root monitor of this processor has finished and returns errors.
By default this is set to nil (no observer). Passing nil removes a
previously set observer.
*/
func (p *eventProcessor) SetRootMonitorErrorObserver(rmErrorObserver func(rm *RootMonitor)) {

	// NOTE(review): the field is written without a lock - presumably the
	// observer is set before the processor is started; confirm with callers.

	p.rmErrorObserver = rmErrorObserver
}
  205. /*
  206. Notify the root monitor error observer that an error occurred.
  207. */
  208. func (p *eventProcessor) notifyRootMonitorErrors(rm *RootMonitor) {
  209. if p.rmErrorObserver != nil {
  210. p.rmErrorObserver(rm)
  211. }
  212. }
  213. /*
  214. AddEvent adds a new event to the processor. Returns the monitor if the event
  215. triggered a rule and nil if the event was skipped.
  216. */
  217. func (p *eventProcessor) AddEvent(event *Event, parentMonitor Monitor) (Monitor, error) {
  218. // Check that the thread pool is running
  219. if p.pool.Status() == pools.StatusStopped {
  220. return nil, fmt.Errorf("Cannot add event if the processor is not running")
  221. }
  222. EventTracer.record(event, "eventProcessor.AddEvent", "Event added to the processor")
  223. // First check if the event is triggering any rules at all
  224. if !p.IsTriggering(event) {
  225. EventTracer.record(event, "eventProcessor.AddEvent", "Event was skipped")
  226. if parentMonitor != nil {
  227. parentMonitor.Skip(event)
  228. }
  229. return nil, nil
  230. }
  231. // Check if we need to construct a new root monitor
  232. if parentMonitor == nil {
  233. parentMonitor = p.NewRootMonitor(nil, nil)
  234. }
  235. if rootMonitor, ok := parentMonitor.(*RootMonitor); ok {
  236. p.messageQueue.AddObserver(MessageRootMonitorFinished, rootMonitor,
  237. func(event string, eventSource interface{}) {
  238. // Call finish handler if there is one
  239. if rm := eventSource.(*RootMonitor); rm.finished != nil {
  240. rm.finished(p)
  241. }
  242. p.messageQueue.RemoveObservers(event, eventSource)
  243. })
  244. }
  245. parentMonitor.Activate(event)
  246. EventTracer.record(event, "eventProcessor.AddEvent", "Adding task to thread pool")
  247. // Kick off event processing (see Processor.processEvent)
  248. p.pool.AddTask(&Task{p, parentMonitor, event})
  249. return parentMonitor, nil
  250. }
  251. /*
  252. IsTriggering checks if a given event triggers a loaded rule. This does not the
  253. actual state matching for speed.
  254. */
  255. func (p *eventProcessor) IsTriggering(event *Event) bool {
  256. var res, ok bool
  257. p.triggeringCacheLock.Lock()
  258. defer p.triggeringCacheLock.Unlock()
  259. // Ensure the triggering cache exists
  260. if p.triggeringCache == nil {
  261. p.triggeringCache = make(map[string]bool)
  262. }
  263. name := event.Name()
  264. if res, ok = p.triggeringCache[name]; !ok {
  265. res = p.ruleIndex.IsTriggering(event)
  266. p.triggeringCache[name] = res
  267. }
  268. return res
  269. }
  270. /*
  271. ProcessEvent processes an event by determining which rules trigger and match
  272. the given event.
  273. */
  274. func (p *eventProcessor) ProcessEvent(event *Event, parent Monitor) map[string]error {
  275. var rulesTriggering []*Rule
  276. var rulesExecuting []*Rule
  277. scope := parent.Scope()
  278. ruleCandidates := p.ruleIndex.Match(event)
  279. suppressedRules := make(map[string]bool)
  280. EventTracer.record(event, "eventProcessor.ProcessEvent", "Processing event")
  281. // Remove candidates which are out of scope
  282. for _, ruleCandidate := range ruleCandidates {
  283. if scope.IsAllowedAll(ruleCandidate.ScopeMatch) {
  284. rulesTriggering = append(rulesTriggering, ruleCandidate)
  285. // Build up a suppression list
  286. for _, suppressedRule := range ruleCandidate.SuppressionList {
  287. suppressedRules[suppressedRule] = true
  288. }
  289. }
  290. }
  291. // Remove suppressed rules
  292. for _, ruleTriggers := range rulesTriggering {
  293. if _, ok := suppressedRules[ruleTriggers.Name]; ok {
  294. continue
  295. }
  296. rulesExecuting = append(rulesExecuting, ruleTriggers)
  297. }
  298. // Sort rules according to their priority (0 is the highest)
  299. SortRuleSlice(rulesExecuting)
  300. // Run rules which are not suppressed
  301. errors := make(map[string]error)
  302. EventTracer.record(event, "eventProcessor.ProcessEvent", "Running rules: ", rulesExecuting)
  303. for _, rule := range rulesExecuting {
  304. if err := rule.Action(p, parent, event); err != nil {
  305. errors[rule.Name] = err
  306. }
  307. }
  308. parent.Finish()
  309. return errors
  310. }
  311. /*
  312. String returns a string representation the processor.
  313. */
  314. func (p *eventProcessor) String() string {
  315. return fmt.Sprintf("RumbleProcessor %v (workers:%v)", p.ID(), p.workerCount)
  316. }
  317. // Unique id creation
  318. // ==================
  319. var pidcounter uint64 = 1
  320. var pidcounterLock = &sync.Mutex{}
  321. /*
  322. newProcId returns a new unique id or processors.
  323. */
  324. func newProcID() uint64 {
  325. pidcounterLock.Lock()
  326. defer pidcounterLock.Unlock()
  327. ret := pidcounter
  328. pidcounter++
  329. return ret
  330. }