feat: Moving thread pool and pubsub into the engine code

Matthias Ladkau, 3 years ago
commit e1f49fbe26

+ 14 - 19
ecal.md

@@ -1,6 +1,6 @@
 ECAL - Event Condition Action Language
 --
-ECAL is a language to create a rule based system which reacts to events provided that a defined condition holds:
+ECAL is a language to create a rule-based system which reacts to events. Events are handled by actions which are guarded by conditions:
 
 Event -> Condition -> Action
 
@@ -29,21 +29,8 @@ foobar.doSomething()
 
 Event Sinks
 --
-TODO:
+Event sinks are the core constructs of ECAL which provide concurrency and the means to respond to events from an external system. Sinks provide ECAL with an interface to an [event condition action engine](engine.md) which coordinates the parallel execution of code. Sinks cannot be scoped into modules or objects and are usually declared at the top level. They have the following form:
 
-- How ECAL provides concurrency support
-
-Sinks are should have unique names which identify them and the following attributes:
-
-Attribute | Description
--|-
-kindmatch  | Matching condition for event kind e.g. db.op.TableInsert. A list of strings in dot notation which describes event kinds. May contain `*` characters as wildcards.
-scopematch | Matching condition for event cascade scope e.g. db.dbRead db.dbWrite. A list of strings in dot notation which describe the scopes which are required for this sink to trigger.
-statematch | Match on event state: A simple map of required key / value states in the event state. `NULL` values can be used as wildcards (i.e. match is only on key).
-priority | Priority of the sink. Sinks of higher priority are executed first. The higher the number the lower the priority - 0 is the highest priority.
-suppresses | A list of sink names which should be suppressed if this sink is executed.
-
-Example:
 ```
 sink "mysink"
     kindmatch [ foo.bar.* ],
@@ -56,11 +43,19 @@ sink "mysink"
     }
 ```
 
-Events which match
-...
+Sinks should have unique names which identify them and support the following attributes:
+
+Attribute | Description
+-|-
+kindmatch  | Matching condition for the event kind, e.g. db.op.TableInsert. A list of strings in dot notation which describe event kinds. May contain `*` characters as wildcards.
+scopematch | Matching condition for the event cascade scope, e.g. db.dbRead db.dbWrite. A list of strings in dot notation which describe the scopes required for this sink to trigger.
+statematch | Match on the event state: a simple map of required key / value pairs in the event state. `NULL` values can be used as wildcards (i.e. the match is only on the key).
+priority | Priority of the sink. Sinks of higher priority are executed first. The higher the number, the lower the priority - 0 is the highest priority.
+suppresses | A list of sink names which should be suppressed if this sink is executed.
+
+
+
 
-Events which don't match
-...
 
 Functions
 --

+ 3 - 3
engine/monitor.go

@@ -17,8 +17,8 @@ import (
 	"sync"
 
 	"devt.de/krotik/common/errorutil"
-	"devt.de/krotik/common/flowutil"
 	"devt.de/krotik/common/sortutil"
+	"devt.de/krotik/ecal/engine/pubsub"
 )
 
 /*
@@ -299,7 +299,7 @@ type RootMonitor struct {
 	priorities   *sortutil.IntHeap       // List of handled priorities
 	ruleScope    *RuleScope              // Rule scope definitions
 	unfinished   int                     // Counter of all unfinished trackers
-	messageQueue *flowutil.EventPump     // Message passing queue of the processor
+	messageQueue *pubsub.EventPump       // Message passing queue of the processor
 	errors       map[uint64]*monitorBase // Monitors which got errors
 	finished     func(Processor)         // Finish handler (can be used externally)
 }
@@ -308,7 +308,7 @@ type RootMonitor struct {
 NewRootMonitor creates a new root monitor.
 */
 func newRootMonitor(context map[string]interface{}, scope *RuleScope,
-	messageQueue *flowutil.EventPump) *RootMonitor {
+	messageQueue *pubsub.EventPump) *RootMonitor {
 
 	ret := &RootMonitor{newMonitorBase(0, nil, context), &sync.Mutex{},
 		make(map[int]int), &sortutil.IntHeap{}, scope, 1, messageQueue,

+ 514 - 0
engine/pool/threadpool.go

@@ -0,0 +1,514 @@
+/*
+ * Public Domain Software
+ *
+ * I (Matthias Ladkau) am the author of the source code in this file.
+ * I have placed the source code in this file in the public domain.
+ *
+ * For further information see: http://creativecommons.org/publicdomain/zero/1.0/
+ */
+
+/*
+Package pool contains a thread pool implementation.
+*/
+package pool
+
+import (
+	"math"
+	"sync"
+	"time"
+)
+
+/*
+Different states of a thread pool.
+*/
+const (
+	StatusRunning  = "Running"
+	StatusStopping = "Stopping"
+	StatusStopped  = "Stopped"
+)
+
+/*
+Task is a task which should be run in a thread.
+*/
+type Task interface {
+
+	/*
+		Run the task.
+	*/
+	Run() error
+
+	/*
+		HandleError handles an error which occurred during the run method.
+	*/
+	HandleError(e error)
+}
+
+/*
+TaskQueue is a queue of tasks for a thread pool.
+*/
+type TaskQueue interface {
+
+	/*
+		Clear the queue of all pending tasks
+	*/
+	Clear()
+
+	/*
+		Pop returns the next task from the queue.
+	*/
+	Pop() Task
+	/*
+		Push adds another task to the queue.
+	*/
+	Push(t Task)
+
+	/*
+		Size returns the size of the queue.
+	*/
+	Size() int
+}
+
+/*
+DefaultTaskQueue implements a simple (FIFO) task queue for a thread pool.
+*/
+type DefaultTaskQueue struct {
+	queue []Task
+}
+
+/*
+Clear the queue of all pending tasks
+*/
+func (tq *DefaultTaskQueue) Clear() {
+	tq.queue = make([]Task, 0)
+}
+
+/*
+Pop returns the next task from the queue.
+*/
+func (tq *DefaultTaskQueue) Pop() Task {
+	var task Task
+
+	if len(tq.queue) > 0 {
+		task = tq.queue[0]
+		tq.queue = tq.queue[1:]
+	}
+
+	return task
+}
+
+/*
+Push adds another task to the queue.
+*/
+func (tq *DefaultTaskQueue) Push(t Task) {
+	tq.queue = append(tq.queue, t)
+}
+
+/*
+Size returns the size of the queue.
+*/
+func (tq *DefaultTaskQueue) Size() int {
+	return len(tq.queue)
+}
+
+/*
+ThreadPool creates a pool of threads which process tasks according to a given
+task queue. The threads are kept in an idle state if no more tasks are available.
+They resume immediately once a new task is added.
+*/
+type ThreadPool struct {
+
+	// Task regulation
+
+	queue     TaskQueue   // Task queue for thread pool
+	queueLock *sync.Mutex // Lock for queue
+
+	// Worker regulation
+
+	workerIDCount uint64                       // Id counter for worker tasks
+	workerMap     map[uint64]*ThreadPoolWorker // Map of all workers
+	workerIdleMap map[uint64]*ThreadPoolWorker // Map of all idle workers
+	workerMapLock *sync.Mutex                  // Lock for worker map
+	workerKill    int                          // Count of workers which should die
+	newTaskCond   *sync.Cond                   // Waiting condition for new tasks
+
+	// Callbacks to regulate load
+
+	RegulationLock *sync.Mutex // Lock for regulation variables
+
+	TooManyThreshold int    // Threshold for too many tasks
+	TooManyCallback  func() // Callback for too many tasks
+	tooManyTriggered bool   // Flag if too many tasks threshold was passed
+
+	TooFewThreshold int    // Threshold for too few tasks
+	TooFewCallback  func() // Callback for too few tasks
+	tooFewTriggered bool   // Flag if too few tasks threshold was passed
+}
+
+/*
+NewThreadPool creates a new thread pool.
+*/
+func NewThreadPool() *ThreadPool {
+	return NewThreadPoolWithQueue(&DefaultTaskQueue{})
+}
+
+/*
+NewThreadPoolWithQueue creates a new thread pool with a specific task queue.
+*/
+func NewThreadPoolWithQueue(q TaskQueue) *ThreadPool {
+	return &ThreadPool{q, &sync.Mutex{},
+		0, make(map[uint64]*ThreadPoolWorker),
+		make(map[uint64]*ThreadPoolWorker), &sync.Mutex{},
+		0, sync.NewCond(&sync.Mutex{}), &sync.Mutex{},
+		math.MaxInt32, func() {}, false, 0, func() {}, false}
+}
+
+/*
+AddTask adds a task to the thread pool.
+*/
+func (tp *ThreadPool) AddTask(t Task) {
+	tp.queueLock.Lock()
+	defer tp.queueLock.Unlock()
+
+	tp.queue.Push(t)
+
+	// Reset too few flag
+
+	tp.RegulationLock.Lock()
+
+	if tp.tooFewTriggered && tp.TooFewThreshold < tp.queue.Size() {
+		tp.tooFewTriggered = false
+	}
+
+	// Check too many
+
+	if !tp.tooManyTriggered && tp.TooManyThreshold <= tp.queue.Size() {
+		tp.tooManyTriggered = true
+		tp.TooManyCallback()
+	}
+
+	tp.RegulationLock.Unlock()
+
+	// Wake up a waiting worker
+
+	tp.newTaskCond.Signal()
+}
+
+/*
+getTask is called by a worker to request a new task. The worker is expected to finish
+if this function returns nil.
+*/
+func (tp *ThreadPool) getTask() Task {
+	var returnIdleTask = true
+
+	// Check if tasks should be stopped
+
+	tp.workerMapLock.Lock()
+	if tp.workerKill > 0 {
+		tp.workerKill--
+		tp.workerMapLock.Unlock()
+		return nil
+
+	} else if tp.workerKill == -1 {
+
+		// Check for special worker kill value which is used when workers should
+		// be killed when no more tasks are available.
+
+		returnIdleTask = false
+	}
+	tp.workerMapLock.Unlock()
+
+	// Check if there is a task available
+
+	tp.queueLock.Lock()
+	task := tp.queue.Pop()
+	tp.queueLock.Unlock()
+
+	if task != nil {
+		return task
+	}
+
+	tp.RegulationLock.Lock()
+
+	// Reset too many flag
+
+	if tp.tooManyTriggered && tp.TooManyThreshold > tp.queue.Size() {
+		tp.tooManyTriggered = false
+	}
+
+	// Check too few
+
+	if !tp.tooFewTriggered && tp.TooFewThreshold >= tp.queue.Size() {
+		tp.tooFewTriggered = true
+		tp.TooFewCallback()
+	}
+
+	tp.RegulationLock.Unlock()
+
+	if returnIdleTask {
+
+		// No new task available, return the idle task
+
+		return &idleTask{tp}
+	}
+
+	return nil
+}
+
+/*
+SetWorkerCount sets the worker count of this pool. If the wait flag is true then
+this call will return after the pool has reached the requested worker count.
+*/
+func (tp *ThreadPool) SetWorkerCount(count int, wait bool) {
+
+	tp.workerMapLock.Lock()
+	workerCount := len(tp.workerMap)
+	tp.workerMapLock.Unlock()
+
+	if count < 0 {
+		count = 0
+	}
+
+	if workerCount < count {
+
+		// More workers are needed
+
+		tp.workerMapLock.Lock()
+
+		// Make sure no more workers are killed
+
+		tp.workerKill = 0
+
+		for len(tp.workerMap) != count {
+			worker := &ThreadPoolWorker{tp.workerIDCount, tp}
+			go worker.run()
+			tp.workerMap[tp.workerIDCount] = worker
+			tp.workerIDCount++
+		}
+
+		tp.workerMapLock.Unlock()
+
+	} else if workerCount > count {
+
+		// Fewer workers are needed
+
+		tp.workerMapLock.Lock()
+		tp.workerKill = workerCount - count
+		tp.workerMapLock.Unlock()
+
+		tp.newTaskCond.Broadcast()
+
+		if wait {
+			for {
+				tp.workerMapLock.Lock()
+				workerCount = len(tp.workerMap)
+				tp.workerMapLock.Unlock()
+
+				if workerCount == count {
+					break
+				}
+
+				time.Sleep(5 * time.Nanosecond)
+
+				// Broadcast again since some workers might now be waiting
+
+				tp.newTaskCond.Broadcast()
+			}
+		}
+	}
+}
+
+/*
+Status returns the current status of the thread pool.
+*/
+func (tp *ThreadPool) Status() string {
+	var status string
+
+	tp.workerMapLock.Lock()
+	workerCount := len(tp.workerMap)
+	workerKill := tp.workerKill
+	tp.workerMapLock.Unlock()
+
+	if workerCount > 0 {
+		if workerKill == -1 {
+			status = StatusStopping
+		} else {
+			status = StatusRunning
+		}
+	} else {
+		status = StatusStopped
+	}
+
+	return status
+}
+
+/*
+WorkerCount returns the current count of workers.
+*/
+func (tp *ThreadPool) WorkerCount() int {
+	tp.workerMapLock.Lock()
+	defer tp.workerMapLock.Unlock()
+	return len(tp.workerMap)
+}
+
+/*
+WaitAll waits for all workers to become idle.
+*/
+func (tp *ThreadPool) WaitAll() {
+
+	// Wake up all workers
+
+	tp.newTaskCond.Broadcast()
+
+	time.Sleep(5 * time.Nanosecond)
+
+	for {
+
+		tp.workerMapLock.Lock()
+		tp.queueLock.Lock()
+
+		// Get total number of workers and idle workers
+
+		workerCount := len(tp.workerMap)
+		workerIdleCount := len(tp.workerIdleMap)
+
+		// Get number of pending tasks
+
+		tasks := tp.queue.Size()
+
+		tp.queueLock.Unlock()
+		tp.workerMapLock.Unlock()
+
+		// Only leave this loop if either no workers are left or if all
+		// tasks are done and all workers are idle
+
+		if workerCount == 0 || (workerCount == workerIdleCount && tasks == 0) {
+			break
+		}
+
+		time.Sleep(5 * time.Nanosecond)
+
+		// Broadcast again and again until all workers are idle
+
+		tp.newTaskCond.Broadcast()
+	}
+}
+
+/*
+JoinAll processes all remaining tasks and kills off all workers afterwards.
+*/
+func (tp *ThreadPool) JoinAll() {
+
+	// Tell all workers to die
+
+	tp.workerMapLock.Lock()
+	tp.workerKill = -1
+	tp.workerMapLock.Unlock()
+
+	tp.newTaskCond.Broadcast()
+
+	for {
+
+		tp.workerMapLock.Lock()
+		tp.queueLock.Lock()
+
+		// Get total number of workers
+
+		workerCount := len(tp.workerMap)
+
+		// Get number of pending tasks
+
+		tasks := tp.queue.Size()
+
+		tp.queueLock.Unlock()
+		tp.workerMapLock.Unlock()
+
+		// Only leave this loop if no workers exist and all tasks are done
+
+		if workerCount == 0 && tasks == 0 {
+			break
+		}
+
+		time.Sleep(5 * time.Nanosecond)
+
+		// Broadcast again and again until all workers are dead
+
+		tp.newTaskCond.Broadcast()
+	}
+}
+
+/*
+ThreadPoolWorker models a worker in the thread pool.
+*/
+type ThreadPoolWorker struct {
+	id   uint64      // ID of the thread pool worker
+	pool *ThreadPool // Thread pool of this worker
+}
+
+/*
+run lets this worker run tasks.
+*/
+func (w *ThreadPoolWorker) run() {
+
+	for {
+
+		// Try to get the next task
+
+		task := w.pool.getTask()
+
+		// Exit if there is no new task
+
+		if task == nil {
+			break
+		}
+
+		_, isIdleTask := task.(*idleTask)
+
+		if isIdleTask {
+
+			// Register this worker as idle
+
+			w.pool.workerMapLock.Lock()
+			w.pool.workerIdleMap[w.id] = w
+			w.pool.workerMapLock.Unlock()
+		}
+
+		// Run the task
+
+		if err := task.Run(); err != nil {
+			task.HandleError(err)
+		}
+
+		if isIdleTask {
+			w.pool.workerMapLock.Lock()
+			delete(w.pool.workerIdleMap, w.id)
+			w.pool.workerMapLock.Unlock()
+		}
+	}
+
+	// Remove worker from workerMap
+
+	w.pool.workerMapLock.Lock()
+	delete(w.pool.workerMap, w.id)
+	w.pool.workerMapLock.Unlock()
+}
+
+/*
+idleTask is the internal idle task.
+*/
+type idleTask struct {
+	tp *ThreadPool
+}
+
+/*
+Run the idle task.
+*/
+func (t *idleTask) Run() error {
+	t.tp.newTaskCond.L.Lock()
+	t.tp.newTaskCond.Wait()
+	t.tp.newTaskCond.L.Unlock()
+	return nil
+}
+
+func (t *idleTask) HandleError(e error) {
+	panic(e.Error())
+}

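For illustration, a minimal consumer of the new pool package might look like the sketch below (hypothetical code, not part of this commit; printTask is an invented example type). A task implements the Task interface and is handed to the pool via AddTask:

```
package main

import (
	"fmt"

	"devt.de/krotik/ecal/engine/pool"
)

// printTask is a hypothetical Task implementation.
type printTask struct {
	msg string
}

// Run prints the task message.
func (t *printTask) Run() error {
	fmt.Println(t.msg)
	return nil
}

// HandleError reports an error which occurred during Run.
func (t *printTask) HandleError(e error) {
	fmt.Println("task error:", e)
}

func main() {
	tp := pool.NewThreadPool()

	// Tasks can be queued before any workers exist.
	tp.AddTask(&printTask{"hello"})
	tp.AddTask(&printTask{"world"})

	// Start three workers which begin draining the queue.
	tp.SetWorkerCount(3, true)

	// Process all remaining tasks and kill off all workers afterwards.
	tp.JoinAll()

	fmt.Println(tp.Status()) // StatusStopped
}
```

JoinAll relies on the special workerKill value -1, so the workers above exit as soon as the queue is drained.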
+ 415 - 0
engine/pool/threadpool_test.go

@@ -0,0 +1,415 @@
+/*
+ * Public Domain Software
+ *
+ * I (Matthias Ladkau) am the author of the source code in this file.
+ * I have placed the source code in this file in the public domain.
+ *
+ * For further information see: http://creativecommons.org/publicdomain/zero/1.0/
+ */
+
+package pool
+
+import (
+	"bytes"
+	"errors"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+)
+
+type testTask struct {
+	task         func() error
+	errorHandler func(e error)
+}
+
+func (t *testTask) Run() error {
+	return t.task()
+}
+
+func (t *testTask) HandleError(e error) {
+	t.errorHandler(e)
+}
+
+func TestDefaultTaskQueue(t *testing.T) {
+	var taskFinishCounter int
+	var tq DefaultTaskQueue
+
+	if res := tq.Size(); res != 0 {
+		t.Error("Initial size should be empty not: ", res)
+		return
+	}
+
+	if res := tq.Pop(); res != nil {
+		t.Error("Unexpected result: ", res)
+		return
+	}
+
+	tq.Clear()
+
+	if res := tq.Size(); res != 0 {
+		t.Error("Initial size should be empty not: ", res)
+		return
+	}
+
+	if res := tq.Pop(); res != nil {
+		t.Error("Unexpected result: ", res)
+		return
+	}
+
+	tq.Push(&testTask{func() error {
+		taskFinishCounter++
+		return nil
+	}, nil})
+	tq.Push(&testTask{func() error {
+		taskFinishCounter++
+		return nil
+	}, nil})
+	tq.Push(&testTask{func() error {
+		taskFinishCounter++
+		return nil
+	}, nil})
+
+	if res := tq.Size(); res != 3 {
+		t.Error("Unexpected result: ", res)
+		return
+	}
+
+	// Execute the functions
+
+	tq.Pop().Run()
+
+	if res := tq.Size(); res != 2 {
+		t.Error("Unexpected result: ", res)
+		return
+	}
+
+	tq.Pop().Run()
+
+	if res := tq.Size(); res != 1 {
+		t.Error("Unexpected result: ", res)
+		return
+	}
+
+	tq.Pop().Run()
+
+	if res := tq.Size(); res != 0 {
+		t.Error("Unexpected result: ", res)
+		return
+	}
+
+	if res := tq.Pop(); res != nil {
+		t.Error("Unexpected result: ", res)
+		return
+	}
+
+	if taskFinishCounter != 3 {
+		t.Error("Unexpected result: ", taskFinishCounter)
+		return
+	}
+}
+
+func TestThreadPool(t *testing.T) {
+	var taskFinishCounter int
+	taskFinishCounterLock := &sync.Mutex{}
+
+	tp := NewThreadPool()
+
+	tp.SetWorkerCount(-10, true)
+	tp.TooManyThreshold = 1
+
+	if status := tp.Status(); status != StatusStopped {
+		t.Error("Unexpected status:", status)
+		return
+	}
+
+	tp.SetWorkerCount(3, true)
+
+	if status := tp.Status(); status != StatusRunning {
+		t.Error("Unexpected status:", status)
+		return
+	}
+
+	if workers := len(tp.workerMap); workers != 3 {
+		t.Error("Unepxected state:", workers)
+		return
+	}
+
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		return nil
+	}, nil})
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		return nil
+	}, nil})
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		return nil
+	}, nil})
+
+	tp.JoinAll()
+
+	if workers := len(tp.workerMap); workers != 0 {
+		t.Error("Unepxected state:", workers)
+		return
+	}
+
+	if taskFinishCounter != 3 {
+		t.Error("Unexpected result: ", taskFinishCounter)
+		return
+	}
+
+	if status := tp.Status(); status != StatusStopped {
+		t.Error("Unexpected status:", status)
+		return
+	}
+
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		return nil
+	}, nil})
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		return nil
+	}, nil})
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		return nil
+	}, nil})
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		time.Sleep(10 * time.Millisecond)
+		return nil
+	}, nil})
+
+	if status := tp.Status(); status != StatusStopped {
+		t.Error("Unexpected status:", status)
+		return
+	}
+
+	tp.SetWorkerCount(3, false)
+
+	if workers := len(tp.workerMap); workers != 3 {
+		t.Error("Unepxected state:", workers)
+		return
+	}
+
+	// Let the workers go into the idle state
+
+	time.Sleep(20 * time.Millisecond)
+
+	// Reduce the number of workers
+
+	tp.SetWorkerCount(1, true)
+
+	if workers := len(tp.workerMap); workers != 1 {
+		t.Error("Unepxected state:", workers)
+		return
+	}
+
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		return nil
+	}, nil})
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		time.Sleep(10 * time.Millisecond)
+		return nil
+	}, nil})
+
+	// Set the kill value
+
+	tp.workerKill = -1
+
+	if status := tp.Status(); status != StatusStopping {
+		t.Error("Unexpected status:", status)
+		return
+	}
+
+	tp.WaitAll()
+
+	tp.SetWorkerCount(-5, true)
+
+	if workers := len(tp.workerMap); workers != 0 {
+		t.Error("Unepxected state:", workers)
+		return
+	}
+
+	tp.AddTask(&testTask{func() error {
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		return nil
+	}, nil})
+
+	tp.WaitAll()
+
+	if taskFinishCounter != 9 {
+		t.Error("Unexpected result: ", taskFinishCounter)
+		return
+	}
+
+	tp.SetWorkerCount(1, false)
+
+	tp.WaitAll()
+
+	if taskFinishCounter != 10 {
+		t.Error("Unexpected result: ", taskFinishCounter)
+		return
+	}
+
+	tp.SetWorkerCount(0, true)
+
+	if status := tp.Status(); status != StatusStopped {
+		t.Error("Unexpected status:", status)
+		return
+	}
+}
+
+func TestThreadPoolThresholds(t *testing.T) {
+	var taskFinishCounter int
+	taskFinishCounterLock := &sync.Mutex{}
+
+	task := &testTask{func() error {
+		time.Sleep(time.Millisecond * 5)
+		taskFinishCounterLock.Lock()
+		taskFinishCounter++
+		taskFinishCounterLock.Unlock()
+		return nil
+	}, nil}
+
+	var buf bytes.Buffer
+
+	tp := NewThreadPool()
+
+	tp.TooFewThreshold = 1
+	tp.TooManyThreshold = 5
+
+	tp.TooFewCallback = func() {
+		taskFinishCounterLock.Lock()
+		buf.WriteString("low")
+		taskFinishCounterLock.Unlock()
+	}
+	tp.TooManyCallback = func() {
+		taskFinishCounterLock.Lock()
+		buf.WriteString("high")
+		taskFinishCounterLock.Unlock()
+	}
+
+	tp.SetWorkerCount(10, false)
+
+	for i := 0; i < 10; i++ {
+		tp.AddTask(task)
+	}
+
+	if wc := tp.WorkerCount(); wc != 10 {
+		t.Error("Unexpected result:", wc)
+		return
+	}
+
+	tp.SetWorkerCount(10, false)
+
+	tp.WaitAll()
+
+	if wc := tp.WorkerCount(); wc != 10 {
+		t.Error("Unexpected result:", wc)
+		return
+	}
+
+	tp.SetWorkerCount(10, false)
+
+	for i := 0; i < 10; i++ {
+		tp.AddTask(task)
+	}
+
+	tp.WaitAll()
+
+	if wc := tp.WorkerCount(); wc != 10 {
+		t.Error("Unexpected result:", wc)
+		return
+	}
+
+	if taskFinishCounter != 20 {
+		t.Error("Unexpected result:", taskFinishCounter)
+		return
+	}
+
+	tp.JoinAll()
+
+	if wc := tp.WorkerCount(); wc != 0 {
+		t.Error("Unexpected result:", wc)
+		return
+	}
+
+	// Check that both callbacks were triggered
+
+	if !strings.Contains(buf.String(), "high") {
+		t.Error("Unexpected result:", buf.String())
+		return
+	}
+	if !strings.Contains(buf.String(), "low") {
+		t.Error("Unexpected result:", buf.String())
+		return
+	}
+}
+
+func TestThreadPoolIdleTaskPanic(t *testing.T) {
+
+	defer func() {
+		if r := recover(); r == nil {
+			t.Error("Error handling on the idle task did not cause a panic")
+		}
+	}()
+
+	// Run error handling function of idle task
+
+	idleTask := &idleTask{}
+	idleTask.HandleError(nil)
+}
+
+func TestThreadPoolErrorHandling(t *testing.T) {
+
+	// Test error handling in a normal task
+
+	var buf bytes.Buffer
+
+	task := &testTask{func() error {
+		return errors.New("testerror")
+	}, func(e error) {
+		buf.WriteString(e.Error())
+	}}
+
+	tp := NewThreadPool()
+
+	tp.AddTask(task)
+
+	if buf.String() != "" {
+		t.Error("Unexpected result:", buf.String())
+	}
+
+	tp.SetWorkerCount(1, false)
+	tp.JoinAll()
+
+	if buf.String() != "testerror" {
+		t.Error("Unexpected result:", buf.String())
+	}
+}

+ 11 - 11
engine/processor.go

@@ -14,8 +14,8 @@ import (
 	"fmt"
 	"sync"
 
-	"devt.de/krotik/common/flowutil"
-	"devt.de/krotik/common/pools"
+	"devt.de/krotik/ecal/engine/pool"
+	"devt.de/krotik/ecal/engine/pubsub"
 )
 
 /*
@@ -131,13 +131,13 @@ Process -> Triggering -> Matching -> Fire Rule
 */
 type eventProcessor struct {
 	id                  uint64                // Processor ID
-	pool                *pools.ThreadPool     // Thread pool of this processor
+	pool                *pool.ThreadPool      // Thread pool of this processor
 	workerCount         int                   // Number of threads for this processor
 	failOnFirstError    bool                  // Stop rule execution on first error in an event trigger sequence
 	ruleIndex           RuleIndex             // Container for loaded rules
 	triggeringCache     map[string]bool       // Cache which remembers which events are triggering
 	triggeringCacheLock sync.Mutex            // Lock for triggering cache
-	messageQueue        *flowutil.EventPump   // Queue for message passing between components
+	messageQueue        *pubsub.EventPump     // Queue for message passing between components
 	rmErrorObserver     func(rm *RootMonitor) // Error observer for root monitors
 }
 
@@ -145,8 +145,8 @@ type eventProcessor struct {
 NewProcessor creates a new event processor with a given number of workers.
 */
 func NewProcessor(workerCount int) Processor {
-	ep := flowutil.NewEventPump()
-	return &eventProcessor{newProcID(), pools.NewThreadPoolWithQueue(NewTaskQueue(ep)),
+	ep := pubsub.NewEventPump()
+	return &eventProcessor{newProcID(), pool.NewThreadPoolWithQueue(NewTaskQueue(ep)),
 		workerCount, false, NewRuleIndex(), nil, sync.Mutex{}, ep, nil}
 }
 
@@ -171,7 +171,7 @@ func (p *eventProcessor) Reset() error {
 
 	// Check that the thread pool is stopped
 
-	if p.pool.Status() != pools.StatusStopped {
+	if p.pool.Status() != pool.StatusStopped {
 		return fmt.Errorf("Cannot reset processor if it has not stopped")
 	}
 
@@ -195,7 +195,7 @@ func (p *eventProcessor) AddRule(rule *Rule) error {
 
 	// Check that the thread pool is stopped
 
-	if p.pool.Status() != pools.StatusStopped {
+	if p.pool.Status() != pool.StatusStopped {
 		return fmt.Errorf("Cannot add rule if the processor has not stopped")
 	}
 
@@ -233,7 +233,7 @@ func (p *eventProcessor) Finish() {
 Stopped returns if the processor is stopped.
 */
 func (p *eventProcessor) Stopped() bool {
-	return p.pool.Status() == pools.StatusStopped
+	return p.pool.Status() == pool.StatusStopped
 }
 
 /*
@@ -334,7 +334,7 @@ func (p *eventProcessor) AddEvent(event *Event, eventMonitor Monitor) (Monitor,
 
 	// Check that the thread pool is running
 
-	if p.pool.Status() == pools.StatusStopped {
+	if p.pool.Status() == pool.StatusStopped {
 		return nil, fmt.Errorf("Cannot add event if the processor is not running")
 	}
 
@@ -377,7 +377,7 @@ func (p *eventProcessor) AddEvent(event *Event, eventMonitor Monitor) (Monitor,
 
 	EventTracer.record(event, "eventProcessor.AddEvent", "Adding task to thread pool")
 
-	// Kick off event processing (see Processor.processEvent)
+	// Kick off event processing (see Processor.ProcessEvent)
 
 	p.pool.AddTask(&Task{p, eventMonitor, event})
 

+ 3 - 3
engine/processor_test.go

@@ -20,7 +20,7 @@ import (
 	"time"
 
 	"devt.de/krotik/common/errorutil"
-	"devt.de/krotik/common/pools"
+	"devt.de/krotik/ecal/engine/pool"
 )
 
 func TestProcessorSimpleCascade(t *testing.T) {
@@ -440,14 +440,14 @@ func TestProcessorScopeHandling(t *testing.T) {
 	proc.AddRule(rule1)
 	proc.AddRule(rule2)
 
-	if proc.Status() != pools.StatusStopped || !proc.Stopped() {
+	if proc.Status() != pool.StatusStopped || !proc.Stopped() {
 		t.Error("Unexpected status:", proc.Status(), proc.Stopped())
 		return
 	}
 
 	proc.Start()
 
-	if proc.Status() != pools.StatusRunning || proc.Stopped() {
+	if proc.Status() != pool.StatusRunning || proc.Stopped() {
 		t.Error("Unexpected status:", proc.Status(), proc.Stopped())
 		return
 	}

+ 169 - 0
engine/pubsub/eventpump.go

@@ -0,0 +1,169 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+/*
+Package pubsub contains a pub/sub event handling implementation.
+*/
+package pubsub
+
+import "sync"
+
+/*
+EventPump implements the observer pattern. Observers can subscribe to receive
+notifications on certain events. Observed objects can send notifications.
+*/
+type EventPump struct {
+	eventsObservers     map[string]map[interface{}][]EventCallback
+	eventsObserversLock *sync.Mutex
+}
+
+/*
+EventCallback is the callback function which is called when an event was observed.
+*/
+type EventCallback func(event string, eventSource interface{})
+
+/*
+NewEventPump creates a new event pump.
+*/
+func NewEventPump() *EventPump {
+	return &EventPump{make(map[string]map[interface{}][]EventCallback), &sync.Mutex{}}
+}
+
+/*
+AddObserver adds a new observer to the event pump. An observer can subscribe to
+a given event from a given event source. If the event is an empty string then
+the observer subscribes to all events from the event source. If the
+eventSource is nil then the observer subscribes to all event sources.
+*/
+func (ep *EventPump) AddObserver(event string, eventSource interface{}, callback EventCallback) {
+
+	// Ignore requests with non-existent callbacks
+
+	if callback == nil {
+		return
+	}
+
+	ep.eventsObserversLock.Lock()
+	defer ep.eventsObserversLock.Unlock()
+
+	sources, ok := ep.eventsObservers[event]
+	if !ok {
+		sources = make(map[interface{}][]EventCallback)
+		ep.eventsObservers[event] = sources
+	}
+
+	callbacks, ok := sources[eventSource]
+	if !ok {
+		callbacks = []EventCallback{callback}
+		sources[eventSource] = callbacks
+	} else {
+		sources[eventSource] = append(callbacks, callback)
+	}
+}
+
+/*
+PostEvent2 posts an event to this event pump from a given event source. It temporarily releases the observer lock while each callback is executed.
+*/
+func (ep *EventPump) PostEvent2(event string, eventSource interface{}) {
+	if event == "" || eventSource == nil {
+		panic("Posting an event requires the event and its source")
+	}
+
+	ep.eventsObserversLock.Lock()
+	defer ep.eventsObserversLock.Unlock()
+
+	postEvent := func(event string, eventSource interface{}) {
+
+		if sources, ok := ep.eventsObservers[event]; ok {
+			for source, callbacks := range sources {
+				if source == eventSource || source == nil {
+					for _, callback := range callbacks {
+						ep.eventsObserversLock.Unlock()
+						callback(event, eventSource)
+						ep.eventsObserversLock.Lock()
+					}
+				}
+			}
+		}
+
+	}
+
+	postEvent(event, eventSource)
+	postEvent("", eventSource)
+}
+
+/*
+PostEvent posts an event to this event pump from a given event source.
+*/
+func (ep *EventPump) PostEvent(event string, eventSource interface{}) {
+	if event == "" || eventSource == nil {
+		panic("Posting an event requires the event and its source")
+	}
+
+	postEvent := func(event string, eventSource interface{}) {
+
+		ep.eventsObserversLock.Lock()
+		sources, ok := ep.eventsObservers[event]
+		if ok {
+
+			// Create a local copy of sources before executing the callbacks
+
+			origsources := sources
+			sources = make(map[interface{}][]EventCallback)
+			for source, callbacks := range origsources {
+				sources[source] = callbacks
+			}
+		}
+		ep.eventsObserversLock.Unlock()
+
+		if ok {
+			for source, callbacks := range sources {
+				if source == eventSource || source == nil {
+					for _, callback := range callbacks {
+						callback(event, eventSource)
+					}
+				}
+			}
+		}
+
+	}
+
+	postEvent(event, eventSource)
+	postEvent("", eventSource)
+}
+
+/*
+RemoveObservers removes observers from the event pump. If the event is an
+empty string then the observer is removed from all events. If the
+eventSource is nil then all observers of the event are dropped.
+*/
+func (ep *EventPump) RemoveObservers(event string, eventSource interface{}) {
+	ep.eventsObserversLock.Lock()
+	defer ep.eventsObserversLock.Unlock()
+
+	// Clear everything
+
+	if event == "" && eventSource == nil {
+		ep.eventsObservers = make(map[string]map[interface{}][]EventCallback)
+
+	} else if eventSource == nil {
+		delete(ep.eventsObservers, event)
+
+	} else if event == "" {
+		for _, sources := range ep.eventsObservers {
+			delete(sources, eventSource)
+		}
+
+	} else {
+		if sources, ok := ep.eventsObservers[event]; ok {
+			delete(sources, eventSource)
+		}
+	}
+}

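As a sketch of the new pub/sub API (hypothetical example, not part of this commit), observers subscribe per event and per source; the empty event string and a nil source act as wildcards:

```
package main

import (
	"fmt"

	"devt.de/krotik/ecal/engine/pubsub"
)

func main() {
	ep := pubsub.NewEventPump()

	source := "mysource" // any comparable value can serve as an event source

	// Observe one specific event from one specific source.
	ep.AddObserver("request", source, func(event string, eventSource interface{}) {
		fmt.Println("got", event, "from", eventSource)
	})

	// Observe all events from all sources (empty event, nil source).
	ep.AddObserver("", nil, func(event string, eventSource interface{}) {
		fmt.Println("catch-all saw", event)
	})

	// Triggers both callbacks; posting with an empty event or a nil
	// source would panic.
	ep.PostEvent("request", source)

	// Drop all registered observers.
	ep.RemoveObservers("", nil)
}
```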
+ 239 - 0
engine/pubsub/eventpump_test.go

@@ -0,0 +1,239 @@
+/*
+ * ECAL
+ *
+ * Copyright 2020 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package pubsub
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"sort"
+	"testing"
+)
+
+func TestEventPump(t *testing.T) {
+	var res []string
+
+	source1 := &bytes.Buffer{}
+	source2 := errors.New("TEST")
+
+	ep := NewEventPump()
+
+	// Add observer 1
+
+	ep.AddObserver("event1", source1, func(event string, eventSource interface{}) {
+		if eventSource != source1 {
+			t.Error("Unexpected event source:", eventSource)
+			return
+		}
+		res = append(res, "1")
+		sort.Strings(res)
+
+	})
+
+	// Add observer 2
+
+	ep.AddObserver("event2", source2, func(event string, eventSource interface{}) {
+		if eventSource != source2 {
+			t.Error("Unexpected event source:", eventSource)
+			return
+		}
+		res = append(res, "2")
+		sort.Strings(res)
+
+	})
+
+	// Add observer 3
+
+	ep.AddObserver("event2", source2, func(event string, eventSource interface{}) {
+		if eventSource != source2 {
+			t.Error("Unexpected event source:", eventSource)
+			return
+		}
+		res = append(res, "3")
+		sort.Strings(res)
+
+	})
+
+	// Run the tests
+
+	// Test 1 straight forward case
+
+	ep.PostEvent("event1", source1)
+
+	if fmt.Sprint(res) != "[1]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event2", source2)
+
+	if fmt.Sprint(res) != "[2 3]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event1", source2)
+
+	if fmt.Sprint(res) != "[]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Add observer 4
+
+	ep.AddObserver("", source1, func(event string, eventSource interface{}) {
+		if eventSource != source1 {
+			t.Error("Unexpected event source:", eventSource)
+			return
+		}
+		res = append(res, "4")
+		sort.Strings(res)
+	})
+
+	// Add observer 5
+
+	ep.AddObserver("", nil, func(event string, eventSource interface{}) {
+		res = append(res, "5")
+		sort.Strings(res)
+	})
+
+	// Add observer 6
+
+	ep.AddObserver("", source2, func(event string, eventSource interface{}) {
+		if eventSource != source2 {
+			t.Error("Unexpected event source:", eventSource)
+			return
+		}
+		res = append(res, "6")
+		sort.Strings(res)
+	})
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event1", source2)
+
+	if fmt.Sprint(res) != "[5 6]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event3", source2)
+
+	if fmt.Sprint(res) != "[5 6]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event3", source1)
+
+	if fmt.Sprint(res) != "[4 5]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event3", errors.New("test"))
+
+	if fmt.Sprint(res) != "[5]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Remove observers
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event2", source2)
+
+	if fmt.Sprint(res) != "[2 3 5 6]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+	ep.RemoveObservers("event2", source2)
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event2", source2)
+
+	if fmt.Sprint(res) != "[5 6]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	ep.RemoveObservers("", source2) // Remove all handlers specific to source 2
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event2", source2)
+
+	if fmt.Sprint(res) != "[5]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	ep.PostEvent("event1", source1)
+
+	if fmt.Sprint(res) != "[1 4 5 5]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	ep.RemoveObservers("event1", nil) // Remove all handlers specific to source 2
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event2", source2)
+
+	if fmt.Sprint(res) != "[5]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	ep.RemoveObservers("", nil) // Remove all handlers
+
+	res = make([]string, 0) // Reset res
+
+	ep.PostEvent("event2", source2)
+
+	if fmt.Sprint(res) != "[]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// This call should be ignored
+
+	ep.AddObserver("event1", source1, nil)
+
+	if fmt.Sprint(ep.eventsObservers) != "map[]" {
+		t.Error("Event map should be empty at this point:", ep.eventsObservers)
+		return
+	}
+}
+
+func TestWrongPostEvent(t *testing.T) {
+	defer func() {
+		if r := recover(); r == nil {
+			t.Error("Posting events with empty values shouldn't work.")
+		}
+	}()
+
+	ep := NewEventPump()
+	ep.PostEvent("", nil)
+}

+ 6 - 7
engine/taskqueue.go

@@ -18,10 +18,10 @@ import (
 	"sync"
 
 	"devt.de/krotik/common/errorutil"
-	"devt.de/krotik/common/flowutil"
-	"devt.de/krotik/common/pools"
 	"devt.de/krotik/common/sortutil"
 	"devt.de/krotik/common/stringutil"
+	"devt.de/krotik/ecal/engine/pool"
+	"devt.de/krotik/ecal/engine/pubsub"
 )
 
 /*
@@ -80,7 +80,6 @@ func (t *Task) Run() error {
 
 	if len(errors) > 0 {
 		EventTracer.record(t.e, "Task.Run", fmt.Sprint("Task had errors:", errors))
-
 		return &TaskError{errors, t.e, t.m}
 	}
 
@@ -108,13 +107,13 @@ TaskQueue models the queue of tasks for a processor.
 type TaskQueue struct {
 	lock         *sync.Mutex                        // Lock for queue
 	queues       map[uint64]*sortutil.PriorityQueue // Map from root monitor id to priority queue
-	messageQueue *flowutil.EventPump                // Queue for message passing between components
+	messageQueue *pubsub.EventPump                  // Queue for message passing between components
 }
 
 /*
 NewTaskQueue creates a new TaskQueue object.
 */
-func NewTaskQueue(ep *flowutil.EventPump) *TaskQueue {
+func NewTaskQueue(ep *pubsub.EventPump) *TaskQueue {
 	return &TaskQueue{&sync.Mutex{}, make(map[uint64]*sortutil.PriorityQueue), ep}
 }
 
@@ -131,7 +130,7 @@ func (tq *TaskQueue) Clear() {
 /*
 Pop returns the next task from the queue.
 */
-func (tq *TaskQueue) Pop() pools.Task {
+func (tq *TaskQueue) Pop() pool.Task {
 	tq.lock.Lock()
 	defer tq.lock.Unlock()
 
@@ -181,7 +180,7 @@ func (tq *TaskQueue) Pop() pools.Task {
 /*
 Push adds another task to the queue.
 */
-func (tq *TaskQueue) Push(t pools.Task) {
+func (tq *TaskQueue) Push(t pool.Task) {
 	tq.lock.Lock()
 	defer tq.lock.Unlock()
 

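Since the engine's TaskQueue satisfies the pool.TaskQueue interface, any queueing strategy can be plugged into a thread pool. A hypothetical LIFO variant (illustration only, not part of this commit) could look like this:

```
package main

import "devt.de/krotik/ecal/engine/pool"

// lifoQueue is a hypothetical pool.TaskQueue which pops the most
// recently added task first. The thread pool serialises all queue
// access through its own lock, so the queue needs no locking itself.
type lifoQueue struct {
	tasks []pool.Task
}

func (q *lifoQueue) Clear()           { q.tasks = nil }
func (q *lifoQueue) Size() int        { return len(q.tasks) }
func (q *lifoQueue) Push(t pool.Task) { q.tasks = append(q.tasks, t) }

// Pop returns the most recently added task or nil if the queue is empty.
func (q *lifoQueue) Pop() pool.Task {
	if len(q.tasks) == 0 {
		return nil
	}
	t := q.tasks[len(q.tasks)-1]
	q.tasks = q.tasks[:len(q.tasks)-1]
	return t
}

func main() {
	tp := pool.NewThreadPoolWithQueue(&lifoQueue{})
	tp.SetWorkerCount(1, false)
	tp.JoinAll()
}
```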
+ 1 - 2
interpreter/provider.go

@@ -9,9 +9,8 @@
  */
 
 // TODO:
-// Event function: event
+// Document event processing with sinks
 // Context supporting final
-// Event handling
 // Inline escaping in strings "bla {1+1} bla"
 
 package interpreter

+ 113 - 8
interpreter/rt_sink_test.go

@@ -126,7 +126,7 @@ res := addEventAndWait("request", "web.page.index", {
 }, {
 	"request.read" : false
 })
-log("ErrorResult:", res[0].errors, " ", res == null)
+log("ErrorResult:", res, " ", res == null)
 `, vs)
 
 	if err != nil {
@@ -140,14 +140,25 @@ rule2 - Tracking user:foo
 rule3 - Logging user:foo
 ErrorResult:null true
 rule2 - Tracking user:bar
-ErrorResult:{
-  "rule2": {
-    "detail": [
-      123
-    ],
-    "message": "ECAL error in ECALTestRuntime: Error in sink (User bar was here) (Line:17 Pos:13)"
+ErrorResult:[
+  {
+    "errors": {
+      "rule2": {
+        "detail": [
+          123
+        ],
+        "message": "ECAL error in ECALTestRuntime: Error in sink (User bar was here) (Line:17 Pos:13)"
+      }
+    },
+    "event": {
+      "kind": "web.page.index",
+      "name": "request",
+      "state": {
+        "user": "bar"
+      }
+    }
   }
-} false`[1:] {
+] false`[1:] {
 		t.Error("Unexpected result:", testlogger.String())
 		return
 	}
@@ -186,4 +197,98 @@ error: {
 		t.Error("Unexpected result:", testlogger.String())
 		return
 	}
+
+	// Test case 3 - rule suppression
+
+	_, err = UnitTestEval(
+		`
+sink rule1
+    kindmatch [ "test.event" ],
+    suppresses [ "rule3" ],
+	{
+        log("rule1 - Handling request: ", event.kind)
+	}
+
+sink rule2
+    kindmatch [ "test.*" ],
+    priority 1,  # Ensure this rule is always executed after rule1
+	{
+        log("rule2 - Handling request: ", event.kind)
+	}
+
+sink rule3
+    kindmatch [ "test.*" ],
+    priority 1,  # Ensure this rule is always executed after rule1
+	{
+        log("rule3 - Handling request: ", event.kind)
+	}
+
+err := addEventAndWait("myevent", "test.event", {})
+
+if len(err) > 0 {
+    error(err[0].errors)
+}
+`, vs)
+
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	if testlogger.String() != `
+rule1 - Handling request: test.event
+rule2 - Handling request: test.event`[1:] {
+		t.Error("Unexpected result:", testlogger.String())
+		return
+	}
+
+	// Test case 4 - state match
+
+	_, err = UnitTestEval(
+		`
+sink rule1
+    kindmatch [ "test.event" ],
+    statematch { "a" : null },
+	{
+        log("rule1 - Handling request: ", event.kind)
+	}
+
+sink rule2
+    kindmatch [ "test.*" ],
+    priority 1,
+    statematch { "b" : 1 },
+	{
+        log("rule2 - Handling request: ", event.kind)
+	}
+
+sink rule3
+    kindmatch [ "test.*" ],
+    priority 2,
+    statematch { "c" : 2 },
+	{
+        log("rule3 - Handling request: ", event.kind)
+	}
+
+err := addEventAndWait("myevent", "test.event", {
+	"a" : "foo",
+	"b" : 1,
+})
+
+if len(err) > 0 {
+    error(err[0].errors)
+}
+`, vs)
+
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	if testlogger.String() != `
+rule1 - Handling request: test.event
+rule2 - Handling request: test.event`[1:] {
+		t.Error("Unexpected result:", testlogger.String())
+		return
+	}
+
 }