Browse Source

fix: Better mutex management

Matthias Ladkau 3 years ago
parent
commit
80efb81841

+ 24 - 0
interpreter/debug.go

@@ -42,6 +42,8 @@ type ecalDebugger struct {
 	globalScope                parser.Scope                        // Global variable scope which can be used to transfer data
 	lock                       *sync.RWMutex                       // Lock for this debugger
 	lastVisit                  int64                               // Last time the debugger had a state visit
+	mutexeOwners               map[string]uint64                   // A map of current mutex owners
+	mutexLog                   *datautil.RingBuffer                // A log of taken mutexes
 }
 
 /*
@@ -105,6 +107,8 @@ func NewECALDebugger(globalVS parser.Scope) util.ECALDebugger {
 		globalScope:                globalVS,
 		lock:                       &sync.RWMutex{},
 		lastVisit:                  0,
+		mutexeOwners:               nil,
+		mutexLog:                   nil,
 	}
 }
 
@@ -180,6 +184,16 @@ func (ed *ecalDebugger) BreakOnError(flag bool) {
 	ed.breakOnError = flag
 }
 
+/*
+SetLockingState sets locking status information.
+*/
+func (ed *ecalDebugger) SetLockingState(mutexeOwners map[string]uint64, mutexLog *datautil.RingBuffer) {
+	if ed.mutexeOwners == nil {
+		ed.mutexeOwners = mutexeOwners
+		ed.mutexLog = mutexLog
+	}
+}
+
 /*
 VisitState is called for every state during the execution of a program.
 */
@@ -580,6 +594,16 @@ func (ed *ecalDebugger) Status() interface{} {
 	return res
 }
 
+/*
+LockState returns the current locking state.
+*/
+func (ed *ecalDebugger) LockState() interface{} {
+	return map[string]interface{}{
+		"log":    ed.mutexLog.StringSlice(),
+		"owners": ed.mutexeOwners,
+	}
+}
+
 /*
 Describe describes a thread currently observed by the debugger.
 */

+ 26 - 0
interpreter/debug_cmd.go

@@ -35,6 +35,7 @@ var DebugCommandsMap = map[string]util.DebugCommand{
 	"status":       &statusCommand{&inbuildDebugCommand{}},
 	"extract":      &extractCommand{&inbuildDebugCommand{}},
 	"inject":       &injectCommand{&inbuildDebugCommand{}},
+	"lockstate":    &lockstateCommand{&inbuildDebugCommand{}},
 }
 
 /*
@@ -404,3 +405,28 @@ DocString returns a descriptive text about this command.
 func (c *injectCommand) DocString() string {
 	return "Copies a value from the global variable scope into a suspended thread."
 }
+
+// lockstate
+// =========
+
+/*
+lockstateCommand inspects the locking state
+*/
+type lockstateCommand struct {
+	*inbuildDebugCommand
+}
+
+/*
+Execute the debug command and return its result. It must be possible to
+convert the output data into a JSON string.
+*/
+func (c *lockstateCommand) Run(debugger util.ECALDebugger, args []string) (interface{}, error) {
+	return debugger.LockState(), nil
+}
+
+/*
+DocString returns a descriptive text about this command.
+*/
+func (c *lockstateCommand) DocString() string {
+	return "Inspects the locking state."
+}

+ 28 - 0
interpreter/debug_test.go

@@ -444,6 +444,7 @@ log("test1")
 log("test2")
 test2()
 log("test4")
+mutex a { mutex a { log("test5") } }
 `, vs, erp)
 		if err != nil {
 			t.Error(err)
@@ -511,6 +512,19 @@ log("test4")
 		return
 	}
 
+	ls, err := testDebugger.HandleInput(fmt.Sprintf("lockstate"))
+
+	lsBytes, _ := json.MarshalIndent(ls, "", "  ")
+	lsString := string(lsBytes)
+
+	if lsString != `{
+  "log": [],
+  "owners": {}
+}` {
+		t.Error("Unexpected result:", lsString)
+		return
+	}
+
 	// Continue until the end
 
 	if _, err := testDebugger.HandleInput(fmt.Sprintf("cont 1 Resume")); err != nil {
@@ -531,10 +545,24 @@ log("test4")
     c (float64) : 2
     test1 (*interpreter.function) : ecal.function: test1 (Line 4, Pos 1)
     test2 (*interpreter.function) : ecal.function: test2 (Line 4, Pos 1)
+    block: mutex (Line:12 Pos:1) {
+        block: mutex (Line:12 Pos:11) {
+        }
+    }
 }` {
 		t.Error("Unexpected result:", vs)
 		return
 	}
+
+	ls, err = testDebugger.HandleInput(fmt.Sprintf("lockstate"))
+
+	lsBytes, _ = json.MarshalIndent(ls, "", "  ")
+	lsString = string(lsBytes)
+
+	if !strings.Contains(lsString, "took lock a with owner") || !strings.Contains(lsString, "attempted to take lock a twice") {
+		t.Error("Unexpected result:", lsString)
+		return
+	}
 }
 
 func waitForThreadSuspension(t *testing.T) uint64 {

+ 5 - 1
interpreter/provider.go

@@ -16,6 +16,7 @@ import (
 	"path/filepath"
 	"sync"
 
+	"devt.de/krotik/common/datautil"
 	"devt.de/krotik/common/timeutil"
 	"devt.de/krotik/ecal/config"
 	"devt.de/krotik/ecal/engine"
@@ -148,6 +149,9 @@ type ECALRuntimeProvider struct {
 	Logger        util.Logger            // Logger object for log messages
 	Processor     engine.Processor       // Processor of the ECA engine
 	Mutexes       map[string]*sync.Mutex // Map of named mutexes
+	MutexLog      *datautil.RingBuffer   // Ringbuffer to track locking events
+	MutexeOwners  map[string]uint64      // Map of mutex owners
+	MutexesMutex  *sync.Mutex            // Mutex for mutexes map
 	Cron          *timeutil.Cron         // Cron object for scheduled execution
 	Debugger      util.ECALDebugger      // Optional: ECAL Debugger object
 }
@@ -182,7 +186,7 @@ func NewECALRuntimeProvider(name string, importLocator util.ECALImportLocator, l
 	cron.Start()
 
 	return &ECALRuntimeProvider{name, importLocator, logger, proc,
-		make(map[string]*sync.Mutex), cron, nil}
+		make(map[string]*sync.Mutex), datautil.NewRingBuffer(1024), make(map[string]uint64), &sync.Mutex{}, cron, nil}
 }
 
 /*

+ 1 - 0
interpreter/rt_general.go

@@ -61,6 +61,7 @@ func (rt *baseRuntime) Eval(vs parser.Scope, is map[string]interface{}, tid uint
 
 	if rt.erp.Debugger != nil {
 		err = rt.erp.Debugger.VisitState(rt.node, vs, tid)
+		rt.erp.Debugger.SetLockingState(rt.erp.MutexeOwners, rt.erp.MutexLog)
 	}
 
 	return nil, err

+ 46 - 4
interpreter/rt_statements.go

@@ -675,18 +675,60 @@ func (rt *mutexRuntime) Eval(vs parser.Scope, is map[string]interface{}, tid uin
 
 		name := rt.node.Children[0].Token.Val
 
+		// Take mutex to modify the mutex map
+
+		rt.erp.MutexesMutex.Lock()
+
+		// Lookup the mutex
+
 		mutex, ok := rt.erp.Mutexes[name]
 		if !ok {
 			mutex = &sync.Mutex{}
 			rt.erp.Mutexes[name] = mutex
 		}
 
-		tvs := vs.NewChild(scope.NameFromASTNode(rt.node))
+		// Try to take the mutex if this thread does not already own it
 
-		mutex.Lock()
-		defer mutex.Unlock()
+		owner, ok := rt.erp.MutexeOwners[name]
 
-		res, err = rt.node.Children[0].Runtime.Eval(tvs, is, tid)
+		rt.erp.MutexesMutex.Unlock()
+
+		if !ok || owner != tid {
+
+			rt.erp.MutexLog.Add(fmt.Sprintf("Thread: %v - attempting to take lock %v with owner %v at %v:%v",
+				tid, name, owner, rt.node.Token.Lsource, rt.node.Token.Lline))
+
+			mutex.Lock()
+
+			rt.erp.MutexLog.Add(fmt.Sprintf("Thread: %v - took lock %v with owner %v", tid, name, owner))
+
+			// Register ownership on mutex
+
+			rt.erp.MutexesMutex.Lock()
+			rt.erp.MutexeOwners[name] = tid
+			rt.erp.MutexesMutex.Unlock()
+
+			defer func() {
+				rt.erp.MutexLog.Add(fmt.Sprintf("Thread: %v - releasing lock %v", tid, name))
+
+				// Unregister ownership on mutex
+
+				rt.erp.MutexesMutex.Lock()
+				rt.erp.MutexeOwners[name] = 0
+				rt.erp.MutexesMutex.Unlock()
+
+				mutex.Unlock()
+			}()
+
+		} else if owner == tid {
+
+			rt.erp.MutexLog.Add(fmt.Sprintf("Thread: %v - attempted to take lock %v twice", tid, name))
+		}
+
+		rt.erp.MutexLog.Add(fmt.Sprintf("Thread: %v - execute critical section %v", tid, name))
+
+		tvs := vs.NewChild(scope.NameFromASTNode(rt.node))
+		res, err = rt.node.Children[1].Runtime.Eval(tvs, is, tid)
 	}
 
 	return res, err

+ 73 - 14
interpreter/rt_statements_test.go

@@ -1126,30 +1126,89 @@ func TestMutexStatements(t *testing.T) {
 
 	_, err := UnitTestEvalAndAST(
 		`
+a := 2
 mutex foo {
 	a := 1
     raise("test 12", null, [1,2,3])
 }
 `, vs,
 		`
-mutex
-  identifier: foo
-  statements
-    :=
-      identifier: a
-      number: 1
-    identifier: raise
-      funccall
-        string: 'test 12'
-        null
-        list
-          number: 1
-          number: 2
-          number: 3
+statements
+  :=
+    identifier: a
+    number: 2
+  mutex
+    identifier: foo
+    statements
+      :=
+        identifier: a
+        number: 1
+      identifier: raise
+        funccall
+          string: 'test 12'
+          null
+          list
+            number: 1
+            number: 2
+            number: 3
+`[1:])
+
+	if err == nil || err.Error() != "ECAL error in ECALTestRuntime (ECALEvalTest): test 12 () (Line:5 Pos:5)" {
+		t.Error(err)
+		return
+	}
+
+	if vs.String() != `GlobalScope {
+    a (float64) : 1
+    block: mutex (Line:3 Pos:1) {
+    }
+}` {
+		t.Error("Unexpected variable scope:", vs)
+	}
+
+	// Can take mutex twice
+
+	_, err = UnitTestEvalAndAST(
+		`
+a := 2
+mutex foo {
+	a := 1
+	mutex foo {
+		a := 3
+	}
+}
+`, vs,
+		`
+statements
+  :=
+    identifier: a
+    number: 2
+  mutex
+    identifier: foo
+    statements
+      :=
+        identifier: a
+        number: 1
+      mutex
+        identifier: foo
+        statements
+          :=
+            identifier: a
+            number: 3
 `[1:])
 
 	if err != nil {
 		t.Error(err)
 		return
 	}
+
+	if vs.String() != `GlobalScope {
+    a (float64) : 3
+    block: mutex (Line:3 Pos:1) {
+        block: mutex (Line:5 Pos:2) {
+        }
+    }
+}` {
+		t.Error("Unexpected variable scope:", vs)
+	}
 }

+ 11 - 0
util/types.go

@@ -13,6 +13,7 @@ package util
 import (
 	"time"
 
+	"devt.de/krotik/common/datautil"
 	"devt.de/krotik/ecal/parser"
 )
 
@@ -134,6 +135,11 @@ type ECALDebugger interface {
 	*/
 	BreakOnError(flag bool)
 
+	/*
+	   SetLockingState sets locking status information.
+	*/
+	SetLockingState(mutexeOwners map[string]uint64, mutexLog *datautil.RingBuffer)
+
 	/*
 	   VisitState is called for every state during the execution of a program.
 	*/
@@ -191,6 +197,11 @@ type ECALDebugger interface {
 	*/
 	Status() interface{}
 
+	/*
+	   LockStatus returns the current locking status.
+	*/
+	LockState() interface{}
+
 	/*
 	   Describe describes a thread currently observed by the debugger.
 	*/