/* * ECAL * * Copyright 2020 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the MIT * License, If a copy of the MIT License was not distributed with this * file, You can obtain one at https://opensource.org/licenses/MIT. */ package engine import ( "fmt" "sync" "devt.de/krotik/common/flowutil" "devt.de/krotik/common/pools" ) /* Processor is the main object of the event engine. It coordinates the thread pool and rule index. Rules can only be added if the processor is stopped. Events can only be added if the processor is not stopped. */ type Processor interface { /* ID returns the processor ID. */ ID() uint64 /* Workers returns the number of threads of this processor. */ Workers() int /* Reset removes all stored rules from this processor. */ Reset() error /* AddRule adds a new rule to the processor. */ AddRule(rule *Rule) error /* Rules returns all loaded rules. */ Rules() map[string]*Rule /* Start starts this processor. */ Start() /* Finish will finish all remaining tasks and then stop the processor. */ Finish() /* Stopped returns if the processor is stopped. */ Stopped() bool /* Status returns the status of the processor (Running / Stopping / Stopped). */ Status() string /* NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial root events. */ NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor /* SetRootMonitorErrorObserver specifies an observer which is triggered when a root monitor of this processor has finished and returns errors. By default this is set to nil (no observer). */ SetRootMonitorErrorObserver(func(rm *RootMonitor)) /* AddEvent adds a new event to the processor. Returns the monitor if the event triggered a rule and nil if the event was skipped. */ AddEvent(event *Event, parentMonitor Monitor) (Monitor, error) /* IsTriggering checks if a given event triggers a loaded rule. This does not the actual state matching for speed. */ IsTriggering(event *Event) bool /* ProcessEvent processes an event by determining which rules trigger and match the given event. */ ProcessEvent(event *Event, parent Monitor) map[string]error /* String returns a string representation the processor. */ String() string } /* eventProcessor main implementation of the Processor interface. Event cycle: Process -> Triggering -> Matching -> Fire Rule */ type eventProcessor struct { id uint64 // Processor ID pool *pools.ThreadPool // Thread pool of this processor workerCount int // Number of threads for this processor ruleIndex RuleIndex // Container for loaded rules triggeringCache map[string]bool // Cache which remembers which events are triggering triggeringCacheLock sync.Mutex // Lock for triggeringg cache messageQueue *flowutil.EventPump // Queue for message passing between components rmErrorObserver func(rm *RootMonitor) // Error observer for root monitors } /* NewProcessor creates a new event processor with a given number of workers. */ func NewProcessor(workerCount int) Processor { ep := flowutil.NewEventPump() return &eventProcessor{newProcID(), pools.NewThreadPoolWithQueue(NewTaskQueue(ep)), workerCount, NewRuleIndex(), nil, sync.Mutex{}, ep, nil} } /* ID returns the processor ID. */ func (p *eventProcessor) ID() uint64 { return p.id } /* Workers returns the number of threads of this processor. */ func (p *eventProcessor) Workers() int { return p.workerCount } /* Reset removes all stored rules from this processor. */ func (p *eventProcessor) Reset() error { // Check that the thread pool is stopped if p.pool.Status() != pools.StatusStopped { return fmt.Errorf("Cannot reset processor if it has not stopped") } // Invalidate triggering cache p.triggeringCacheLock.Lock() p.triggeringCache = nil p.triggeringCacheLock.Unlock() // Create a new rule index p.ruleIndex = NewRuleIndex() return nil } /* AddRule adds a new rule to the processor. */ func (p *eventProcessor) AddRule(rule *Rule) error { // Check that the thread pool is stopped if p.pool.Status() != pools.StatusStopped { return fmt.Errorf("Cannot add rule if the processor has not stopped") } // Invalidate triggering cache p.triggeringCacheLock.Lock() p.triggeringCache = nil p.triggeringCacheLock.Unlock() return p.ruleIndex.AddRule(rule) } /* Rules returns all loaded rules. */ func (p *eventProcessor) Rules() map[string]*Rule { return p.ruleIndex.Rules() } /* Start starts this processor. */ func (p *eventProcessor) Start() { p.pool.SetWorkerCount(p.workerCount, false) } /* Finish will finish all remaining tasks and then stop the processor. */ func (p *eventProcessor) Finish() { p.pool.JoinAll() } /* Stopped returns if the processor is stopped. */ func (p *eventProcessor) Stopped() bool { return p.pool.Status() == pools.StatusStopped } /* Status returns the status of the processor (Running / Stopping / Stopped). */ func (p *eventProcessor) Status() string { return p.pool.Status() } /* NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial root events. */ func (p *eventProcessor) NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor { if scope == nil { scope = NewRuleScope(map[string]bool{ "": true, // Default root monitor has global scope }) } return newRootMonitor(context, scope, p.messageQueue) } /* SetRootMonitorErrorObserver specifies an observer which is triggered when a root monitor of this processor has finished and returns errors. By default this is set to nil (no observer). */ func (p *eventProcessor) SetRootMonitorErrorObserver(rmErrorObserver func(rm *RootMonitor)) { p.rmErrorObserver = rmErrorObserver } /* Notify the root monitor error observer that an error occurred. */ func (p *eventProcessor) notifyRootMonitorErrors(rm *RootMonitor) { if p.rmErrorObserver != nil { p.rmErrorObserver(rm) } } /* AddEvent adds a new event to the processor. Returns the monitor if the event triggered a rule and nil if the event was skipped. */ func (p *eventProcessor) AddEvent(event *Event, parentMonitor Monitor) (Monitor, error) { // Check that the thread pool is running if p.pool.Status() == pools.StatusStopped { return nil, fmt.Errorf("Cannot add event if the processor is not running") } EventTracer.record(event, "eventProcessor.AddEvent", "Event added to the processor") // First check if the event is triggering any rules at all if !p.IsTriggering(event) { EventTracer.record(event, "eventProcessor.AddEvent", "Event was skipped") if parentMonitor != nil { parentMonitor.Skip(event) } return nil, nil } // Check if we need to construct a new root monitor if parentMonitor == nil { parentMonitor = p.NewRootMonitor(nil, nil) } if rootMonitor, ok := parentMonitor.(*RootMonitor); ok { p.messageQueue.AddObserver(MessageRootMonitorFinished, rootMonitor, func(event string, eventSource interface{}) { // Call finish handler if there is one if rm := eventSource.(*RootMonitor); rm.finished != nil { rm.finished(p) } p.messageQueue.RemoveObservers(event, eventSource) }) } parentMonitor.Activate(event) EventTracer.record(event, "eventProcessor.AddEvent", "Adding task to thread pool") // Kick off event processing (see Processor.processEvent) p.pool.AddTask(&Task{p, parentMonitor, event}) return parentMonitor, nil } /* IsTriggering checks if a given event triggers a loaded rule. This does not the actual state matching for speed. */ func (p *eventProcessor) IsTriggering(event *Event) bool { var res, ok bool p.triggeringCacheLock.Lock() defer p.triggeringCacheLock.Unlock() // Ensure the triggering cache exists if p.triggeringCache == nil { p.triggeringCache = make(map[string]bool) } name := event.Name() if res, ok = p.triggeringCache[name]; !ok { res = p.ruleIndex.IsTriggering(event) p.triggeringCache[name] = res } return res } /* ProcessEvent processes an event by determining which rules trigger and match the given event. */ func (p *eventProcessor) ProcessEvent(event *Event, parent Monitor) map[string]error { var rulesTriggering []*Rule var rulesExecuting []*Rule scope := parent.Scope() ruleCandidates := p.ruleIndex.Match(event) suppressedRules := make(map[string]bool) EventTracer.record(event, "eventProcessor.ProcessEvent", "Processing event") // Remove candidates which are out of scope for _, ruleCandidate := range ruleCandidates { if scope.IsAllowedAll(ruleCandidate.ScopeMatch) { rulesTriggering = append(rulesTriggering, ruleCandidate) // Build up a suppression list for _, suppressedRule := range ruleCandidate.SuppressionList { suppressedRules[suppressedRule] = true } } } // Remove suppressed rules for _, ruleTriggers := range rulesTriggering { if _, ok := suppressedRules[ruleTriggers.Name]; ok { continue } rulesExecuting = append(rulesExecuting, ruleTriggers) } // Sort rules according to their priority (0 is the highest) SortRuleSlice(rulesExecuting) // Run rules which are not suppressed errors := make(map[string]error) EventTracer.record(event, "eventProcessor.ProcessEvent", "Running rules: ", rulesExecuting) for _, rule := range rulesExecuting { if err := rule.Action(p, parent, event); err != nil { errors[rule.Name] = err } } parent.Finish() return errors } /* String returns a string representation the processor. */ func (p *eventProcessor) String() string { return fmt.Sprintf("RumbleProcessor %v (workers:%v)", p.ID(), p.workerCount) } // Unique id creation // ================== var pidcounter uint64 = 1 var pidcounterLock = &sync.Mutex{} /* newProcId returns a new unique id or processors. */ func newProcID() uint64 { pidcounterLock.Lock() defer pidcounterLock.Unlock() ret := pidcounter pidcounter++ return ret }