123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
- /*
- * ECAL
- *
- * Copyright 2020 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the MIT
- * License, If a copy of the MIT License was not distributed with this
- * file, You can obtain one at https://opensource.org/licenses/MIT.
- */
- package engine
- import (
- "bytes"
- "container/heap"
- "fmt"
- "sync"
- "devt.de/krotik/common/errorutil"
- "devt.de/krotik/common/sortutil"
- "devt.de/krotik/ecal/engine/pubsub"
- )
- /*
- Monitor monitors events as they are cascading. Event cascades will produce tree
- structures.
- */
- type Monitor interface {
- /*
- ID returns the monitor ID.
- */
- ID() uint64
- /*
- NewChildMonitor creates a new child monitor of this monitor.
- */
- NewChildMonitor(priority int) Monitor
- /*
- Scope returns the rule scope of this monitor.
- */
- Scope() *RuleScope
- /*
- Priority returns the monitors priority.
- */
- Priority() int
- /*
- Activated returns if this monitor has been activated.
- */
- IsActivated() bool
- /*
- Activate activates this monitor with a given event.
- */
- Activate(e *Event)
- /*
- Skip finishes this monitor without activation.
- */
- Skip(e *Event)
- /*
- Finish finishes this monitor.
- */
- Finish()
- /*
- Returns the root monitor of this monitor.
- */
- RootMonitor() *RootMonitor
- /*
- Errors returns the error object of this monitor.
- */
- Errors() *TaskError
- /*
- SetErrors adds an error object to this monitor.
- */
- SetErrors(e *TaskError)
- /*
- EventPath returns the chain of events which created this monitor.
- */
- EventPath() []*Event
- /*
- EventPathString returns the event path as a string.
- */
- EventPathString() string
- /*
- String returns a string representation of this monitor.
- */
- String() string
- }
- /*
- monitorBase provides the basic functions and fields for any monitor.
- */
- type monitorBase struct {
- id uint64 // Monitor ID
- Parent *monitorBase // Parent monitor
- Context map[string]interface{} // Context object of the monitor
- Err *TaskError // Errors which occurred during event processing
- priority int // Priority of the monitor
- rootMonitor *RootMonitor // Root monitor
- event *Event // Event which activated this monitor
- activated bool // Flag indicating if the monitor was activated
- finished bool // Flag indicating if the monitor has finished
- }
- /*
- NewMonitor creates a new monitor.
- */
- func newMonitorBase(priority int, parent *monitorBase, context map[string]interface{}) *monitorBase {
- var ret *monitorBase
- if parent != nil {
- ret = &monitorBase{newMonID(), parent, context, nil, priority, parent.rootMonitor, nil, false, false}
- } else {
- ret = &monitorBase{newMonID(), nil, context, nil, priority, nil, nil, false, false}
- }
- return ret
- }
- /*
- NewChildMonitor creates a new child monitor of this monitor.
- */
- func (mb *monitorBase) NewChildMonitor(priority int) Monitor {
- child := &ChildMonitor{newMonitorBase(priority, mb, mb.Context)}
- mb.rootMonitor.descendantCreated(child)
- return child
- }
- /*
- ID returns the monitor ID.
- */
- func (mb *monitorBase) ID() uint64 {
- return mb.id
- }
- /*
- RootMonitor returns the root monitor of this monitor.
- */
- func (mb *monitorBase) RootMonitor() *RootMonitor {
- return mb.rootMonitor
- }
- /*
- Scope returns the rule scope of this monitor.
- */
- func (mb *monitorBase) Scope() *RuleScope {
- return mb.rootMonitor.ruleScope
- }
- /*
- Priority returns the priority of this monitor.
- */
- func (mb *monitorBase) Priority() int {
- return mb.priority
- }
- /*
- IsActivated returns if this monitor has been activated.
- */
- func (mb *monitorBase) IsActivated() bool {
- return mb.activated
- }
- /*
- IsFinished returns if this monitor has finished.
- */
- func (mb *monitorBase) IsFinished() bool {
- return mb.finished
- }
- /*
- Activate activates this monitor.
- */
- func (mb *monitorBase) Activate(e *Event) {
- errorutil.AssertTrue(!mb.finished, "Cannot activate a finished monitor")
- errorutil.AssertTrue(!mb.activated, "Cannot activate an active monitor")
- errorutil.AssertTrue(e != nil, "Monitor can only be activated with an event")
- mb.event = e
- mb.rootMonitor.descendantActivated(mb.priority)
- mb.activated = true
- }
- /*
- Skip finishes this monitor without activation.
- */
- func (mb *monitorBase) Skip(e *Event) {
- errorutil.AssertTrue(!mb.finished, "Cannot skip a finished monitor")
- errorutil.AssertTrue(!mb.activated, "Cannot skip an active monitor")
- mb.event = e
- mb.activated = true
- mb.Finish()
- }
- /*
- Finish finishes this monitor.
- */
- func (mb *monitorBase) Finish() {
- errorutil.AssertTrue(mb.activated, "Cannot finish a not active monitor")
- errorutil.AssertTrue(!mb.finished, "Cannot finish a finished monitor")
- mb.finished = true
- mb.rootMonitor.descendantFinished(mb)
- }
- /*
- Errors returns the error object of this monitor.
- */
- func (mb *monitorBase) Errors() *TaskError {
- errorutil.AssertTrue(mb.finished, "Cannot get errors on an unfinished monitor")
- return mb.Err
- }
- /*
- SetErrors adds an error object to this monitor.
- */
- func (mb *monitorBase) SetErrors(e *TaskError) {
- mb.Err = e
- mb.rootMonitor.descendantFailed(mb)
- }
- /*
- EventPath returns the chain of events which created this monitor.
- */
- func (mb *monitorBase) EventPath() []*Event {
- errorutil.AssertTrue(mb.finished, "Cannot get event path on an unfinished monitor")
- path := []*Event{mb.event}
- child := mb.Parent
- for child != nil {
- path = append(path, child.event)
- child = child.Parent
- }
- // Reverse path
- for i, j := 0, len(path)-1; i < j; i, j = i+1, j-1 {
- path[i], path[j] = path[j], path[i]
- }
- return path
- }
- /*
- EventPathString returns the event path as a string.
- */
- func (mb *monitorBase) EventPathString() string {
- var buf bytes.Buffer
- ep := mb.EventPath()
- last := len(ep) - 1
- for i, e := range mb.EventPath() {
- buf.WriteString(e.name)
- if i < last {
- buf.WriteString(" -> ")
- }
- }
- return buf.String()
- }
- /*
- String returns a string representation of this monitor.
- */
- func (mb *monitorBase) String() string {
- return fmt.Sprintf("Monitor %v (parent: %v priority: %v activated: %v finished: %v)",
- mb.ID(), mb.Parent, mb.priority, mb.activated, mb.finished)
- }
- // Root Monitor
- // ============
- /*
- RootMonitor is a monitor which is at a beginning of an event cascade.
- */
- type RootMonitor struct {
- *monitorBase
- lock *sync.Mutex // Lock for datastructures
- incomplete map[int]int // Priority -> Counters of incomplete trackers
- priorities *sortutil.IntHeap // List of handled priorities
- ruleScope *RuleScope // Rule scope definitions
- unfinished int // Counter of all unfinished trackers
- messageQueue *pubsub.EventPump // Message passing queue of the processor
- errors map[uint64]*monitorBase // Monitors which got errors
- finished func(Processor) // Finish handler (can be used externally)
- }
- /*
- NewRootMonitor creates a new root monitor.
- */
- func newRootMonitor(context map[string]interface{}, scope *RuleScope,
- messageQueue *pubsub.EventPump) *RootMonitor {
- ret := &RootMonitor{newMonitorBase(0, nil, context), &sync.Mutex{},
- make(map[int]int), &sortutil.IntHeap{}, scope, 1, messageQueue,
- make(map[uint64]*monitorBase), nil}
- // A root monitor is its own parent
- ret.rootMonitor = ret
- heap.Init(ret.priorities)
- return ret
- }
- /*
- SetFinishHandler adds a handler function to this monitor which is called once
- this monitor has finished.
- */
- func (rm *RootMonitor) SetFinishHandler(fh func(Processor)) {
- rm.finished = fh
- }
- /*
- HighestPriority returns the highest priority which is handled by this monitor.
- */
- func (rm *RootMonitor) HighestPriority() int {
- rm.lock.Lock()
- defer rm.lock.Unlock()
- if len(*rm.priorities) > 0 {
- return (*rm.priorities)[0]
- }
- return -1
- }
- /*
- AllErrors returns all error which have been collected in this root monitor.
- */
- func (rm *RootMonitor) AllErrors() []*TaskError {
- rm.lock.Lock()
- defer rm.lock.Unlock()
- ret := make([]*TaskError, 0, len(rm.errors))
- // Sort by monitor id - this should give the corrent order timewise
- var ids []uint64
- for id := range rm.errors {
- ids = append(ids, id)
- }
- sortutil.UInt64s(ids)
- for _, id := range ids {
- m := rm.errors[id]
- ret = append(ret, m.Errors())
- }
- return ret
- }
- /*
- descendantCreated notifies this root monitor that a descendant has been created.
- */
- func (rm *RootMonitor) descendantCreated(monitor Monitor) {
- rm.lock.Lock()
- defer rm.lock.Unlock()
- rm.unfinished++
- }
- /*
- descendantActivated notifies this root monitor that a descendant has been activated.
- */
- func (rm *RootMonitor) descendantActivated(priority int) {
- rm.lock.Lock()
- defer rm.lock.Unlock()
- val, ok := rm.incomplete[priority]
- if !ok {
- val = 0
- heap.Push(rm.priorities, priority)
- }
- rm.incomplete[priority] = val + 1
- }
- /*
- descendantFailed notifies this root monitor that a descendant has failed.
- */
- func (rm *RootMonitor) descendantFailed(monitor *monitorBase) {
- rm.lock.Lock()
- defer rm.lock.Unlock()
- rm.errors[monitor.ID()] = monitor
- }
- /*
- descendantFinished records that this monitor has finished. If it is the last
- active monitor in the event tree then send a notification.
- */
- func (rm *RootMonitor) descendantFinished(m Monitor) {
- rm.lock.Lock()
- rm.unfinished--
- finished := rm.unfinished == 0
- if m.IsActivated() {
- priority := m.Priority()
- rm.incomplete[priority]--
- if rm.incomplete[priority] == 0 {
- rm.priorities.RemoveFirst(priority)
- delete(rm.incomplete, priority)
- }
- }
- rm.lock.Unlock()
- // Post notification
- if finished {
- rm.messageQueue.PostEvent(MessageRootMonitorFinished, rm)
- }
- }
- // Child Monitor
- // =============
- /*
- ChildMonitor is a monitor which is a descendant of a root monitor.
- */
- type ChildMonitor struct {
- *monitorBase
- }
- // Unique id creation
- // ==================
- var midcounter uint64 = 1
- var midcounterLock = &sync.Mutex{}
- /*
- newId returns a new unique id.
- */
- func newMonID() uint64 {
- midcounterLock.Lock()
- defer midcounterLock.Unlock()
- ret := midcounter
- midcounter++
- return ret
- }
|