Browse Source

fix: Exposing threadpool a bit more

Matthias Ladkau 3 years ago
parent
commit
79a2d93258

+ 21 - 0
engine/pool/threadpool.go

@@ -147,6 +147,26 @@ type ThreadPool struct {
 	tooFewTriggered bool   // Flag if too many tasks threshold was passed
 }
 
+/*
+State returns the current state of the ThreadPool.
+*/
+func (tp *ThreadPool) State() map[string]interface{} {
+
+	getIdsFromWorkerMap := func(m map[uint64]*ThreadPoolWorker) []uint64 {
+		var ret []uint64
+		for k := range m {
+			ret = append(ret, k)
+		}
+		return ret
+	}
+
+	return map[string]interface{}{
+		"TaskQueueSize":      tp.queue.Size(),
+		"TotalWorkerThreads": getIdsFromWorkerMap(tp.workerMap),
+		"IdleWorkerThreads":  getIdsFromWorkerMap(tp.workerIdleMap),
+	}
+}
+
 /*
 NewThreadPool creates a new thread pool.
 */
@@ -299,6 +319,7 @@ func (tp *ThreadPool) SetWorkerCount(count int, wait bool) {
 		for len(tp.workerMap) != count {
 			tid := tp.NewThreadID()
 			worker := &ThreadPoolWorker{tid, tp}
+
 			go worker.run()
 			tp.workerMap[tid] = worker
 		}

+ 6 - 0
engine/pool/threadpool_test.go

@@ -12,6 +12,7 @@ package pool
 import (
 	"bytes"
 	"errors"
+	"fmt"
 	"strings"
 	"sync"
 	"testing"
@@ -412,4 +413,9 @@ func TestThreadPoolErrorHandling(t *testing.T) {
 	if buf.String() != "testerror" {
 		t.Error("Unexpected result:", buf.String())
 	}
+
+	if state := fmt.Sprint(tp.State()); state == `` {
+		t.Error("Unexpected state:", state)
+		return
+	}
 }

+ 11 - 2
engine/processor.go

@@ -12,6 +12,7 @@ package engine
 
 import (
 	"fmt"
+	"os"
 	"sync"
 
 	"devt.de/krotik/ecal/engine/pool"
@@ -152,7 +153,15 @@ NewProcessor creates a new event processor with a given number of workers.
 */
 func NewProcessor(workerCount int) Processor {
 	ep := pubsub.NewEventPump()
-	return &eventProcessor{newProcID(), pool.NewThreadPoolWithQueue(NewTaskQueue(ep)),
+
+	pool := pool.NewThreadPoolWithQueue(NewTaskQueue(ep))
+
+	pool.TooManyThreshold = 10
+	pool.TooManyCallback = func() {
+		fmt.Fprintf(os.Stderr, "Warning: The thread pool queue is filling up ...")
+	}
+
+	return &eventProcessor{newProcID(), pool,
 		workerCount, false, NewRuleIndex(), nil, sync.Mutex{}, ep, nil}
 }
 
@@ -487,7 +496,7 @@ func (p *eventProcessor) ProcessEvent(tid uint64, event *Event, parent Monitor)
 String returns a string representation the processor.
 */
 func (p *eventProcessor) String() string {
-	return fmt.Sprintf("RumbleProcessor %v (workers:%v)", p.ID(), p.workerCount)
+	return fmt.Sprintf("EventProcessor %v (workers:%v)", p.ID(), p.workerCount)
 }
 
 // Unique id creation

+ 6 - 1
engine/processor_test.go

@@ -49,7 +49,12 @@ func TestProcessorSimpleCascade(t *testing.T) {
 
 	proc := NewProcessor(1)
 
-	if res := fmt.Sprint(proc); res != "RumbleProcessor 1 (workers:1)" {
+	StderrSave := os.Stderr
+	os.Stderr = nil
+	proc.ThreadPool().TooManyCallback()
+	os.Stderr = StderrSave
+
+	if res := fmt.Sprint(proc); res != "EventProcessor 1 (workers:1)" {
 		t.Error("Unexpected result:", res)
 		return
 	}

+ 1 - 1
engine/taskqueue_test.go

@@ -142,7 +142,7 @@ func testTaskQueuePushPop(t *testing.T, tq *TaskQueue, proc Processor, event *Ev
 func testTaskQueueMisc(t *testing.T, tq *TaskQueue, t5 *Task) {
 	tq.Push(t5)
 
-	if fmt.Sprint(tq.queues) != "map[2:[ Task: RumbleProcessor 1 (workers:1) Monitor 5 (parent: Monitor 2 (parent: <nil> priority: 0 activated: false finished: false) priority: 10 activated: false finished: false) Event: DummyEvent main {} (10) ]]" {
+	if fmt.Sprint(tq.queues) != "map[2:[ Task: EventProcessor 1 (workers:1) Monitor 5 (parent: Monitor 2 (parent: <nil> priority: 0 activated: false finished: false) priority: 10 activated: false finished: false) Event: DummyEvent main {} (10) ]]" {
 		t.Error("Unexpected queue:", tq.queues)
 		return
 	}

+ 15 - 2
interpreter/debug.go

@@ -22,6 +22,7 @@ import (
 
 	"devt.de/krotik/common/datautil"
 	"devt.de/krotik/common/errorutil"
+	"devt.de/krotik/ecal/engine/pool"
 	"devt.de/krotik/ecal/parser"
 	"devt.de/krotik/ecal/scope"
 	"devt.de/krotik/ecal/util"
@@ -44,6 +45,7 @@ type ecalDebugger struct {
 	lastVisit                  int64                               // Last time the debugger had a state visit
 	mutexeOwners               map[string]uint64                   // A map of current mutex owners
 	mutexLog                   *datautil.RingBuffer                // A log of taken mutexes
+	threadpool                 *pool.ThreadPool                    // Reference to the thread pool of the processor
 }
 
 /*
@@ -109,6 +111,7 @@ func NewECALDebugger(globalVS parser.Scope) util.ECALDebugger {
 		lastVisit:                  0,
 		mutexeOwners:               nil,
 		mutexLog:                   nil,
+		threadpool:                 nil,
 	}
 }
 
@@ -194,6 +197,15 @@ func (ed *ecalDebugger) SetLockingState(mutexeOwners map[string]uint64, mutexLog
 	}
 }
 
+/*
+SetThreadPool sets the reference to the current used thread pool.
+*/
+func (ed *ecalDebugger) SetThreadPool(tp *pool.ThreadPool) {
+	if ed.threadpool == nil {
+		ed.threadpool = tp
+	}
+}
+
 /*
 VisitState is called for every state during the execution of a program.
 */
@@ -599,8 +611,9 @@ LockState returns the current locking state.
 */
 func (ed *ecalDebugger) LockState() interface{} {
 	return map[string]interface{}{
-		"log":    ed.mutexLog.StringSlice(),
-		"owners": ed.mutexeOwners,
+		"log":     ed.mutexLog.StringSlice(),
+		"owners":  ed.mutexeOwners,
+		"threads": ed.threadpool.State(),
 	}
 }
 

+ 6 - 1
interpreter/debug_test.go

@@ -519,7 +519,12 @@ mutex a { mutex a { log("test5") } }
 
 	if lsString != `{
   "log": [],
-  "owners": {}
+  "owners": {},
+  "threads": {
+    "IdleWorkerThreads": null,
+    "TaskQueueSize": 0,
+    "TotalWorkerThreads": null
+  }
 }` {
 		t.Error("Unexpected result:", lsString)
 		return

+ 1 - 1
interpreter/func_provider.go

@@ -1019,7 +1019,7 @@ func (pt *setPulseTrigger) Run(instanceID string, vs parser.Scope, is map[string
 					lastmicros = micros
 
 					monitor := proc.NewRootMonitor(nil, nil)
-					_, err := proc.AddEvent(event, monitor)
+					_, err := proc.AddEventAndWait(event, monitor)
 
 					if status := proc.Status(); status == "Stopped" || status == "Stopping" {
 						break

+ 1 - 0
interpreter/rt_general.go

@@ -62,6 +62,7 @@ func (rt *baseRuntime) Eval(vs parser.Scope, is map[string]interface{}, tid uint
 	if rt.erp.Debugger != nil {
 		err = rt.erp.Debugger.VisitState(rt.node, vs, tid)
 		rt.erp.Debugger.SetLockingState(rt.erp.MutexeOwners, rt.erp.MutexLog)
+		rt.erp.Debugger.SetThreadPool(rt.erp.Processor.ThreadPool())
 	}
 
 	return nil, err

+ 6 - 0
util/types.go

@@ -14,6 +14,7 @@ import (
 	"time"
 
 	"devt.de/krotik/common/datautil"
+	"devt.de/krotik/ecal/engine/pool"
 	"devt.de/krotik/ecal/parser"
 )
 
@@ -140,6 +141,11 @@ type ECALDebugger interface {
 	*/
 	SetLockingState(mutexeOwners map[string]uint64, mutexLog *datautil.RingBuffer)
 
+	/*
+	   SetThreadPool sets the reference to the current used thread pool.
+	*/
+	SetThreadPool(tp *pool.ThreadPool)
+
 	/*
 	   VisitState is called for every state during the execution of a program.
 	*/