Browse Source

feat: Adding ECA engine

Matthias Ladkau 3 years ago
parent
commit
6ba51d74fe
12 changed files with 3959 additions and 0 deletions
  1. 124 0
      engine.md
  2. 124 0
      engine/debug.go
  3. 60 0
      engine/event.go
  4. 468 0
      engine/monitor.go
  5. 431 0
      engine/processor.go
  6. 742 0
      engine/processor_test.go
  7. 804 0
      engine/rule.go
  8. 509 0
      engine/rule_test.go
  9. 236 0
      engine/taskqueue.go
  10. 223 0
      engine/taskqueue_test.go
  11. 161 0
      engine/util.go
  12. 77 0
      engine/util_test.go

+ 124 - 0
engine.md

@@ -0,0 +1,124 @@
+ECA Engine
+==========
+The ECA engine is ECAL's low-level event engine which does the actual concurrent event processing. Through ECAL a user can define rules which execute certain actions under certain conditions. The engine is defined in `ecal.engine`.
+
+Priorities
+----------
+The event-based system relies heavily on priorities for control flow. Both events and rules (which are triggered by events) have priorities. By default events and rules have the priority 0 which is the highest priority. Events are processed according to their priority and all triggering rules of a single event are executed according to their priority.
+
+Processor
+---------
+The processor is the central piece of the event engine. It controls the thread pool, contains the rule index and handles the event processing.
+
+The engines behaviour is solely defined by rules. These rules are added before the engine is started. Each added rule has a priority which determines their execution order if multiple rules are triggered by the same event. The main processing cycle, once the engine has been started, can be described as:
+
+Event injection -> Triggering check -> Rule Matching -> Fire Rules
+
+When injecting a new event it is possible to also pass a monitor with a certain scope and a priority. The scope is used by the processor to narrow down the triggering rules. A possible scenario for scopes are different types of analysis (e.g. quick analysis or deep analysis - only a subset of rules is required for the quick analysis). The priority determines when an event is processed - higher priority events are processed first.
+
+After an event is injected the Processor first checks if anything triggers on the event. The result of this is cached. The trigger check is just a first quick check to determine if the event can be discarded right away - even if the event passes the check, it is possible, that no rule will actually fire.
+
+After the first triggering check passed, the event is handed over to a task which runs in the thread pool. The task uses the rule index to determine all triggering rules. After filtering rules which are out of scope or which are suppressed by other rules, the remaining rules are sorted by their priority and then their action is executed.
+
+A rule action can inject new events into the processor which starts the processing cycle again.
+
+
+Monitor
+-------
+For every event there is a monitor following the event. Monitors form trees as the events cascade. Monitor objects hold additional information such as priority (how quickly should the associated event be processed), processing errors, rule scope, as well as context objects.
+
+
+Rules
+-----
+Rules define the conditions under which a particular action should be executed. Every rule must have the following properties:
+
+- [Name] A name which identifies the rule.
+- [KindMatch] Match on event kinds: A list of strings in dot notation which describes event kinds. May contain '*' characters as wildcards (e.g. core.tests.*).
+- [ScopeMatch] Match on event cascade scope: A list of strings in dot notation which describe the required scopes which are required for this rule to trigger. The included / excluded scopes for an event are stored in its monitor.
+- [StateMatch] Match on event state: A simple list of required key / value states in the event state. Nil values can be used as wildcards (i.e. match is only on key).
+- [Priority] Rules are sorted by their priority before their actions are executed.
+- [SuppressionList] A list of rules (identified by their name) which should be suppressed if this rule fires.
+- [Action] A function which will be executed if this rule fires.
+
+
+Events
+------
+Events are injected into the processor and cause rules to fire. An event is a simple object which contains:
+
+- [Name] A name which identifies the event.
+- [Kind] An event kind - this is checked against the kind match of rules during the triggering check.
+- [State] An event state which contains additional data.
+
+Events are always processed together with a monitor which is either implicitly created or explicitly given together with the event. If the monitor is explicitly given it is possible to specify an event scope which limits the triggering rules and a priority which determines the event processing order. An event with a lower priority is guaranteed to be processed after all events of a higher priority if these have been added before the lower priority event.
+
+Example
+-------
+- A client instantiates a new Processor giving the number of worker threads which should be used to process rules (a good number here are the cores of the physical processor).
+
+```
+proc := NewProcessor(1)
+```
+
+- The client adds rules to the processor.
+
+```
+rule1 := &Rule{
+		"TestRule1",                            // Name
+		"My test rule",                         // Description
+		[]string{"core.main.event1"},           // Kind match
+		[]string{"data"},                       // Match on event cascade scope
+		nil,                                    // No state match
+		2,                                      // Priority of the rule
+		[]string{"TestRule3", "TestRule2"},     // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+      ... code of the rule
+
+			p.AddEvent(&Event{
+							"Next Event",
+							[]string{"core", "main", "event2"},
+							nil,
+						}, m.NewChildMonitor(1))        // New monitor with priority for new event
+		},
+	}
+
+proc.AddRule(rule1)
+...
+```
+
+- The processor is started. At this point the thread pool inside the processor is waiting for tasks with the defined number of worker threads.
+
+```
+proc.SetRootMonitorErrorObserver(func(rm *RootMonitor) { // Called once a root monitor has finished
+	errs := rm.AllErrors()
+	...
+})
+
+proc.Start()
+```
+
+- A root monitor is instantiated and an initial event is added.
+
+```
+e := NewEvent(
+  "InitialEvent",                      // Name
+  []string{"core", "main", "event1"},  // Kind
+  map[interface{}]interface{}{         // State
+    "foo":  "bar",
+  },
+)
+
+rootm := proc.NewRootMonitor(nil, nil)
+
+rootm.SetFinishHandler(func(p Processor) { // Handler for end of event cascade
+  ...
+})
+
+proc.AddEvent(e, rootm)
+```
+
+- The processor can run as long as needed and can be finished when the application should be terminated.
+
+```
+proc.Finish()
+```
+Calling `Finish()` will finish all remaining tasks and then stop the processor.

+ 124 - 0
engine/debug.go

@@ -0,0 +1,124 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"regexp"
+	"strings"
+	"sync"
+
+	"devt.de/krotik/common/stringutil"
+)
+
+/*
+EventTracer is a debugging interface to the engine
+*/
+var EventTracer = &eventTrace{lock: &sync.Mutex{}, out: os.Stdout}
+
+/*
+eventTrace handles low-level event tracing for debugging purposes
+*/
+type eventTrace struct {
+	lock            *sync.Mutex
+	eventTraceKind  []string
+	eventTraceState []map[interface{}]interface{}
+	out             io.Writer
+}
+
+/*
+MonitorEvent adds a request to monitor certain events. The events to monitor
+should match the given kind and have the given state values (nil values match
+only on the key).
+*/
+func (et *eventTrace) MonitorEvent(kind string, state map[interface{}]interface{}) {
+	et.lock.Lock()
+	defer et.lock.Unlock()
+
+	et.eventTraceKind = append(et.eventTraceKind, kind)
+	et.eventTraceState = append(et.eventTraceState, state)
+}
+
+/*
+Reset removes all added monitoring requests.
+*/
+func (et *eventTrace) Reset() {
+	et.lock.Lock()
+	defer et.lock.Unlock()
+
+	et.eventTraceKind = nil
+	et.eventTraceState = nil
+}
+
+/*
+record records an event action.
+*/
+func (et *eventTrace) record(which *Event, where string, what ...interface{}) {
+	et.lock.Lock()
+	defer et.lock.Unlock()
+
+	if et.eventTraceKind == nil {
+
+		// Return in the normal case
+
+		return
+	}
+
+	whichKind := strings.Join(which.Kind(), ".")
+
+	// Check if the event matches
+
+	for i, tkind := range et.eventTraceKind {
+		tstate := et.eventTraceState[i]
+
+		regexMatch, _ := regexp.MatchString(tkind, whichKind)
+
+		if whichKind == tkind || regexMatch {
+
+			if tstate == nil || stateMatch(tstate, which.State()) {
+
+				fmt.Fprintln(et.out, fmt.Sprintf("%v %v", tkind, where))
+
+				for _, w := range what {
+					fmt.Fprintln(et.out, fmt.Sprintf("    %v",
+						stringutil.ConvertToString(w)))
+				}
+
+				fmt.Fprintln(et.out, fmt.Sprintf("    %v", which))
+			}
+		}
+	}
+}
+
+// Helper functions
+// ================
+
+/*
+stateMatch checks if a given template matches a given event state.
+*/
+func stateMatch(template, state map[interface{}]interface{}) bool {
+
+	for k, v := range template {
+		if sv, ok := state[k]; !ok {
+			return false
+		} else if v != nil {
+			regexMatch, _ := regexp.MatchString(fmt.Sprint(v), fmt.Sprint(sv))
+
+			if v != sv && !regexMatch {
+				return false
+			}
+		}
+	}
+
+	return true
+}

+ 60 - 0
engine/event.go

@@ -0,0 +1,60 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"fmt"
+	"strings"
+
+	"devt.de/krotik/common/stringutil"
+)
+
+/*
+Event data structure
+*/
+type Event struct {
+	name  string                      // Name of the event
+	kind  []string                    // Kind of the event (dot notation expressed as array)
+	state map[interface{}]interface{} // Event state
+}
+
+/*
+NewEvent returns a new event object.
+*/
+func NewEvent(name string, kind []string, state map[interface{}]interface{}) *Event {
+	return &Event{name, kind, state}
+}
+
+/*
+Name returns the event name.
+*/
+func (e *Event) Name() string {
+	return e.name
+}
+
+/*
+Kind returns the event kind.
+*/
+func (e *Event) Kind() []string {
+	return e.kind
+}
+
+/*
+State returns the event state.
+*/
+func (e *Event) State() map[interface{}]interface{} {
+	return e.state
+}
+
+func (e *Event) String() string {
+	return fmt.Sprintf("Event: %v %v %v", e.name, strings.Join(e.kind, "."),
+		stringutil.ConvertToString(e.state))
+}

+ 468 - 0
engine/monitor.go

@@ -0,0 +1,468 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"bytes"
+	"container/heap"
+	"fmt"
+	"sync"
+
+	"devt.de/krotik/common/errorutil"
+	"devt.de/krotik/common/flowutil"
+	"devt.de/krotik/common/sortutil"
+)
+
+/*
+Monitor monitors events as they are cascading. Event cascades will produce tree
+structures.
+*/
+type Monitor interface {
+
+	/*
+	   ID returns the monitor ID.
+	*/
+	ID() uint64
+
+	/*
+	   NewChildMonitor creates a new child monitor of this monitor.
+	*/
+	NewChildMonitor(priority int) Monitor
+
+	/*
+	   Scope returns the rule scope of this monitor.
+	*/
+	Scope() *RuleScope
+
+	/*
+	   Priority returns the monitors priority.
+	*/
+	Priority() int
+
+	/*
+		Activated returns if this monitor has been activated.
+	*/
+	IsActivated() bool
+
+	/*
+		Activate activates this monitor with a given event.
+	*/
+	Activate(e *Event)
+
+	/*
+	   Skip finishes this monitor without activation.
+	*/
+	Skip(e *Event)
+
+	/*
+		Finish finishes this monitor.
+	*/
+	Finish()
+
+	/*
+		Returns the root monitor of this monitor.
+	*/
+	RootMonitor() *RootMonitor
+
+	/*
+		Errors returns the error object of this monitor.
+	*/
+	Errors() *TaskError
+
+	/*
+		SetErrors adds an error object to this monitor.
+	*/
+	SetErrors(e *TaskError)
+
+	/*
+		EventPath returns the chain of events which created this monitor.
+	*/
+	EventPath() []*Event
+
+	/*
+	   EventPathString returns the event path as a string.
+	*/
+	EventPathString() string
+
+	/*
+		String returns a string representation of this monitor.
+	*/
+	String() string
+}
+
+/*
+monitorBase provides the basic functions and fields for any monitor.
+*/
+type monitorBase struct {
+	id      uint64                 // Monitor ID
+	Parent  *monitorBase           // Parent monitor
+	Context map[string]interface{} // Context object of the monitor
+	Err     *TaskError             // Errors which occurred during event processing
+
+	priority    int          // Priority of the monitor
+	rootMonitor *RootMonitor // Root monitor
+	event       *Event       // Event which activated this monitor
+	activated   bool         // Flag indicating if the monitor was activated
+	finished    bool         // Flag indicating if the monitor has finished
+}
+
+/*
+NewMonitor creates a new monitor.
+*/
+func newMonitorBase(priority int, parent *monitorBase, context map[string]interface{}) *monitorBase {
+
+	var ret *monitorBase
+
+	if parent != nil {
+		ret = &monitorBase{newMonID(), parent, context, nil, priority, parent.rootMonitor, nil, false, false}
+	} else {
+		ret = &monitorBase{newMonID(), nil, context, nil, priority, nil, nil, false, false}
+	}
+
+	return ret
+}
+
+/*
+NewChildMonitor creates a new child monitor of this monitor.
+*/
+func (mb *monitorBase) NewChildMonitor(priority int) Monitor {
+	child := &ChildMonitor{newMonitorBase(priority, mb, mb.Context)}
+
+	mb.rootMonitor.descendantCreated(child)
+
+	return child
+}
+
+/*
+ID returns the monitor ID.
+*/
+func (mb *monitorBase) ID() uint64 {
+	return mb.id
+}
+
+/*
+RootMonitor returns the root monitor of this monitor.
+*/
+func (mb *monitorBase) RootMonitor() *RootMonitor {
+	return mb.rootMonitor
+}
+
+/*
+Scope returns the rule scope of this monitor.
+*/
+func (mb *monitorBase) Scope() *RuleScope {
+	return mb.rootMonitor.ruleScope
+}
+
+/*
+Priority returns the priority of this monitor.
+*/
+func (mb *monitorBase) Priority() int {
+	return mb.priority
+}
+
+/*
+IsActivated returns if this monitor has been activated.
+*/
+func (mb *monitorBase) IsActivated() bool {
+	return mb.activated
+}
+
+/*
+IsFinished returns if this monitor has finished.
+*/
+func (mb *monitorBase) IsFinished() bool {
+	return mb.finished
+}
+
+/*
+Activate activates this monitor.
+*/
+func (mb *monitorBase) Activate(e *Event) {
+	errorutil.AssertTrue(!mb.finished, "Cannot activate a finished monitor")
+	errorutil.AssertTrue(!mb.activated, "Cannot activate an active monitor")
+	errorutil.AssertTrue(e != nil, "Monitor can only be activated with an event")
+
+	mb.event = e
+	mb.rootMonitor.descendantActivated(mb.priority)
+	mb.activated = true
+}
+
+/*
+Skip finishes this monitor without activation.
+*/
+func (mb *monitorBase) Skip(e *Event) {
+	errorutil.AssertTrue(!mb.finished, "Cannot skip a finished monitor")
+	errorutil.AssertTrue(!mb.activated, "Cannot skip an active monitor")
+	mb.event = e
+	mb.activated = true
+	mb.finished = true
+}
+
+/*
+Finish finishes this monitor.
+*/
+func (mb *monitorBase) Finish() {
+	errorutil.AssertTrue(mb.activated, "Cannot finish a not active monitor")
+	errorutil.AssertTrue(!mb.finished, "Cannot finish a finished monitor")
+
+	mb.finished = true
+	mb.rootMonitor.descendantFinished(mb)
+}
+
+/*
+Errors returns the error object of this monitor.
+*/
+func (mb *monitorBase) Errors() *TaskError {
+	errorutil.AssertTrue(mb.finished, "Cannot get errors on an unfinished monitor")
+	return mb.Err
+}
+
+/*
+SetErrors adds an error object to this monitor.
+*/
+func (mb *monitorBase) SetErrors(e *TaskError) {
+	errorutil.AssertTrue(mb.finished, "Cannot set errors on an unfinished monitor")
+	mb.Err = e
+	mb.rootMonitor.descendantFailed(mb)
+}
+
+/*
+EventPath returns the chain of events which created this monitor.
+*/
+func (mb *monitorBase) EventPath() []*Event {
+	errorutil.AssertTrue(mb.finished, "Cannot get event path on an unfinished monitor")
+
+	path := []*Event{mb.event}
+
+	child := mb.Parent
+	for child != nil {
+		path = append(path, child.event)
+		child = child.Parent
+	}
+
+	// Reverse path
+
+	for i, j := 0, len(path)-1; i < j; i, j = i+1, j-1 {
+		path[i], path[j] = path[j], path[i]
+	}
+
+	return path
+}
+
+/*
+EventPathString returns the event path as a string.
+*/
+func (mb *monitorBase) EventPathString() string {
+	var buf bytes.Buffer
+
+	ep := mb.EventPath()
+	last := len(ep) - 1
+
+	for i, e := range mb.EventPath() {
+		buf.WriteString(e.name)
+		if i < last {
+			buf.WriteString(" -> ")
+		}
+	}
+
+	return buf.String()
+}
+
+/*
+String returns a string representation of this monitor.
+*/
+func (mb *monitorBase) String() string {
+	return fmt.Sprintf("Monitor %v (parent: %v priority: %v activated: %v finished: %v)",
+		mb.ID(), mb.Parent, mb.priority, mb.activated, mb.finished)
+}
+
+// Root Monitor
+// ============
+
+/*
+RootMonitor is a monitor which is at a beginning of an event cascade.
+*/
+type RootMonitor struct {
+	*monitorBase
+	lock         *sync.Mutex             // Lock for datastructures
+	incomplete   map[int]int             // Priority -> Counters of incomplete trackers
+	priorities   *sortutil.IntHeap       // List of handled priorities
+	ruleScope    *RuleScope              // Rule scope definitions
+	unfinished   int                     // Counter of all unfinished trackers
+	messageQueue *flowutil.EventPump     // Message passing queue of the processor
+	errors       map[uint64]*monitorBase // Monitors which got errors
+	finished     func(Processor)         // Finish handler (can be used externally)
+}
+
+/*
+NewRootMonitor creates a new root monitor.
+*/
+func newRootMonitor(context map[string]interface{}, scope *RuleScope,
+	messageQueue *flowutil.EventPump) *RootMonitor {
+
+	ret := &RootMonitor{newMonitorBase(0, nil, context), &sync.Mutex{},
+		make(map[int]int), &sortutil.IntHeap{}, scope, 1, messageQueue,
+		make(map[uint64]*monitorBase), nil}
+
+	// A root monitor is its own parent
+
+	ret.rootMonitor = ret
+
+	heap.Init(ret.priorities)
+
+	return ret
+}
+
+/*
+SetFinishHandler adds a handler function to this monitor which is called once
+this monitor has finished.
+*/
+func (rm *RootMonitor) SetFinishHandler(fh func(Processor)) {
+	rm.finished = fh
+}
+
+/*
+HighestPriority returns the highest priority which is handled by this monitor.
+*/
+func (rm *RootMonitor) HighestPriority() int {
+	rm.lock.Lock()
+	defer rm.lock.Unlock()
+
+	if len(*rm.priorities) > 0 {
+		return (*rm.priorities)[0]
+	}
+
+	return -1
+}
+
+/*
+AllErrors returns all error which have been collected in this root monitor.
+*/
+func (rm *RootMonitor) AllErrors() []*TaskError {
+	rm.lock.Lock()
+	defer rm.lock.Unlock()
+
+	ret := make([]*TaskError, 0, len(rm.errors))
+
+	// Sort by monitor id - this should give the corrent order timewise
+
+	var ids []uint64
+	for id := range rm.errors {
+		ids = append(ids, id)
+	}
+
+	sortutil.UInt64s(ids)
+
+	for _, id := range ids {
+		m := rm.errors[id]
+		ret = append(ret, m.Errors())
+	}
+
+	return ret
+}
+
+/*
+descendantCreated notifies this root monitor that a descendant has been created.
+*/
+func (rm *RootMonitor) descendantCreated(monitor Monitor) {
+	rm.lock.Lock()
+	defer rm.lock.Unlock()
+
+	rm.unfinished++
+}
+
+/*
+descendantActivated notifies this root monitor that a descendant has been activated.
+*/
+func (rm *RootMonitor) descendantActivated(priority int) {
+	rm.lock.Lock()
+	defer rm.lock.Unlock()
+
+	val, ok := rm.incomplete[priority]
+	if !ok {
+		val = 0
+		heap.Push(rm.priorities, priority)
+	}
+
+	rm.incomplete[priority] = val + 1
+}
+
+/*
+descendantFailed notifies this root monitor that a descendant has failed.
+*/
+func (rm *RootMonitor) descendantFailed(monitor *monitorBase) {
+	rm.lock.Lock()
+	defer rm.lock.Unlock()
+
+	rm.errors[monitor.ID()] = monitor
+}
+
+/*
+descendantFinished records that this monitor has finished. If it is the last
+active monitor in the event tree then send a notification.
+*/
+func (rm *RootMonitor) descendantFinished(m Monitor) {
+
+	rm.lock.Lock()
+
+	rm.unfinished--
+
+	finished := rm.unfinished == 0
+
+	if m.IsActivated() {
+		priority := m.Priority()
+
+		rm.incomplete[priority]--
+
+		if rm.incomplete[priority] == 0 {
+			rm.priorities.RemoveFirst(priority)
+			delete(rm.incomplete, priority)
+		}
+	}
+
+	rm.lock.Unlock()
+
+	// Post notification
+
+	if finished {
+		rm.messageQueue.PostEvent(MessageRootMonitorFinished, rm)
+	}
+}
+
+// Child Monitor
+// =============
+
+/*
+ChildMonitor is a monitor which is a descendant of a root monitor.
+*/
+type ChildMonitor struct {
+	*monitorBase
+}
+
+// Unique id creation
+// ==================
+
+var midcounter uint64 = 1
+var midcounterLock = &sync.Mutex{}
+
+/*
+newId returns a new unique id.
+*/
+func newMonID() uint64 {
+	midcounterLock.Lock()
+	defer midcounterLock.Unlock()
+
+	ret := midcounter
+	midcounter++
+
+	return ret
+}

+ 431 - 0
engine/processor.go

@@ -0,0 +1,431 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"fmt"
+	"sync"
+
+	"devt.de/krotik/common/flowutil"
+	"devt.de/krotik/common/pools"
+)
+
+/*
+Processor is the main object of the event engine. It coordinates the thread pool
+and rule index. Rules can only be added if the processor is stopped. Events
+can only be added if the processor is not stopped.
+*/
+type Processor interface {
+
+	/*
+	   ID returns the processor ID.
+	*/
+	ID() uint64
+
+	/*
+	   Workers returns the number of threads of this processor.
+	*/
+	Workers() int
+
+	/*
+	   Reset removes all stored rules from this processor.
+	*/
+	Reset() error
+
+	/*
+	   AddRule adds a new rule to the processor.
+	*/
+	AddRule(rule *Rule) error
+
+	/*
+	   Rules returns all loaded rules.
+	*/
+	Rules() map[string]*Rule
+
+	/*
+	   Start starts this processor.
+	*/
+	Start()
+
+	/*
+	   Finish will finish all remaining tasks and then stop the processor.
+	*/
+	Finish()
+
+	/*
+	   Stopped returns if the processor is stopped.
+	*/
+	Stopped() bool
+
+	/*
+	   Status returns the status of the processor (Running / Stopping / Stopped).
+	*/
+	Status() string
+
+	/*
+	   NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
+	   root events.
+	*/
+	NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor
+
+	/*
+		SetRootMonitorErrorObserver specifies an observer which is triggered
+		when a root monitor of this processor has finished and returns errors.
+		By default this is set to nil (no observer).
+	*/
+	SetRootMonitorErrorObserver(func(rm *RootMonitor))
+
+	/*
+	   AddEvent adds a new event to the processor. Returns the monitor if the event
+	   triggered a rule and nil if the event was skipped.
+	*/
+	AddEvent(event *Event, parentMonitor Monitor) (Monitor, error)
+
+	/*
+	   IsTriggering checks if a given event triggers a loaded rule. This does not the
+	   actual state matching for speed.
+	*/
+	IsTriggering(event *Event) bool
+
+	/*
+	   ProcessEvent processes an event by determining which rules trigger and match
+	   the given event.
+	*/
+	ProcessEvent(event *Event, parent Monitor) map[string]error
+
+	/*
+	   String returns a string representation the processor.
+	*/
+	String() string
+}
+
+/*
+eventProcessor main implementation of the Processor interface.
+
+Event cycle:
+
+Process -> Triggering -> Matching -> Fire Rule
+
+*/
+type eventProcessor struct {
+	id                  uint64                // Processor ID
+	pool                *pools.ThreadPool     // Thread pool of this processor
+	workerCount         int                   // Number of threads for this processor
+	ruleIndex           RuleIndex             // Container for loaded rules
+	triggeringCache     map[string]bool       // Cache which remembers which events are triggering
+	triggeringCacheLock sync.Mutex            // Lock for triggeringg cache
+	messageQueue        *flowutil.EventPump   // Queue for message passing between components
+	rmErrorObserver     func(rm *RootMonitor) // Error observer for root monitors
+}
+
+/*
+NewProcessor creates a new event processor with a given number of workers.
+*/
+func NewProcessor(workerCount int) Processor {
+	ep := flowutil.NewEventPump()
+	return &eventProcessor{newProcID(), pools.NewThreadPoolWithQueue(NewTaskQueue(ep)),
+		workerCount, NewRuleIndex(), nil, sync.Mutex{}, ep, nil}
+}
+
+/*
+ID returns the processor ID.
+*/
+func (p *eventProcessor) ID() uint64 {
+	return p.id
+}
+
+/*
+Workers returns the number of threads of this processor.
+*/
+func (p *eventProcessor) Workers() int {
+	return p.workerCount
+}
+
+/*
+Reset removes all stored rules from this processor.
+*/
+func (p *eventProcessor) Reset() error {
+
+	// Check that the thread pool is stopped
+
+	if p.pool.Status() != pools.StatusStopped {
+		return fmt.Errorf("Cannot reset processor if it has not stopped")
+	}
+
+	// Invalidate triggering cache
+
+	p.triggeringCacheLock.Lock()
+	p.triggeringCache = nil
+	p.triggeringCacheLock.Unlock()
+
+	// Create a new rule index
+
+	p.ruleIndex = NewRuleIndex()
+
+	return nil
+}
+
+/*
+AddRule adds a new rule to the processor.
+*/
+func (p *eventProcessor) AddRule(rule *Rule) error {
+
+	// Check that the thread pool is stopped
+
+	if p.pool.Status() != pools.StatusStopped {
+		return fmt.Errorf("Cannot add rule if the processor has not stopped")
+	}
+
+	// Invalidate triggering cache
+
+	p.triggeringCacheLock.Lock()
+	p.triggeringCache = nil
+	p.triggeringCacheLock.Unlock()
+
+	return p.ruleIndex.AddRule(rule)
+}
+
+/*
+Rules returns all loaded rules.
+*/
+func (p *eventProcessor) Rules() map[string]*Rule {
+	return p.ruleIndex.Rules()
+}
+
+/*
+Start starts this processor.
+*/
+func (p *eventProcessor) Start() {
+	p.pool.SetWorkerCount(p.workerCount, false)
+}
+
+/*
+Finish will finish all remaining tasks and then stop the processor.
+*/
+func (p *eventProcessor) Finish() {
+	p.pool.JoinAll()
+}
+
+/*
+Stopped returns if the processor is stopped.
+*/
+func (p *eventProcessor) Stopped() bool {
+	return p.pool.Status() == pools.StatusStopped
+}
+
+/*
+Status returns the status of the processor (Running / Stopping / Stopped).
+*/
+func (p *eventProcessor) Status() string {
+	return p.pool.Status()
+}
+
+/*
+NewRootMonitor creates a new root monitor for this processor. This monitor is used to add initial
+root events.
+*/
+func (p *eventProcessor) NewRootMonitor(context map[string]interface{}, scope *RuleScope) *RootMonitor {
+
+	if scope == nil {
+		scope = NewRuleScope(map[string]bool{
+			"": true, // Default root monitor has global scope
+		})
+	}
+
+	return newRootMonitor(context, scope, p.messageQueue)
+}
+
+/*
+SetRootMonitorErrorObserver specifies an observer which is triggered
+when a root monitor of this processor has finished and returns errors.
+By default this is set to nil (no observer).
+*/
+func (p *eventProcessor) SetRootMonitorErrorObserver(rmErrorObserver func(rm *RootMonitor)) {
+	p.rmErrorObserver = rmErrorObserver
+}
+
+/*
+Notify the root monitor error observer that an error occurred.
+*/
+func (p *eventProcessor) notifyRootMonitorErrors(rm *RootMonitor) {
+	if p.rmErrorObserver != nil {
+		p.rmErrorObserver(rm)
+	}
+}
+
+/*
+AddEvent adds a new event to the processor. Returns the monitor if the event
+triggered a rule and nil if the event was skipped.
+*/
+func (p *eventProcessor) AddEvent(event *Event, parentMonitor Monitor) (Monitor, error) {
+
+	// Check that the thread pool is running
+
+	if p.pool.Status() == pools.StatusStopped {
+		return nil, fmt.Errorf("Cannot add event if the processor is not running")
+	}
+
+	EventTracer.record(event, "eventProcessor.AddEvent", "Event added to the processor")
+
+	// First check if the event is triggering any rules at all
+
+	if !p.IsTriggering(event) {
+
+		EventTracer.record(event, "eventProcessor.AddEvent", "Event was skipped")
+
+		if parentMonitor != nil {
+			parentMonitor.Skip(event)
+		}
+
+		return nil, nil
+	}
+
+	// Check if we need to construct a new root monitor
+
+	if parentMonitor == nil {
+		parentMonitor = p.NewRootMonitor(nil, nil)
+	}
+
+	if rootMonitor, ok := parentMonitor.(*RootMonitor); ok {
+		p.messageQueue.AddObserver(MessageRootMonitorFinished, rootMonitor,
+			func(event string, eventSource interface{}) {
+
+				// Call finish handler if there is one
+
+				if rm := eventSource.(*RootMonitor); rm.finished != nil {
+					rm.finished(p)
+				}
+
+				p.messageQueue.RemoveObservers(event, eventSource)
+			})
+	}
+
+	parentMonitor.Activate(event)
+
+	EventTracer.record(event, "eventProcessor.AddEvent", "Adding task to thread pool")
+
+	// Kick off event processing (see Processor.processEvent)
+
+	p.pool.AddTask(&Task{p, parentMonitor, event})
+
+	return parentMonitor, nil
+}
+
+/*
+IsTriggering checks if a given event triggers a loaded rule. This does not the
+actual state matching for speed.
+*/
+func (p *eventProcessor) IsTriggering(event *Event) bool {
+	var res, ok bool
+
+	p.triggeringCacheLock.Lock()
+	defer p.triggeringCacheLock.Unlock()
+
+	// Ensure the triggering cache exists
+
+	if p.triggeringCache == nil {
+		p.triggeringCache = make(map[string]bool)
+	}
+
+	name := event.Name()
+
+	if res, ok = p.triggeringCache[name]; !ok {
+		res = p.ruleIndex.IsTriggering(event)
+		p.triggeringCache[name] = res
+	}
+
+	return res
+}
+
+/*
+ProcessEvent processes an event by determining which rules trigger and match
+the given event.
+*/
+func (p *eventProcessor) ProcessEvent(event *Event, parent Monitor) map[string]error {
+	var rulesTriggering []*Rule
+	var rulesExecuting []*Rule
+
+	scope := parent.Scope()
+	ruleCandidates := p.ruleIndex.Match(event)
+	suppressedRules := make(map[string]bool)
+
+	EventTracer.record(event, "eventProcessor.ProcessEvent", "Processing event")
+
+	// Remove candidates which are out of scope
+
+	for _, ruleCandidate := range ruleCandidates {
+
+		if scope.IsAllowedAll(ruleCandidate.ScopeMatch) {
+			rulesTriggering = append(rulesTriggering, ruleCandidate)
+
+			// Build up a suppression list
+
+			for _, suppressedRule := range ruleCandidate.SuppressionList {
+				suppressedRules[suppressedRule] = true
+			}
+		}
+	}
+
+	// Remove suppressed rules
+
+	for _, ruleTriggers := range rulesTriggering {
+		if _, ok := suppressedRules[ruleTriggers.Name]; ok {
+			continue
+		}
+		rulesExecuting = append(rulesExecuting, ruleTriggers)
+	}
+
+	// Sort rules according to their priority (0 is the highest)
+
+	SortRuleSlice(rulesExecuting)
+
+	// Run rules which are not suppressed
+
+	errors := make(map[string]error)
+
+	EventTracer.record(event, "eventProcessor.ProcessEvent", "Running rules: ", rulesExecuting)
+
+	for _, rule := range rulesExecuting {
+		if err := rule.Action(p, parent, event); err != nil {
+			errors[rule.Name] = err
+		}
+	}
+
+	parent.Finish()
+
+	return errors
+}
+
+/*
+String returns a string representation the processor.
+*/
+func (p *eventProcessor) String() string {
+	return fmt.Sprintf("RumbleProcessor %v (workers:%v)", p.ID(), p.workerCount)
+}
+
+// Unique id creation
+// ==================
+
+var pidcounter uint64 = 1
+var pidcounterLock = &sync.Mutex{}
+
+/*
+newProcId returns a new unique id or processors.
+*/
+func newProcID() uint64 {
+	pidcounterLock.Lock()
+	defer pidcounterLock.Unlock()
+
+	ret := pidcounter
+	pidcounter++
+
+	return ret
+}

+ 742 - 0
engine/processor_test.go

@@ -0,0 +1,742 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"devt.de/krotik/common/errorutil"
+	"devt.de/krotik/common/pools"
+)
+
+func TestProcessorSimpleCascade(t *testing.T) {
+	UnitTestResetIDs()
+
+	// Add debug logging
+
+	var debugBuffer bytes.Buffer
+
+	EventTracer.out = &debugBuffer
+	EventTracer.MonitorEvent("core.*", map[interface{}]interface{}{
+		"foo":  "bar",
+		"foo2": nil,
+	})
+	EventTracer.MonitorEvent("core.*", map[interface{}]interface{}{
+		"foo2": "test",
+	})
+	defer func() {
+		EventTracer.Reset()
+		EventTracer.out = os.Stdout
+	}()
+
+	// Do the normal testing
+
+	var log bytes.Buffer
+
+	proc := NewProcessor(1)
+
+	if res := fmt.Sprint(proc); res != "RumbleProcessor 1 (workers:1)" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Add rules to the processor
+
+	rule1 := &Rule{
+		"TestRule1",                            // Name
+		"",                                     // Description
+		[]string{"core.main.event1"},           // Kind match
+		[]string{"data"},                       // Match on event cascade scope
+		nil,                                    // No state match
+		2,                                      // Priority of the rule
+		[]string{"TestRule3", "TestRule3Copy"}, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			log.WriteString("TestRule1\n")
+
+			// Add another event
+
+			p.AddEvent(&Event{
+				"InitialEvent",
+				[]string{"core", "main", "event2"},
+				map[interface{}]interface{}{
+					"foo":  "bar",
+					"foo2": "bla",
+				},
+			}, m.NewChildMonitor(1))
+
+			return nil
+		},
+	}
+
+	rule2 := &Rule{
+		"TestRule2",             // Name
+		"",                      // Description
+		[]string{"core.main.*"}, // Kind match
+		[]string{"data.read"},   // Match on event cascade scope
+		nil,                     // No state match
+		5,                       // Priority of the rule
+		nil,                     // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			log.WriteString("TestRule2\n")
+			return nil
+		},
+	}
+
+	rule3 := &Rule{
+		"TestRule3",             // Name
+		"",                      // Description
+		[]string{"core.main.*"}, // Kind match
+		[]string{"data.read"},   // Match on event cascade scope
+		nil,                     // No state match
+		0,                       // Priority of the rule
+		nil,                     // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			log.WriteString("TestRule3\n")
+			return nil
+		},
+	}
+
+	proc.AddRule(rule1)
+	proc.AddRule(rule2)
+	proc.AddRule(rule3)
+
+	if r := len(proc.Rules()); r != 3 {
+		t.Error("Unexpected rule number:", r)
+		return
+	}
+
+	// Start the processor
+	proc.Start()
+
+	// Push a root event
+
+	e := NewEvent(
+		"InitialEvent",
+		[]string{"core", "main", "event1"},
+		map[interface{}]interface{}{
+			"foo":  "bar",
+			"foo2": "bla",
+		},
+	)
+
+	if e.Name() != e.name || e.Kind() == nil || e.State() == nil {
+		t.Error("Unepxected getter result:", e)
+		return
+	}
+
+	rootm := proc.NewRootMonitor(nil, nil)
+	rootm.SetFinishHandler(func(p Processor) {
+		log.WriteString("finished!")
+	})
+	proc.AddEvent(e, rootm)
+
+	if err := proc.AddRule(rule3); err.Error() != "Cannot add rule if the processor has not stopped" {
+		t.Error("Unexpected error:", err)
+		return
+	}
+
+	if err := proc.Reset(); err.Error() != "Cannot reset processor if it has not stopped" {
+		t.Error("Unexpected error:", err)
+		return
+	}
+
+	// Finish the processor
+
+	proc.Finish()
+
+	// Finish the processor
+
+	// Rule 1, 2 and 3 trigger on event1 but rule 3 is suppressed by rule 1
+	// Rule 1 adds a new event which triggers only rule 2 and 3
+	// Rule 3 comes first since it has the higher priority
+
+	if log.String() != `TestRule1
+TestRule2
+TestRule3
+TestRule2
+finished!` {
+		t.Error("Unexpected result:", log.String())
+		return
+
+	}
+
+	log.Reset()
+
+	if err := proc.AddRule(rule3.CopyAs("TestRule3Copy")); err != nil {
+		t.Error("Unexpected error:", err)
+		return
+	}
+
+	// Start the processor
+
+	proc.Start()
+
+	// Push a root event
+
+	proc.AddEvent(&Event{
+		"InitialEvent",
+		[]string{"core", "main", "event1"},
+		nil,
+	}, nil)
+
+	// Finish the processor
+
+	proc.Finish()
+
+	if log.String() != `TestRule1
+TestRule2
+TestRule3
+TestRule3
+TestRule2
+` {
+		t.Error("Unexpected result:", log.String())
+		return
+	}
+
+	// Test the case when the event is pointless
+
+	log.Reset()
+
+	proc.Start()
+
+	proc.AddEvent(&Event{
+		"InitialEventFoo",
+		[]string{"core", "foo", "event1"},
+		nil,
+	}, nil)
+
+	rm := proc.NewRootMonitor(nil, nil)
+
+	proc.AddEvent(&Event{
+		"InitialEventFoo",
+		[]string{"core", "foo", "event1"},
+		nil,
+	}, rm)
+
+	if !rm.IsFinished() {
+		t.Error("Monitor which monitored a non-triggering event should still finished")
+		return
+	}
+
+	proc.Finish()
+
+	if log.String() != "" {
+		t.Error("Unexpected result:", log.String())
+		return
+	}
+
+	proc.Reset()
+
+	if r := len(proc.Rules()); r != 0 {
+		t.Error("Unexpected rule number:", r)
+		return
+	}
+
+	if debugBuffer.String() == "" {
+		t.Error("Nothing was recorded in the debug buffer")
+		return
+	}
+}
+
+func TestProcessorSimplePriorities(t *testing.T) {
+	UnitTestResetIDs()
+
+	var logLock = sync.Mutex{}
+	var log bytes.Buffer
+
+	testPriorities := func(p1, p2 int) int {
+
+		proc := NewProcessor(2)
+
+		// Add rules to the processor
+
+		rule1 := &Rule{
+			"TestRule1",                  // Name
+			"",                           // Description
+			[]string{"core.main.event1"}, // Kind match
+			[]string{"data"},             // Match on event cascade scope
+			nil,                          // No state match
+			0,                            // Priority of the rule
+			nil,                          // List of suppressed rules by this rule
+			func(p Processor, m Monitor, e *Event) error { // Action of the rule
+				logLock.Lock()
+				log.WriteString("TestRule1\n")
+				logLock.Unlock()
+				time.Sleep(2 * time.Millisecond)
+				return nil
+			},
+		}
+
+		rule2 := &Rule{
+			"TestRule2",                  // Name
+			"",                           // Description
+			[]string{"core.main.event2"}, // Kind match
+			[]string{"data"},             // Match on event cascade scope
+			nil,                          // No state match
+			0,                            // Priority of the rule
+			nil,                          // List of suppressed rules by this rule
+			func(p Processor, m Monitor, e *Event) error { // Action of the rule
+				logLock.Lock()
+				log.WriteString("TestRule2\n")
+				logLock.Unlock()
+				time.Sleep(2 * time.Millisecond)
+				return nil
+			},
+		}
+
+		proc.AddRule(rule1)
+		proc.AddRule(rule2)
+
+		proc.Start()
+
+		m := proc.NewRootMonitor(nil, nil)
+
+		// Push a root event
+
+		for i := 0; i < 3; i++ {
+			proc.AddEvent(&Event{
+				"InitialEvent1",
+				[]string{"core", "main", "event1"},
+				nil,
+			}, m.NewChildMonitor(p1))
+		}
+
+		proc.AddEvent(&Event{
+			"InitialEvent2",
+			[]string{"core", "main", "event2"},
+			nil,
+		}, m.NewChildMonitor(p2))
+
+		proc.AddEvent(&Event{
+			"InitialEvent1",
+			[]string{"core", "main", "event1"},
+			nil,
+		}, m.NewChildMonitor(p1))
+
+		hp := m.HighestPriority()
+
+		// Finish the processor
+
+		proc.Finish()
+
+		errorutil.AssertTrue(m.HighestPriority() == -1,
+			"Highest priority should be -1 once a monitor has finished")
+
+		return hp
+	}
+
+	// Since rule 1 has the higher priority it is more likely to be
+	// executed
+
+	if res := testPriorities(3, 5); res != 3 {
+		t.Error("Unexpected highest priority:", res)
+		return
+	}
+
+	if log.String() != `TestRule1
+TestRule1
+TestRule1
+TestRule1
+TestRule2
+` && log.String() != `TestRule1
+TestRule1
+TestRule1
+TestRule2
+TestRule1
+` {
+		t.Error("Unexpected result:", log.String())
+		return
+	}
+
+	log.Reset()
+
+	// Since rule 2 has the higher priority it is more likely to be
+	// executed
+
+	if res := testPriorities(5, 2); res != 2 {
+		t.Error("Unexpected highest priority:", res)
+		return
+	}
+
+	if log.String() != `TestRule2
+TestRule1
+TestRule1
+TestRule1
+TestRule1
+` && log.String() != `TestRule1
+TestRule2
+TestRule1
+TestRule1
+TestRule1
+` && log.String() != `TestRule1
+TestRule1
+TestRule2
+TestRule1
+TestRule1
+` {
+		t.Error("Unexpected result:", log.String())
+		return
+	}
+}
+
+func TestProcessorScopeHandling(t *testing.T) {
+	UnitTestResetIDs()
+
+	var logLock = sync.Mutex{}
+	var log bytes.Buffer
+
+	proc := NewProcessor(10)
+
+	// Add rules to the processor
+
+	rule1 := &Rule{
+		"TestRule1",             // Name
+		"",                      // Description
+		[]string{"core.main.*"}, // Kind match
+		[]string{"data.write"},  // Match on event cascade scope
+		nil,                     // No state match
+		0,                       // Priority of the rule
+		nil,                     // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			logLock.Lock()
+			log.WriteString("TestRule1\n")
+			logLock.Unlock()
+			time.Sleep(2 * time.Millisecond)
+			return nil
+		},
+	}
+
+	rule2 := &Rule{
+		"TestRule2",             // Name
+		"",                      // Description
+		[]string{"core.main.*"}, // Kind match
+		[]string{"data"},        // Match on event cascade scope
+		nil,                     // No state match
+		0,                       // Priority of the rule
+		nil,                     // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			logLock.Lock()
+			log.WriteString("TestRule2\n")
+			logLock.Unlock()
+			time.Sleep(2 * time.Millisecond)
+			return nil
+		},
+	}
+
+	proc.AddRule(rule1)
+	proc.AddRule(rule2)
+
+	if proc.Status() != pools.StatusStopped || !proc.Stopped() {
+		t.Error("Unexpected status:", proc.Status(), proc.Stopped())
+		return
+	}
+
+	proc.Start()
+
+	if proc.Status() != pools.StatusRunning || proc.Stopped() {
+		t.Error("Unexpected status:", proc.Status(), proc.Stopped())
+		return
+	}
+
+	scope1 := NewRuleScope(map[string]bool{
+		"data":       true,
+		"data.read":  true,
+		"data.write": false,
+	})
+
+	m := proc.NewRootMonitor(nil, scope1)
+
+	// Push a root event
+
+	proc.AddEvent(&Event{
+		"InitialEvent",
+		[]string{"core", "main", "event1"},
+		nil,
+	}, m)
+
+	// Finish the processor
+
+	proc.Finish()
+
+	// Only rule 2 should trigger since the monitor has only access
+	// to data and data.read
+
+	if log.String() != `TestRule2
+` {
+		t.Error("Unexpected result:", log.String())
+		return
+	}
+
+	log.Reset()
+
+	proc.Start()
+
+	scope2 := NewRuleScope(map[string]bool{
+		"data":       true,
+		"data.read":  true,
+		"data.write": true,
+	})
+
+	m = proc.NewRootMonitor(nil, scope2)
+
+	// Push a root event
+
+	proc.AddEvent(&Event{
+		"InitialEvent",
+		[]string{"core", "main", "event1"},
+		nil,
+	}, m)
+
+	// Finish the processor
+
+	proc.Finish()
+
+	// Now both rules should trigger
+
+	if log.String() != `TestRule1
+TestRule2
+` {
+		t.Error("Unexpected result:", log.String())
+		return
+	}
+}
+
+func TestProcessorStateMatching(t *testing.T) {
+	UnitTestResetIDs()
+
+	var logLock = sync.Mutex{}
+	var log bytes.Buffer
+
+	proc := NewProcessor(10)
+
+	if res := proc.Workers(); res != 10 {
+		t.Error("Unexpected number of workers:", res)
+		return
+	}
+
+	// Add rules to the processor
+
+	rule1 := &Rule{
+		"TestRule1",             // Name
+		"",                      // Description
+		[]string{"core.main.*"}, // Kind match
+		[]string{"data"},        // Match on event cascade scope
+		map[string]interface{}{"name": nil, "test": 1}, // Simple state match
+		0,   // Priority of the rule
+		nil, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			logLock.Lock()
+			log.WriteString("TestRule1\n")
+			logLock.Unlock()
+			time.Sleep(2 * time.Millisecond)
+			return nil
+		},
+	}
+
+	rule2 := &Rule{
+		"TestRule2",             // Name
+		"",                      // Description
+		[]string{"core.main.*"}, // Kind match
+		[]string{"data"},        // Match on event cascade scope
+		map[string]interface{}{"name": nil, "test": "123"}, // Simple state match
+		0,   // Priority of the rule
+		nil, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			logLock.Lock()
+			log.WriteString("TestRule2\n")
+			logLock.Unlock()
+			time.Sleep(2 * time.Millisecond)
+			return nil
+		},
+	}
+
+	proc.AddRule(rule1)
+	proc.AddRule(rule2)
+
+	proc.Start()
+
+	// Push a root event
+
+	proc.AddEvent(&Event{
+		"InitialEvent",
+		[]string{"core", "main", "event1"},
+		map[interface{}]interface{}{"name": "foo", "test": "123"},
+	}, nil)
+
+	proc.Finish()
+
+	if log.String() != `TestRule2
+` {
+		t.Error("Unexpected result:", log.String())
+		return
+	}
+
+	proc.Start()
+
+	proc.AddEvent(&Event{
+		"InitialEvent",
+		[]string{"core", "main", "event1"},
+		map[interface{}]interface{}{"name": nil, "test": 1, "foobar": 123},
+	}, nil)
+
+	proc.AddEvent(&Event{
+		"InitialEvent",
+		[]string{"core", "main", "event1"},
+		map[interface{}]interface{}{"name": "bar", "test": 1},
+	}, nil)
+
+	// The following rule should not trigger as it is missing name
+
+	proc.AddEvent(&Event{
+		"InitialEvent",
+		[]string{"core", "main", "event1"},
+		map[interface{}]interface{}{"foobar": nil, "test": "123"},
+	}, nil)
+
+	proc.Finish()
+
+	if log.String() != `TestRule2
+TestRule1
+TestRule1
+` {
+		t.Error("Unexpected result:", log.String())
+		return
+	}
+}
+
+func TestProcessorSimpleErrorHandling(t *testing.T) {
+	UnitTestResetIDs()
+
+	proc := NewProcessor(10)
+
+	// Add rules to the processor
+
+	rule1 := &Rule{
+		"TestRule1",                  // Name
+		"",                           // Description
+		[]string{"core.main.event1"}, // Kind match
+		[]string{"data"},             // Match on event cascade scope
+		nil,
+		0,   // Priority of the rule
+		nil, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			p.AddEvent(&Event{
+				"event2",
+				[]string{"core", "main", "event2"},
+				nil,
+			}, m.NewChildMonitor(1))
+			return errors.New("testerror")
+		},
+	}
+
+	rule2 := &Rule{
+		"TestRule2",                  // Name
+		"",                           // Description
+		[]string{"core.main.event2"}, // Kind match
+		[]string{"data"},             // Match on event cascade scope
+		nil,
+		0,   // Priority of the rule
+		nil, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			p.AddEvent(&Event{
+				"event3",
+				[]string{"core", "main", "event3"},
+				nil,
+			}, m.NewChildMonitor(1))
+			return nil
+		},
+	}
+
+	rule3 := &Rule{
+		"TestRule3", // Name
+		"",          // Description
+		[]string{"core.main.event3", "core.main.event1"}, // Kind match
+		[]string{"data"}, // Match on event cascade scope
+		nil,
+		0,   // Priority of the rule
+		nil, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			return errors.New("testerror2")
+		},
+	}
+
+	// Add rule 1 twice
+
+	proc.AddRule(rule1)
+	proc.AddRule(rule1.CopyAs("TestRule1Copy"))
+
+	proc.AddRule(rule2)
+	proc.AddRule(rule3)
+
+	recordedErrors := 0
+	proc.SetRootMonitorErrorObserver(func(rm *RootMonitor) {
+		recordedErrors = len(rm.AllErrors()[0].ErrorMap)
+	})
+
+	proc.Start()
+
+	// Push a root event
+
+	mon, err := proc.AddEvent(&Event{
+		"InitialEvent",
+		[]string{"core", "main", "event1"},
+		map[interface{}]interface{}{"name": "foo", "test": "123"},
+	}, nil)
+
+	rmon, ok := mon.(*RootMonitor)
+	if !ok {
+		t.Error("Root monitor expected:", mon, err)
+		return
+	}
+
+	proc.Finish()
+
+	if fmt.Sprint(mon) != "Monitor 1 (parent: <nil> priority: 0 activated: true finished: true)" {
+		t.Error("Unexpected result:", mon)
+		return
+	}
+
+	_, err = proc.AddEvent(&Event{}, nil)
+	if err.Error() != "Cannot add event if the processor is not running" {
+		t.Error("Unexpected error", err)
+		return
+	}
+
+	// Two errors should have been collected
+
+	errs := rmon.AllErrors()
+
+	if len(errs) != 3 {
+		t.Error("Unexpected number of errors:", len(errs))
+		return
+	}
+
+	if recordedErrors != 3 {
+		t.Error("Unexpected number of recorded errors:", recordedErrors)
+		return
+	}
+
+	if fmt.Sprint(errs) != `[Taskerrors:
+InitialEvent -> TestRule1 : testerror
+InitialEvent -> TestRule1Copy : testerror
+InitialEvent -> TestRule3 : testerror2 Taskerror:
+InitialEvent -> event2 -> event3 -> TestRule3 : testerror2 Taskerror:
+InitialEvent -> event2 -> event3 -> TestRule3 : testerror2]` {
+		t.Error("Unexpected errors:", errs)
+		return
+	}
+}

+ 804 - 0
engine/rule.go

@@ -0,0 +1,804 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"regexp"
+	"sort"
+	"strings"
+	"sync"
+
+	"devt.de/krotik/common/errorutil"
+	"devt.de/krotik/common/sortutil"
+)
+
+/*
+Rule models a matching rule for event receivers (actions). A rule has 3 possible
+matching criteria:
+
+- Match on event kinds: A list of strings in dot notation which describes event kinds. May
+contain '*' characters as wildcards (e.g. core.tests.*).
+
+- Match on event cascade scope: A list of strings in dot notation which describe the
+required scopes of an event cascade.
+
+- Match on event state: A simple list of required key / value states in the event
+state. Nil values can be used as wildcards (i.e. match is only on key).
+
+Rules have priorities (0 being the highest) and may suppress each other.
+*/
+type Rule struct {
+	Name            string                 // Name of the rule
+	Desc            string                 // Description of the rule (optional)
+	KindMatch       []string               // Match on event kinds
+	ScopeMatch      []string               // Match on event cascade scope
+	StateMatch      map[string]interface{} // Match on event state
+	Priority        int                    // Priority of the rule
+	SuppressionList []string               // List of suppressed rules by this rule
+	Action          RuleAction             // Action of the rule
+}
+
+/*
+CopyAs returns a shallow copy of this rule with a new name.
+*/
+func (r *Rule) CopyAs(newName string) *Rule {
+	return &Rule{
+		Name:            newName,
+		Desc:            r.Desc,
+		KindMatch:       r.KindMatch,
+		ScopeMatch:      r.ScopeMatch,
+		StateMatch:      r.StateMatch,
+		Priority:        r.Priority,
+		SuppressionList: r.SuppressionList,
+		Action:          r.Action,
+	}
+}
+
+func (r *Rule) String() string {
+	sm, _ := json.Marshal(r.StateMatch)
+	return fmt.Sprintf("Rule:%s [%s] (Priority:%v Kind:%v Scope:%v StateMatch:%s Suppress:%v)",
+		r.Name, strings.TrimSpace(r.Desc), r.Priority, r.KindMatch, r.ScopeMatch, sm, r.SuppressionList)
+}
+
+/*
+RuleAction is an action which is executed by a matching rule.
+*/
+type RuleAction func(p Processor, m Monitor, e *Event) error
+
+/*
+RuleIndex is an index for rules. It takes the form of a tree structure in which
+incoming events are matched level by level (e.g. event of kind core.task1.step1
+is first matched by kind "core" then "task1" and then "step1". At the leaf of
+the index tree it may then be matched on a state condition).
+*/
+type RuleIndex interface {
+
+	/*
+	   AddRule adds a new rule to the index.
+	*/
+	AddRule(rule *Rule) error
+
+	/*
+	   IsTriggering checks if a given event triggers a rule in this index.
+	*/
+
+	IsTriggering(event *Event) bool
+
+	/*
+		Match returns all rules in this index which match a given event. This
+		method does a full matching check including state matching.
+	*/
+	Match(event *Event) []*Rule
+
+	/*
+		String returns a string representation of this rule index and all subindexes.
+	*/
+	String() string
+
+	/*
+		Rules returns all rules with the given prefix in the name. Use the empty
+		string to return all rules.
+	*/
+	Rules() map[string]*Rule
+}
+
+/*
+ruleSubIndex is a sub index used by a rule index.
+*/
+type ruleSubIndex interface {
+
+	/*
+		type returns the type of the rule sub index.
+	*/
+	Type() string
+
+	/*
+		addRuleAtLevel adds a new rule to the index at a specific level. The
+		level is described by a part of the rule kind match.
+	*/
+	addRuleAtLevel(rule *Rule, kindMatchLevel []string)
+
+	/*
+		isTriggeringAtLevel checks if a given event triggers a rule at the given
+		level of the index.
+	*/
+	isTriggeringAtLevel(event *Event, level int) bool
+
+	/*
+		matchAtLevel returns all rules in this index which match a given event
+		at the given level. This method does a full matching check including
+		state matching.
+	*/
+	matchAtLevel(event *Event, level int) []*Rule
+
+	/*
+		stringIndent returns a string representation with a given indentation of this
+		rule index and all subindexes.
+	*/
+	stringIndent(indent string) string
+}
+
+/*
+ruleIndexRoot models the index root node.
+*/
+type ruleIndexRoot struct {
+	*RuleIndexKind
+	rules map[string]*Rule
+}
+
+/*
+   AddRule adds a new rule to the index.
+*/
+func (r *ruleIndexRoot) AddRule(rule *Rule) error {
+
+	if _, ok := r.rules[rule.Name]; ok {
+		return fmt.Errorf("Cannot add rule %v twice", rule.Name)
+	}
+
+	r.rules[rule.Name] = rule
+
+	return r.RuleIndexKind.AddRule(rule)
+}
+
+/*
+Rules returns all rules with the given prefix in the name. Use the empty
+string to return all rules.
+*/
+func (r *ruleIndexRoot) Rules() map[string]*Rule {
+	return r.rules
+}
+
+/*
+NewRuleIndex creates a new rule container for efficient event matching.
+*/
+func NewRuleIndex() RuleIndex {
+	return &ruleIndexRoot{newRuleIndexKind(), make(map[string]*Rule)}
+}
+
+/*
+Rule index types
+*/
+const (
+	typeRuleIndexKind  = "RuleIndexKind"
+	typeRuleIndexState = "RuleIndexState"
+	typeRuleIndexAll   = "RuleIndexAll"
+)
+
+// Rule Index Kind
+// ===============
+
+/*
+RuleIndexKind data structure.
+*/
+type RuleIndexKind struct {
+	id              uint64                    // Id of this rule index
+	kindAllMatch    []ruleSubIndex            // Rules with target all events of a specific category
+	kindSingleMatch map[string][]ruleSubIndex // Rules which target specific event kinds
+	count           int                       // Number of loaded rules
+}
+
+/*
+newRuleIndexKind creates a new rule index matching on event kind.
+*/
+func newRuleIndexKind() *RuleIndexKind {
+	return &RuleIndexKind{
+		newRuleIndexID(),
+		make([]ruleSubIndex, 0),
+		make(map[string][]ruleSubIndex),
+		0,
+	}
+}
+
+/*
+Type returns the type of the rule sub index.
+*/
+func (ri *RuleIndexKind) Type() string {
+	return typeRuleIndexKind
+}
+
+/*
+AddRule adds a new rule to the index.
+*/
+func (ri *RuleIndexKind) AddRule(rule *Rule) error {
+
+	// Check essential rule attributes
+
+	if rule.KindMatch == nil || len(rule.KindMatch) == 0 {
+		return fmt.Errorf("Cannot add rule without a kind match: %v", rule.Name)
+	} else if rule.ScopeMatch == nil {
+		return fmt.Errorf("Cannot add rule without a scope match: %v", rule.Name)
+	}
+
+	// Add rule to the index for all kind matches
+
+	for _, kindMatch := range rule.KindMatch {
+		ri.addRuleAtLevel(rule, strings.Split(kindMatch, RuleKindSeparator))
+		ri.count++
+	}
+
+	return nil
+}
+
+/*
+addRuleAtLevel adds a new rule to the index at a specific level. The
+level is described by a part of the rule kind match.
+*/
+func (ri *RuleIndexKind) addRuleAtLevel(rule *Rule, kindMatchLevel []string) {
+	var indexType string
+	var index ruleSubIndex
+	var ruleSubIndexList []ruleSubIndex
+	var ok bool
+
+	// Pick the right index type
+
+	if len(kindMatchLevel) == 1 {
+		if rule.StateMatch != nil {
+			indexType = typeRuleIndexState
+		} else {
+			indexType = typeRuleIndexAll
+		}
+	} else {
+		indexType = typeRuleIndexKind
+	}
+
+	// Get (create when necessary) a sub index of a specific type for the
+	// match item of this level
+
+	matchItem := kindMatchLevel[0]
+
+	// Select the correct ruleSubIndexList
+
+	if matchItem == RuleKindWildcard {
+		ruleSubIndexList = ri.kindAllMatch
+	} else {
+		if ruleSubIndexList, ok = ri.kindSingleMatch[matchItem]; !ok {
+			ruleSubIndexList = make([]ruleSubIndex, 0)
+			ri.kindSingleMatch[matchItem] = ruleSubIndexList
+		}
+	}
+
+	// Check if the required index is already existing
+
+	for _, item := range ruleSubIndexList {
+		if item.Type() == indexType {
+			index = item
+			break
+		}
+	}
+
+	// Create a new index if no index was found
+
+	if index == nil {
+		switch indexType {
+		case typeRuleIndexState:
+			index = newRuleIndexState()
+		case typeRuleIndexAll:
+			index = newRuleIndexAll()
+		case typeRuleIndexKind:
+			index = newRuleIndexKind()
+		}
+
+		// Add the new index to the correct list
+
+		if matchItem == RuleKindWildcard {
+			ri.kindAllMatch = append(ruleSubIndexList, index)
+		} else {
+			ri.kindSingleMatch[matchItem] = append(ruleSubIndexList, index)
+		}
+	}
+
+	// Recurse into the next level
+
+	index.addRuleAtLevel(rule, kindMatchLevel[1:])
+}
+
+/*
+IsTriggering checks if a given event triggers a rule in this index.
+*/
+func (ri *RuleIndexKind) IsTriggering(event *Event) bool {
+	return ri.isTriggeringAtLevel(event, 0)
+}
+
+/*
+isTriggeringAtLevel checks if a given event triggers a rule at the given
+level of the index.
+*/
+func (ri *RuleIndexKind) isTriggeringAtLevel(event *Event, level int) bool {
+
+	// Check if the event kind is too general (e.g. rule is defined as a.b.c
+	// and the event kind is a.b)
+
+	if len(event.kind) <= level {
+		return false
+	}
+
+	levelKind := event.kind[level]
+	nextLevel := level + 1
+
+	// Check rules targeting all events
+
+	for _, index := range ri.kindAllMatch {
+		if index.isTriggeringAtLevel(event, nextLevel) {
+			return true
+		}
+	}
+
+	// Check rules targeting specific events
+
+	if ruleSubIndexList, ok := ri.kindSingleMatch[levelKind]; ok {
+		for _, index := range ruleSubIndexList {
+			if index.isTriggeringAtLevel(event, nextLevel) {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
+/*
+Match returns all rules in this index which match a given event. This method
+does a full matching check including state matching.
+*/
+func (ri *RuleIndexKind) Match(event *Event) []*Rule {
+	return ri.matchAtLevel(event, 0)
+}
+
+/*
+matchAtLevel returns all rules in this index which match a given event
+at the given level. This method does a full matching check including
+state matching.
+*/
+func (ri *RuleIndexKind) matchAtLevel(event *Event, level int) []*Rule {
+
+	// Check if the event kind is too general (e.g. rule is defined as a.b.c
+	// and the event kind is a.b)
+
+	if len(event.kind) <= level {
+		return nil
+	}
+
+	var ret []*Rule
+
+	levelKind := event.kind[level]
+	nextLevel := level + 1
+
+	// Check rules targeting all events
+
+	for _, index := range ri.kindAllMatch {
+		ret = append(ret, index.matchAtLevel(event, nextLevel)...)
+	}
+
+	// Check rules targeting specific events
+
+	if ruleSubIndexList, ok := ri.kindSingleMatch[levelKind]; ok {
+		for _, index := range ruleSubIndexList {
+			ret = append(ret, index.matchAtLevel(event, nextLevel)...)
+		}
+	}
+
+	return ret
+}
+
+/*
+String returns a string representation of this rule index and all subindexes.
+*/
+func (ri *RuleIndexKind) String() string {
+	return ri.stringIndent("")
+}
+
+/*
+stringIndent returns a string representation with a given indentation of this
+rule index and all subindexes.
+*/
+func (ri *RuleIndexKind) stringIndent(indent string) string {
+	var buf bytes.Buffer
+
+	newIndent := indent + "  "
+
+	writeIndexList := func(name string, indexList []ruleSubIndex) {
+		if len(indexList) > 0 {
+
+			buf.WriteString(fmt.Sprint(indent, name))
+			buf.WriteString(fmt.Sprintf(" - %v (%v)\n", ri.Type(), ri.id))
+
+			for _, index := range indexList {
+				buf.WriteString(index.stringIndent(newIndent))
+			}
+		}
+	}
+
+	writeIndexList("*", ri.kindAllMatch)
+
+	var keys []string
+	for k := range ri.kindSingleMatch {
+		keys = append(keys, k)
+	}
+
+	sort.Strings(keys)
+
+	for _, key := range keys {
+		indexList := ri.kindSingleMatch[key]
+		writeIndexList(key, indexList)
+	}
+
+	return buf.String()
+}
+
+// Rule Index State
+// ================
+
+/*
+RuleMatcherKey is used for pure key - value state matches.
+*/
+type RuleMatcherKey struct {
+	bits        uint64
+	bitsAny     uint64
+	bitsValue   map[interface{}]uint64
+	bitsRegexes map[uint64]*regexp.Regexp
+}
+
+/*
+addRule adds a new rule to this key matcher.
+*/
+func (rm *RuleMatcherKey) addRule(num uint, bit uint64, key string, value interface{}) {
+
+	// Register rule bit
+
+	rm.bits |= bit
+
+	if value == nil {
+		rm.bitsAny |= bit
+
+	} else if regex, ok := value.(*regexp.Regexp); ok {
+
+		// For regex match we add a bit to the any mask so the presence of
+		// the key is checked before the actual regex is checked
+
+		rm.bitsAny |= bit
+		rm.bitsRegexes[bit] = regex
+
+	} else {
+		rm.bitsValue[value] |= bit
+	}
+}
+
+/*
+match adds matching rules to a given bit mask.
+*/
+func (rm *RuleMatcherKey) match(bits uint64, value interface{}) uint64 {
+	toRemove := rm.bitsAny ^ rm.bits
+
+	if value != nil {
+		if additionalBits, ok := rm.bitsValue[value]; ok {
+			toRemove = rm.bitsAny | additionalBits ^ rm.bits
+		}
+	}
+
+	keyMatchedBits := bits ^ (bits & toRemove)
+
+	for bm, r := range rm.bitsRegexes {
+
+		if keyMatchedBits&bm > 0 && !r.MatchString(fmt.Sprint(value)) {
+
+			// Regex does not match remove the bit
+
+			keyMatchedBits ^= keyMatchedBits & bm
+		}
+	}
+
+	return keyMatchedBits
+}
+
+/*
+unmatch removes all registered rules in this
+*/
+func (rm *RuleMatcherKey) unmatch(bits uint64) uint64 {
+	return bits ^ (bits & rm.bits)
+}
+
+/*
+String returns a string representation of this key matcher.
+*/
+func (rm *RuleMatcherKey) String() string {
+	var buf bytes.Buffer
+
+	buf.WriteString(fmt.Sprintf("%08X *:%08X", rm.bits, rm.bitsAny))
+
+	buf.WriteString(" [")
+
+	var keys []interface{}
+	for k := range rm.bitsValue {
+		keys = append(keys, k)
+	}
+
+	sortutil.InterfaceStrings(keys)
+
+	for _, k := range keys {
+		m := rm.bitsValue[k]
+		buf.WriteString(fmt.Sprintf("%v:%08X ", k, m))
+	}
+
+	buf.WriteString("] [")
+
+	var rkeys []uint64
+	for k := range rm.bitsRegexes {
+		rkeys = append(rkeys, k)
+	}
+
+	sortutil.UInt64s(rkeys)
+
+	for _, k := range rkeys {
+		r := rm.bitsRegexes[k]
+		buf.WriteString(fmt.Sprintf("%08X:%v ", k, r))
+	}
+
+	buf.WriteString("]")
+
+	return buf.String()
+}
+
+/*
+RuleIndexState data structure
+*/
+type RuleIndexState struct {
+	id     uint64                     // Id of this rule index
+	rules  []*Rule                    // All rules stored in this index
+	keyMap map[string]*RuleMatcherKey // Map of keys (key or key and value) to KeyMatcher
+}
+
+/*
+newRuleIndexState creates a new rule index matching on event state.
+*/
+func newRuleIndexState() *RuleIndexState {
+	return &RuleIndexState{newRuleIndexID(), make([]*Rule, 0),
+		make(map[string]*RuleMatcherKey)}
+}
+
+/*
+Type returns the type of the rule sub index.
+*/
+func (ri *RuleIndexState) Type() string {
+	return typeRuleIndexState
+}
+
+/*
+addRuleAtLevel adds a new rule to the index at a specific level. The
+level is described by a part of the rule kind match.
+*/
+func (ri *RuleIndexState) addRuleAtLevel(rule *Rule, kindMatchLevel []string) {
+	errorutil.AssertTrue(len(kindMatchLevel) == 0,
+		fmt.Sprint("RuleIndexState must be a leaf - level is:", kindMatchLevel))
+
+	num := uint(len(ri.rules))
+	var bit uint64 = 1 << num
+
+	ri.rules = append(ri.rules, rule)
+
+	for k, v := range rule.StateMatch {
+		var ok bool
+		var keyMatcher *RuleMatcherKey
+
+		if keyMatcher, ok = ri.keyMap[k]; !ok {
+			keyMatcher = &RuleMatcherKey{0, 0, make(map[interface{}]uint64), make(map[uint64]*regexp.Regexp)}
+			ri.keyMap[k] = keyMatcher
+		}
+
+		keyMatcher.addRule(num, bit, k, v)
+	}
+}
+
+/*
+isTriggeringAtLevel checks if a given event triggers a rule at the given
+level of the index.
+*/
+func (ri *RuleIndexState) isTriggeringAtLevel(event *Event, level int) bool {
+	return len(event.kind) == level
+}
+
+/*
+matchAtLevel returns all rules in this index which match a given event
+at the given level. This method does a full matching check including
+state matching.
+*/
+func (ri *RuleIndexState) matchAtLevel(event *Event, level int) []*Rule {
+	if len(event.kind) != level {
+		return nil
+	}
+
+	// Assume all rules match and remove the ones with don't
+
+	var matchBits uint64 = (1 << uint(len(ri.rules))) - 1
+
+	// Match key and values
+
+	for key, matcher := range ri.keyMap {
+		if val, ok := event.state[key]; ok {
+
+			// Key is present in event
+
+			matchBits = matcher.match(matchBits, val)
+
+		} else {
+
+			// Key is not present in event - remove all rules which require the key
+
+			matchBits = matcher.unmatch(matchBits)
+		}
+
+		if matchBits == 0 {
+
+			// All rules have been excluded
+
+			return nil
+		}
+	}
+
+	var ret []*Rule
+	var collectionBits uint64 = 1
+
+	// Collect matched rules
+
+	for i := 0; collectionBits <= matchBits; i++ {
+		if matchBits&collectionBits > 0 {
+			ret = append(ret, ri.rules[i])
+		}
+
+		collectionBits <<= 1
+	}
+
+	return ret
+}
+
+/*
+stringIndent returns a string representation with a given indentation of this
+rule index and all subindexes.
+*/
+func (ri *RuleIndexState) stringIndent(indent string) string {
+	var buf bytes.Buffer
+
+	buf.WriteString(fmt.Sprintf("%v%v (%v) ", indent, ri.Type(), ri.id))
+	buf.WriteString("[")
+	for _, r := range ri.rules {
+		buf.WriteString(fmt.Sprintf("%v ", r.Name))
+	}
+	buf.WriteString("]\n")
+
+	newIndent := indent + "  "
+
+	var keys []string
+	for k := range ri.keyMap {
+		keys = append(keys, k)
+	}
+
+	sort.Strings(keys)
+
+	for _, k := range keys {
+		m := ri.keyMap[k]
+		buf.WriteString(fmt.Sprintf("%v%v - %v\n", newIndent, k, m))
+	}
+
+	return buf.String()
+}
+
+// Rule Index All
+// ==============
+
+/*
+RuleIndexAll data structure.
+*/
+type RuleIndexAll struct {
+	id    uint64  // Id of this rule index
+	rules []*Rule // Rules with target all events of a specific category
+}
+
+/*
+newRuleIndexAll creates a new leaf rule index matching on all events.
+*/
+func newRuleIndexAll() *RuleIndexAll {
+	return &RuleIndexAll{newRuleIndexID(), make([]*Rule, 0)}
+}
+
+/*
+Type returns the type of the rule sub index.
+*/
+func (ri *RuleIndexAll) Type() string {
+	return typeRuleIndexAll
+}
+
+/*
+addRuleAtLevel adds a new rule to the index at a specific level. The
+level is described by a part of the rule kind match.
+*/
+func (ri *RuleIndexAll) addRuleAtLevel(rule *Rule, kindMatchLevel []string) {
+	ri.rules = append(ri.rules, rule)
+}
+
+/*
+isTriggeringAtLevel checks if a given event triggers a rule at the given
+level of the index.
+*/
+func (ri *RuleIndexAll) isTriggeringAtLevel(event *Event, level int) bool {
+	return len(event.kind) == level
+}
+
+/*
+matchAtLevel returns all rules in this index which match a given event
+at the given level. This method does a full matching check including
+state matching.
+*/
+func (ri *RuleIndexAll) matchAtLevel(event *Event, level int) []*Rule {
+	if len(event.kind) != level {
+		return nil
+	}
+
+	return ri.rules
+}
+
+/*
+stringIndent returns a string representation with a given indentation of this
+rule index and all subindexes.
+*/
+func (ri *RuleIndexAll) stringIndent(indent string) string {
+	var buf bytes.Buffer
+
+	buf.WriteString(fmt.Sprintf("%v%v (%v)\n", indent, ri.Type(), ri.id))
+
+	newIndent := indent + "  "
+
+	for _, rule := range ri.rules {
+		buf.WriteString(fmt.Sprintf("%v%v\n", newIndent, rule))
+	}
+
+	return buf.String()
+}
+
+// Unique id creation
+// ==================
+
+var ruleindexidcounter uint64 = 1
+var ruleindexidcounterLock = &sync.Mutex{}
+
+/*
+newId returns a new unique id.
+*/
+func newRuleIndexID() uint64 {
+	ruleindexidcounterLock.Lock()
+	defer ruleindexidcounterLock.Unlock()
+
+	ret := ruleindexidcounter
+	ruleindexidcounter++
+
+	return ret
+}

+ 509 - 0
engine/rule_test.go

@@ -0,0 +1,509 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"fmt"
+	"regexp"
+	"sort"
+	"testing"
+)
+
+func TestRuleIndexSimple(t *testing.T) {
+	ruleindexidcounter = 0
+	defer func() {
+		ruleindexidcounter = 0
+	}()
+
+	// Store a simple rule
+
+	rule := &Rule{
+		"TestRule", // Name
+		"",         // Description
+		[]string{"core.main.tester", "core.tmp.*"}, // Kind match
+		[]string{"data.read", "data.test"},         // Match on event cascade scope
+		nil,
+		0,                      // Priority of the rule
+		[]string{"TestRule66"}, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			return nil
+		},
+	}
+
+	index := NewRuleIndex()
+
+	index.AddRule(rule)
+
+	// Check error cases
+
+	err := index.AddRule(&Rule{
+		"TestRuleError",              // Name
+		"",                           // Description
+		[]string{"core.main.tester"}, // Kind match
+		nil,
+		nil,
+		0,                      // Priority of the rule
+		[]string{"TestRule66"}, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			return nil
+		},
+	})
+	if err.Error() != "Cannot add rule without a scope match: TestRuleError" {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	err = index.AddRule(&Rule{
+		"TestRuleError2",                   // Name
+		"",                                 // Description
+		nil,                                // Kind match
+		[]string{"data.read", "data.test"}, // Match on event cascade scope
+		nil,
+		0,                      // Priority of the rule
+		[]string{"TestRule66"}, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			return nil
+		},
+	})
+	if err.Error() != "Cannot add rule without a kind match: TestRuleError2" {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	// Check index layout
+
+	if res := index.String(); res != `
+core - RuleIndexKind (0)
+  main - RuleIndexKind (1)
+    tester - RuleIndexKind (2)
+      RuleIndexAll (3)
+        Rule:TestRule [] (Priority:0 Kind:[core.main.tester core.tmp.*] Scope:[data.read data.test] StateMatch:null Suppress:[TestRule66])
+  tmp - RuleIndexKind (1)
+    * - RuleIndexKind (4)
+      RuleIndexAll (5)
+        Rule:TestRule [] (Priority:0 Kind:[core.main.tester core.tmp.*] Scope:[data.read data.test] StateMatch:null Suppress:[TestRule66])
+`[1:] {
+		t.Error("Unexpected index layout:", res)
+		return
+	}
+
+	// Check trigger queries
+
+	if !index.IsTriggering(&Event{
+		"bla",
+		[]string{"core", "tmp", "bla"},
+		nil,
+	}) {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if index.IsTriggering(&Event{
+		"bla",
+		[]string{"core", "tmp"},
+		nil,
+	}) {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if index.IsTriggering(&Event{
+		"bla",
+		[]string{"core", "tmpp", "bla"},
+		nil,
+	}) {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if !index.IsTriggering(&Event{
+		"bla",
+		[]string{"core", "main", "tester"},
+		nil,
+	}) {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if index.IsTriggering(&Event{
+		"bla",
+		[]string{"core", "main", "tester", "bla"},
+		nil,
+	}) {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if index.IsTriggering(&Event{
+		"bla",
+		[]string{"core", "main", "teste"},
+		nil,
+	}) {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if index.IsTriggering(&Event{
+		"bla",
+		[]string{"core", "main"},
+		nil,
+	}) {
+		t.Error("Unexpected result")
+		return
+	}
+
+	// Event matching
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "main", "tester"},
+		nil,
+	}); printRules(res) != "[TestRule]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "tmp", "x"},
+		nil,
+	}); printRules(res) != "[TestRule]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "tmp"},
+		nil,
+	}); printRules(res) != "[]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "tmp", "x", "y"},
+		nil,
+	}); printRules(res) != "[]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+}
+
+func TestRuleIndexStateMatch(t *testing.T) {
+	ruleindexidcounter = 0
+	defer func() {
+		ruleindexidcounter = 0
+	}()
+
+	rule1 := &Rule{
+		"TestRule1", // Name
+		"",          // Description
+		[]string{"core.main.tester", "core.tmp.*"}, // Kind match
+		[]string{"data.read", "data.test"},         // Match on event cascade scope
+		map[string]interface{}{ // Match on event state
+			"name": nil,
+			"test": "val1",
+		},
+		0,                      // Priority of the rule
+		[]string{"TestRule66"}, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			return nil
+		},
+	}
+
+	rule2 := &Rule{
+		"TestRule2",                  // Name
+		"",                           // Description
+		[]string{"core.main.tester"}, // Kind match
+		[]string{"data.read"},        // Match on event cascade scope
+		map[string]interface{}{ // Match on event state
+			"name":  nil,
+			"test":  "val2",
+			"test2": 42,
+		},
+		0,                      // Priority of the rule
+		[]string{"TestRule66"}, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			return nil
+		},
+	}
+
+	rule3 := &Rule{
+		"TestRule3",                  // Name
+		"",                           // Description
+		[]string{"core.main.tester"}, // Kind match
+		[]string{"data.read"},        // Match on event cascade scope
+		map[string]interface{}{ // Match on event state
+			"name":  nil,
+			"test":  "val2",
+			"test2": 42,
+			"test3": 15,
+		},
+		0,                      // Priority of the rule
+		[]string{"TestRule66"}, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			return nil
+		},
+	}
+
+	index := NewRuleIndex()
+
+	index.AddRule(rule1)
+	index.AddRule(rule2)
+	index.AddRule(rule3)
+
+	if err := index.AddRule(rule3); err.Error() != "Cannot add rule TestRule3 twice" {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	if len(index.Rules()) != 3 {
+		t.Error("Unexpected number of rules:", len(index.Rules()))
+	}
+
+	// Check index layout
+
+	if res := index.String(); res != `
+core - RuleIndexKind (0)
+  main - RuleIndexKind (1)
+    tester - RuleIndexKind (2)
+      RuleIndexState (3) [TestRule1 TestRule2 TestRule3 ]
+        name - 00000007 *:00000007 [] []
+        test - 00000007 *:00000000 [val1:00000001 val2:00000006 ] []
+        test2 - 00000006 *:00000000 [42:00000006 ] []
+        test3 - 00000004 *:00000000 [15:00000004 ] []
+  tmp - RuleIndexKind (1)
+    * - RuleIndexKind (4)
+      RuleIndexState (5) [TestRule1 ]
+        name - 00000001 *:00000001 [] []
+        test - 00000001 *:00000000 [val1:00000001 ] []
+`[1:] {
+		t.Error("Unexpected index layout:", res)
+		return
+	}
+
+	// Make sure events without state do not match
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "tmp", "x"},
+		nil,
+	}); printRules(res) != "[]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Single rule match
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "tmp", "x"},
+		map[interface{}]interface{}{ // Match on event state
+			"name": nil,
+			"test": "val1",
+		},
+	}); printRules(res) != "[TestRule1]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "main", "tester"},
+		map[interface{}]interface{}{ // Match on event state
+			"name": nil,
+			"test": "val1",
+		},
+	}); printRules(res) != "[TestRule1]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "main", "tester"},
+		map[interface{}]interface{}{ // Match on event state
+			"name":  "foobar",
+			"test":  "val2",
+			"test2": 42,
+		},
+	}); printRules(res) != "[TestRule2]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Test multiple rule match
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "main", "tester"},
+		map[interface{}]interface{}{ // Match on event state
+			"name":  nil,
+			"test":  "val2",
+			"test2": 42,
+			"test3": 15,
+		},
+	}); printRules(res) != "[TestRule2 TestRule3]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+}
+
+func TestRuleIndexStateRegexMatch(t *testing.T) {
+	ruleindexidcounter = 0
+	defer func() {
+		ruleindexidcounter = 0
+	}()
+
+	rule1 := &Rule{
+		"TestRule1", // Name
+		"",          // Description
+		[]string{"core.main.tester", "core.tmp.*"}, // Kind match
+		[]string{"data.read", "data.test"},         // Match on event cascade scope
+		map[string]interface{}{ // Match on event state
+			"name": nil,
+			"test": regexp.MustCompile("val.*"),
+		},
+		0,                      // Priority of the rule
+		[]string{"TestRule66"}, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			return nil
+		},
+	}
+
+	rule2 := &Rule{
+		"TestRule2",                  // Name
+		"",                           // Description
+		[]string{"core.main.tester"}, // Kind match
+		[]string{"data.read"},        // Match on event cascade scope
+		map[string]interface{}{ // Match on event state
+			"name": nil,
+			"test": regexp.MustCompile("va..*"),
+		},
+		0,                      // Priority of the rule
+		[]string{"TestRule66"}, // List of suppressed rules by this rule
+		func(p Processor, m Monitor, e *Event) error { // Action of the rule
+			return nil
+		},
+	}
+
+	index := NewRuleIndex()
+
+	index.AddRule(rule1)
+	index.AddRule(rule2)
+
+	// Check index layout
+
+	if res := index.String(); res != `
+core - RuleIndexKind (0)
+  main - RuleIndexKind (1)
+    tester - RuleIndexKind (2)
+      RuleIndexState (3) [TestRule1 TestRule2 ]
+        name - 00000003 *:00000003 [] []
+        test - 00000003 *:00000003 [] [00000001:val.* 00000002:va..* ]
+  tmp - RuleIndexKind (1)
+    * - RuleIndexKind (4)
+      RuleIndexState (5) [TestRule1 ]
+        name - 00000001 *:00000001 [] []
+        test - 00000001 *:00000001 [] [00000001:val.* ]
+`[1:] {
+		t.Error("Unexpected index layout:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "tmp", "x"},
+		map[interface{}]interface{}{ // Match on event state
+			"name": "boo",
+			"test": "val1",
+		},
+	}); printRules(res) != "[TestRule1]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "tmp", "x"},
+		map[interface{}]interface{}{ // Match on event state
+			"name": "boo",
+			"test": "val",
+		},
+	}); printRules(res) != "[TestRule1]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "main", "tester"},
+		map[interface{}]interface{}{ // Match on event state
+			"name": "boo",
+			"test": "var",
+		},
+	}); printRules(res) != "[TestRule2]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "main", "tester"},
+		map[interface{}]interface{}{ // Match on event state
+			"name": "boo",
+			"test": "val",
+		},
+	}); printRules(res) != "[TestRule1 TestRule2]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Test error cases
+
+	if res := index.IsTriggering(&Event{
+		"bla",
+		[]string{"core", "main", "tester", "a"},
+		map[interface{}]interface{}{ // Match on event state
+			"name": "boo",
+			"test": "val",
+		},
+	}); res {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := index.Match(&Event{
+		"bla",
+		[]string{"core", "main", "tester", "a"},
+		map[interface{}]interface{}{ // Match on event state
+			"name": "boo",
+			"test": "val",
+		},
+	}); printRules(res) != "[]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+}
+
+func printRules(rules []*Rule) string {
+	var ret []string
+
+	for _, r := range rules {
+		ret = append(ret, r.Name)
+	}
+
+	sort.Strings(ret)
+
+	return fmt.Sprint(ret)
+}

+ 236 - 0
engine/taskqueue.go

@@ -0,0 +1,236 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"bytes"
+	"fmt"
+	"math/rand"
+	"sort"
+	"sync"
+
+	"devt.de/krotik/common/errorutil"
+	"devt.de/krotik/common/flowutil"
+	"devt.de/krotik/common/pools"
+	"devt.de/krotik/common/sortutil"
+	"devt.de/krotik/common/stringutil"
+)
+
+/*
+TaskError datastructure to collect all rule errors of an event.
+*/
+type TaskError struct {
+	ErrorMap map[string]error // Rule errors (rule name -> error)
+	Event    *Event           // Event which caused the error
+	Monitor  Monitor          // Event monitor
+}
+
+/*
+Error returns a string representation of this error.
+*/
+func (te *TaskError) Error() string {
+	var ret bytes.Buffer
+
+	// Collect all errors and sort them by name
+
+	errNames := make([]string, 0, len(te.ErrorMap))
+
+	for name := range te.ErrorMap {
+		errNames = append(errNames, name)
+	}
+
+	sort.Strings(errNames)
+
+	ret.WriteString(fmt.Sprintf("Taskerror%v:\n", stringutil.Plural(len(errNames))))
+	for i, name := range errNames {
+		ret.WriteString(te.Monitor.EventPathString())
+		ret.WriteString(fmt.Sprintf(" -> %v : %v", name, te.ErrorMap[name]))
+		if i < len(errNames)-1 {
+			ret.WriteString("\n")
+		}
+	}
+
+	return ret.String()
+}
+
+/*
+Task models a task which is created and executed by the processor.
+*/
+type Task struct {
+	p Processor // Processor which created the task
+	m Monitor   // Monitor which observes the task execution
+	e *Event    // Event which caused the task creation
+}
+
+/*
+Run the task.
+*/
+func (t *Task) Run() error {
+	EventTracer.record(t.e, "Task.Run", "Running task")
+
+	errors := t.p.ProcessEvent(t.e, t.m)
+
+	if len(errors) > 0 {
+		EventTracer.record(t.e, "Task.Run", fmt.Sprint("Task had errors:", errors))
+
+		return &TaskError{errors, t.e, t.m}
+	}
+
+	return nil
+}
+
+/*
+Returns a string representation of this task.
+*/
+func (t *Task) String() string {
+	return fmt.Sprintf("Task: %v %v %v", t.p, t.m, t.e)
+}
+
+/*
+HandleError handles an error which occurred during the run method.
+*/
+func (t *Task) HandleError(e error) {
+	t.m.SetErrors(e.(*TaskError))
+	t.p.(*eventProcessor).notifyRootMonitorErrors(t.m.RootMonitor())
+}
+
+/*
+TaskQueue models the queue of tasks for a processor.
+*/
+type TaskQueue struct {
+	lock         *sync.Mutex                        // Lock for queue
+	queues       map[uint64]*sortutil.PriorityQueue // Map from root monitor id to priority queue
+	messageQueue *flowutil.EventPump                // Queue for message passing between components
+}
+
+/*
+NewTaskQueue creates a new TaskQueue object.
+*/
+func NewTaskQueue(ep *flowutil.EventPump) *TaskQueue {
+	return &TaskQueue{&sync.Mutex{}, make(map[uint64]*sortutil.PriorityQueue), ep}
+}
+
+/*
+Clear the queue of all pending tasks.
+*/
+func (tq *TaskQueue) Clear() {
+	tq.lock.Lock()
+	defer tq.lock.Unlock()
+
+	tq.queues = make(map[uint64]*sortutil.PriorityQueue)
+}
+
+/*
+Pop returns the next task from the queue.
+*/
+func (tq *TaskQueue) Pop() pools.Task {
+	tq.lock.Lock()
+	defer tq.lock.Unlock()
+
+	var popQueue *sortutil.PriorityQueue
+	var idx int
+
+	// Pick a random number between 0 and len(tq.queues) - 1
+
+	if lq := len(tq.queues); lq > 0 {
+		idx = rand.Intn(lq)
+	}
+
+	// Go through all queues and pick one - clean up while we are at it
+
+	for k, v := range tq.queues {
+
+		if v.Size() > 0 {
+
+			// Pick a random queue - pick the last if idx does not
+			// reach 0 before the end of the iteration.
+
+			idx--
+
+			popQueue = v
+
+			if idx <= 0 {
+				break
+			}
+
+		} else {
+
+			// Remove empty queues
+
+			delete(tq.queues, k)
+		}
+	}
+
+	if popQueue != nil {
+		if res := popQueue.Pop(); res != nil {
+			return res.(*Task)
+		}
+	}
+
+	return nil
+}
+
+/*
+Push adds another task to the queue.
+*/
+func (tq *TaskQueue) Push(t pools.Task) {
+	tq.lock.Lock()
+	defer tq.lock.Unlock()
+
+	var q *sortutil.PriorityQueue
+	var ok bool
+
+	task := t.(*Task)
+
+	rm := task.m.RootMonitor()
+	id := rm.ID()
+
+	if q, ok = tq.queues[id]; !ok {
+		q = sortutil.NewPriorityQueue()
+		tq.queues[id] = q
+
+		// Add listener for finish
+
+		tq.messageQueue.AddObserver(MessageRootMonitorFinished, rm,
+			func(event string, eventSource interface{}) {
+				tq.lock.Lock()
+				defer tq.lock.Unlock()
+
+				rm := eventSource.(*RootMonitor)
+				q := tq.queues[rm.ID()]
+
+				// Safeguard that no tasks are ever left over
+
+				errorutil.AssertTrue(q == nil || q.Size() == 0,
+					"Finished monitor left events behind")
+
+				tq.messageQueue.RemoveObservers(event, eventSource)
+			})
+	}
+
+	q.Push(task, task.m.Priority())
+}
+
+/*
+Size returns the size of the queue.
+*/
+func (tq *TaskQueue) Size() int {
+	tq.lock.Lock()
+	defer tq.lock.Unlock()
+
+	var ret int
+
+	for _, q := range tq.queues {
+		ret += q.Size()
+	}
+
+	return ret
+}

+ 223 - 0
engine/taskqueue_test.go

@@ -0,0 +1,223 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestTaskQueue(t *testing.T) {
+	UnitTestResetIDs()
+
+	// Create dummy processor
+
+	proc := NewProcessor(1)
+
+	// Create dummy event
+
+	event := &Event{
+		"DummyEvent",
+		[]string{"main"},
+		nil,
+	}
+
+	// Create different root monitors with different IDs
+
+	m1 := newRootMonitor(nil, NewRuleScope(map[string]bool{"": true}), proc.(*eventProcessor).messageQueue)
+	m2 := newRootMonitor(nil, NewRuleScope(map[string]bool{"": true}), proc.(*eventProcessor).messageQueue)
+	m3 := newRootMonitor(nil, NewRuleScope(map[string]bool{"": true}), proc.(*eventProcessor).messageQueue)
+
+	// Create now different tasks which come from the different monitors
+
+	t1 := &Task{proc, m1, event}
+	t2 := &Task{proc, m2, event}
+	t3 := &Task{proc, m3, event}
+	t4 := &Task{proc, m2.NewChildMonitor(5), event}
+	t5 := &Task{proc, m2.NewChildMonitor(10), event}
+
+	tq := NewTaskQueue(proc.(*eventProcessor).messageQueue)
+
+	tq.Push(t1)
+
+	if res := tq.Size(); res != 1 {
+		t.Error("Unexpected size:", res)
+		return
+	}
+
+	tq.Clear()
+
+	if res := tq.Size(); res != 0 {
+		t.Error("Unexpected size:", res)
+		return
+	}
+
+	if e := tq.Pop(); e != nil {
+		t.Error("Unexpected event:", e)
+		return
+	}
+
+	if res := tq.Size(); res != 0 {
+		t.Error("Unexpected size:", res)
+		return
+	}
+
+	tq.Push(t1)
+	tq.Push(t2)
+	tq.Push(t3)
+	tq.Push(t4)
+	tq.Push(t5)
+
+	if res := len(tq.queues); res != 3 {
+		t.Error("Unexpected size:", res)
+		return
+	}
+
+	if s := tq.queues[1].Size(); s != 1 {
+		t.Error("Unexpected result:", s)
+		return
+	}
+
+	if s := tq.queues[2].Size(); s != 3 {
+		t.Error("Unexpected result:", s)
+		return
+	}
+
+	if e := tq.Pop(); e != t1 && e != t2 && e != t3 {
+		t.Error("Unexpected event:", e)
+		return
+	}
+
+	if res := len(tq.queues); res != 3 {
+		t.Error("Unexpected size:", res)
+		return
+	}
+
+	if e := tq.Pop(); e != t1 && e != t2 && e != t3 && e != t4 && e != t5 {
+		t.Error("Unexpected event:", e)
+		return
+	}
+
+	if res := len(tq.queues); res != 3 && res != 2 {
+		t.Error("Unexpected size:", res)
+		return
+	}
+
+	if e := tq.Pop(); e != t1 && e != t2 && e != t3 && e != t4 && e != t5 {
+		t.Error("Unexpected event:", e)
+		return
+	}
+
+	if s := tq.Size(); s != 2 {
+		t.Error("Unexpected result:", s)
+		return
+	}
+
+	if e := tq.Pop(); e != t1 && e != t2 && e != t3 && e != t4 && e != t5 {
+		t.Error("Unexpected event:", e)
+		return
+	}
+
+	if e := tq.Pop(); e != t1 && e != t2 && e != t3 && e != t4 && e != t5 {
+		t.Error("Unexpected event:", e)
+		return
+	}
+
+	if s := tq.Size(); s != 0 {
+		t.Error("Unexpected result:", s)
+		return
+	}
+
+	if e := tq.Pop(); e != nil {
+		t.Error("Unexpected event:", e)
+		return
+	}
+
+	tq.Push(t5)
+
+	if fmt.Sprint(tq.queues) != "map[2:[ Task: RumbleProcessor 1 (workers:1) Monitor 5 (parent: Monitor 2 (parent: <nil> priority: 0 activated: false finished: false) priority: 10 activated: false finished: false) Event: DummyEvent main {} (10) ]]" {
+		t.Error("Unexpected queue:", tq.queues)
+		return
+	}
+
+	if e := tq.Pop(); e != t5 {
+		t.Error("Unexpected event:", e)
+		return
+	}
+
+	if fmt.Sprint(tq.queues) != "map[2:[ ]]" {
+		t.Error("Unexpected list of ids:", tq.queues)
+		return
+	}
+
+	if e := tq.Pop(); e != nil {
+		t.Error("Unexpected event:", e)
+		return
+	}
+
+	if fmt.Sprint(tq.queues) != "map[]" {
+		t.Error("Unexpected list of ids:", tq.queues)
+		return
+	}
+
+	if e := tq.Pop(); e != nil {
+		t.Error("Unexpected event:", e)
+		return
+	}
+}
+
+func TestTaskQueueCorrectPriorities(t *testing.T) {
+	UnitTestResetIDs()
+
+	// Create dummy processor
+
+	proc := NewProcessor(1)
+
+	// Create dummy event
+
+	event := &Event{
+		"DummyEvent",
+		[]string{"main"},
+		nil,
+	}
+
+	// Create different root monitors with different IDs
+
+	m1 := newRootMonitor(nil, NewRuleScope(map[string]bool{"": true}), proc.(*eventProcessor).messageQueue)
+
+	// Create now different tasks which come from the different monitors
+
+	t1 := &Task{proc, m1, event}
+	t2 := &Task{proc, m1.NewChildMonitor(5), event}
+	t3 := &Task{proc, m1.NewChildMonitor(10), event}
+
+	tq := NewTaskQueue(proc.(*eventProcessor).messageQueue)
+
+	tq.Push(t2)
+	tq.Push(t1)
+	tq.Push(t3)
+
+	if s := tq.Size(); s != 3 {
+		t.Error("Unexpected result:", s)
+		return
+	}
+
+	var popList []int
+
+	popList = append(popList, tq.Pop().(*Task).m.Priority())
+	popList = append(popList, tq.Pop().(*Task).m.Priority())
+	popList = append(popList, tq.Pop().(*Task).m.Priority())
+
+	if fmt.Sprint(popList) != "[0 5 10]" {
+		t.Error("Unexpected poplist:", popList)
+		return
+	}
+}

+ 161 - 0
engine/util.go

@@ -0,0 +1,161 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import (
+	"sort"
+	"strings"
+)
+
+// Globals
+// =======
+
+/*
+RuleKindSeparator is the separator for rule kinds
+*/
+const RuleKindSeparator = "."
+
+/*
+RuleKindWildcard is a wildcard for rule kinds
+*/
+const RuleKindWildcard = "*"
+
+// Messages
+// ========
+
+/*
+MessageRootMonitorFinished is send from a root monitor when it has finished
+*/
+const MessageRootMonitorFinished = "MessageRootMonitorFinished"
+
+// Rule Scope
+// ==========
+
+/*
+RuleScope is a set of scope definitions for rules. Each definition allows or disallows
+a set of rule types. Scope definitions and rule sets are usually expressed with
+named paths (scope paths) in dot notation (e.g. core.data.read).
+*/
+type RuleScope struct {
+	scopeDefs map[string]interface{}
+}
+
+const ruleScopeAllowFlag = "."
+
+/*
+NewRuleScope creates a new rule scope object with an initial set of definitions.
+*/
+func NewRuleScope(allows map[string]bool) *RuleScope {
+	rs := &RuleScope{make(map[string]interface{})}
+	rs.AddAll(allows)
+	return rs
+}
+
+/*
+IsAllowedAll checks if all given scopes are allowed.
+*/
+func (rs *RuleScope) IsAllowedAll(scopePaths []string) bool {
+	for _, path := range scopePaths {
+		if !rs.IsAllowed(path) {
+			return false
+		}
+	}
+	return true
+}
+
+/*
+IsAllowed checks if a given scope path is allowed within this rule scope.
+*/
+func (rs *RuleScope) IsAllowed(scopePath string) bool {
+	allowed := false
+	scopeDefs := rs.scopeDefs
+
+	if a, ok := scopeDefs[ruleScopeAllowFlag]; ok {
+		allowed = a.(bool)
+	}
+
+	for _, scopeStep := range strings.Split(scopePath, ".") {
+		val, ok := scopeDefs[scopeStep]
+
+		if !ok {
+			break
+		}
+
+		scopeDefs = val.(map[string]interface{})
+
+		if a, ok := scopeDefs[ruleScopeAllowFlag]; ok {
+			allowed = a.(bool)
+		}
+	}
+
+	return allowed
+}
+
+/*
+AddAll adds all given definitions to the rule scope.
+*/
+func (rs *RuleScope) AddAll(allows map[string]bool) {
+	for scopePath, allow := range allows {
+		rs.Add(scopePath, allow)
+	}
+}
+
+/*
+Add adds a given definition to the rule scope.
+*/
+func (rs *RuleScope) Add(scopePath string, allow bool) {
+	scopeDefs := rs.scopeDefs
+
+	if scopePath != "" {
+		for _, scopeStep := range strings.Split(scopePath, ".") {
+			val, ok := scopeDefs[scopeStep]
+
+			if !ok {
+				val = make(map[string]interface{})
+				scopeDefs[scopeStep] = val
+			}
+
+			scopeDefs = val.(map[string]interface{})
+		}
+	}
+
+	scopeDefs[ruleScopeAllowFlag] = allow
+}
+
+// Rule sorting
+// ============
+
+/*
+RuleSlice is a slice of rules
+*/
+type RuleSlice []*Rule
+
+func (s RuleSlice) Len() int           { return len(s) }
+func (s RuleSlice) Less(i, j int) bool { return s[i].Priority < s[j].Priority }
+func (s RuleSlice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+
+/*
+SortRuleSlice sorts a slice of rules.
+*/
+func SortRuleSlice(a []*Rule) { sort.Sort(RuleSlice(a)) }
+
+// Unit testing
+// ============
+
+/*
+UnitTestResetIDs reset all counting IDs.
+THIS FUNCTION SHOULD ONLY BE CALLED BY UNIT TESTS!
+*/
+func UnitTestResetIDs() {
+	pidcounter = 1
+	ruleindexidcounter = 1
+	midcounter = 1
+}

+ 77 - 0
engine/util_test.go

@@ -0,0 +1,77 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package engine
+
+import "testing"
+
+func TestRuleScope(t *testing.T) {
+
+	rs := NewRuleScope(map[string]bool{
+		"test.first.read":  true,
+		"test.first.write": false,
+		"test.second":      true,
+	})
+
+	if rs.IsAllowed("test.first") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if rs.IsAllowed("test.first.write") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if !rs.IsAllowed("test.first.read") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if !rs.IsAllowed("test.second") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if !rs.IsAllowed("test.second.bla") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	// Test all is allowed
+
+	rs = NewRuleScope(map[string]bool{
+		"": true,
+	})
+
+	if !rs.IsAllowed("test.first") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if !rs.IsAllowed("test.first.write") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	// Test nothing is allowed
+
+	rs = NewRuleScope(nil)
+
+	if rs.IsAllowed("test.first") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if rs.IsAllowed("test.first.write") {
+		t.Error("Unexpected result")
+		return
+	}
+}