Browse Source

feat: Adding mutex blocks

Matthias Ladkau 3 years ago
parent
commit
7e561e5089

+ 10 - 1
ecal.md

@@ -29,7 +29,7 @@ foobar.doSomething()
 
 Event Sinks
 --
-Event sinks are the core constructs of ECAL which provide concurrency and the means to respond to events of an external system. Sinks provide ECAL with an interface to an [event condition action engine](engine.md) which coordinates the parallel execution of code. Sinks cannot be scoped into modules or objects and are usually declared at the top level. Sinks must not write to top level variables. They have the following form:
+Event sinks are the core constructs of ECAL which provide concurrency and the means to respond to events of an external system. Sinks provide ECAL with an interface to an [event condition action engine](engine.md) which coordinates the parallel execution of code. Sinks cannot be scoped into modules or objects and are usually declared at the top level. They must only access top level variables within mutex blocks. Sinks have the following form:
 ```
 sink mysink
     kindmatch [ "foo.bar.*" ],
@@ -94,6 +94,15 @@ res := addEventAndWait("request", "foo.bar.xxx", {
  ```
 The order of execution of sinks can be controlled via their priority. All sinks which are triggered by a particular event will be executed in order of their priority.
 
+Mutex blocks
+--
+To protect shared resource when handling concurrent events, ECAL supports mutex blocks. Mutex blocks which share the same name can only be accessed by one thread at a given time:
+```
+mutex myresource {
+  globalResource := "new value"
+}
+```
+
 Functions
 --
 Functions define reusable pieces of code dedicated to perform a particular task based on a set of given input values. In ECAL functions are first-class citizens in that they can be assigned to variables and  passed as arguments. Each parameter can have a default value which is by default NULL.

+ 4 - 0
examples/embedding/main.go

@@ -108,8 +108,12 @@ mystuff.add(compute(5), 1)
 		log.Fatal(err)
 	}
 
+	// All errors can be found on the returned monitor object
+
 	fmt.Println("Event result:", monitor.RootMonitor().AllErrors())
 
+	// The log messages of a program can be collected
+
 	fmt.Println("Log:", logger.String())
 }
 

+ 8 - 1
interpreter/provider.go

@@ -13,6 +13,7 @@ package interpreter
 import (
 	"os"
 	"path/filepath"
+	"sync"
 
 	"devt.de/krotik/common/timeutil"
 	"devt.de/krotik/ecal/config"
@@ -130,6 +131,10 @@ var providerMap = map[string]ecalRuntimeNew{
 	parser.NodeTRY:     tryRuntimeInst,
 	parser.NodeEXCEPT:  voidRuntimeInst,
 	parser.NodeFINALLY: voidRuntimeInst,
+
+	// Mutex block
+
+	parser.NodeMUTEX: mutexRuntimeInst,
 }
 
 /*
@@ -140,6 +145,7 @@ type ECALRuntimeProvider struct {
 	ImportLocator util.ECALImportLocator // Locator object for imports
 	Logger        util.Logger            // Logger object for log messages
 	Processor     engine.Processor       // Processor of the ECA engine
+	Mutexes       map[string]*sync.Mutex // Map of named mutexes
 	Cron          *timeutil.Cron         // Cron object for scheduled execution
 	Debugger      util.ECALDebugger      // Optional: ECAL Debugger object
 }
@@ -173,7 +179,8 @@ func NewECALRuntimeProvider(name string, importLocator util.ECALImportLocator, l
 	cron := timeutil.NewCron()
 	cron.Start()
 
-	return &ECALRuntimeProvider{name, importLocator, logger, proc, cron, nil}
+	return &ECALRuntimeProvider{name, importLocator, logger, proc,
+		make(map[string]*sync.Mutex), cron, nil}
 }
 
 /*

+ 49 - 0
interpreter/rt_statements.go

@@ -12,6 +12,7 @@ package interpreter
 
 import (
 	"fmt"
+	"sync"
 
 	"devt.de/krotik/common/errorutil"
 	"devt.de/krotik/common/sortutil"
@@ -601,3 +602,51 @@ func (rt *tryRuntime) Eval(vs parser.Scope, is map[string]interface{}, tid uint6
 
 	return res, err
 }
+
+// Mutex Runtime
+// =============
+
+/*
+mutexRuntime is the runtime for mutex blocks.
+*/
+type mutexRuntime struct {
+	*baseRuntime
+}
+
+/*
+mutexRuntimeInst returns a new runtime component instance.
+*/
+func mutexRuntimeInst(erp *ECALRuntimeProvider, node *parser.ASTNode) parser.Runtime {
+	return &mutexRuntime{newBaseRuntime(erp, node)}
+}
+
+/*
+Eval evaluate this runtime component.
+*/
+func (rt *mutexRuntime) Eval(vs parser.Scope, is map[string]interface{}, tid uint64) (interface{}, error) {
+	var res interface{}
+
+	_, err := rt.baseRuntime.Eval(vs, is, tid)
+
+	if err == nil {
+
+		// Get the name of the mutex
+
+		name := rt.node.Children[0].Token.Val
+
+		mutex, ok := rt.erp.Mutexes[name]
+		if !ok {
+			mutex = &sync.Mutex{}
+			rt.erp.Mutexes[name] = mutex
+		}
+
+		tvs := vs.NewChild(scope.NameFromASTNode(rt.node))
+
+		mutex.Lock()
+		defer mutex.Unlock()
+
+		res, err = rt.node.Children[0].Runtime.Eval(tvs, is, tid)
+	}
+
+	return res, err
+}

+ 34 - 0
interpreter/rt_statements_test.go

@@ -1043,3 +1043,37 @@ error: This did not work`[1:] {
 		return
 	}
 }
+
+func TestMutexStatements(t *testing.T) {
+
+	vs := scope.NewScope(scope.GlobalScope)
+
+	_, err := UnitTestEvalAndAST(
+		`
+mutex foo {
+	a := 1
+    raise("test 12", null, [1,2,3])
+}
+`, vs,
+		`
+mutex
+  identifier: foo
+  statements
+    :=
+      identifier: a
+      number: 1
+    identifier: raise
+      funccall
+        string: 'test 12'
+        null
+        list
+          number: 1
+          number: 2
+          number: 3
+`[1:])
+
+	if err != nil {
+		t.Error(err)
+		return
+	}
+}

+ 8 - 0
parser/const.go

@@ -186,6 +186,10 @@ const (
 	TokenEXCEPT
 	TokenFINALLY
 
+	// Mutex block
+
+	TokenMUTEX
+
 	TokenENDLIST
 )
 
@@ -298,4 +302,8 @@ const (
 	NodeEXCEPT  = "except"
 	NodeAS      = "as"
 	NodeFINALLY = "finally"
+
+	// Mutex block
+
+	NodeMUTEX = "mutex"
 )

+ 4 - 0
parser/lexer.go

@@ -242,6 +242,10 @@ var KeywordMap = map[string]LexTokenID{
 	"try":     TokenTRY,
 	"except":  TokenEXCEPT,
 	"finally": TokenFINALLY,
+
+	// Mutex block
+
+	"mutex": TokenMUTEX,
 }
 
 /*

+ 19 - 0
parser/parser.go

@@ -139,6 +139,10 @@ func init() {
 		TokenTRY:     {NodeTRY, nil, nil, nil, nil, 0, ndTry, nil},
 		TokenEXCEPT:  {NodeEXCEPT, nil, nil, nil, nil, 0, nil, nil},
 		TokenFINALLY: {NodeFINALLY, nil, nil, nil, nil, 0, nil, nil},
+
+		// Mutex statement
+
+		TokenMUTEX: {NodeMUTEX, nil, nil, nil, nil, 0, ndMutex, nil},
 	}
 }
 
@@ -836,6 +840,21 @@ func ndTry(p *parser, self *ASTNode) (*ASTNode, error) {
 	return try, err
 }
 
+/*
+ndMutex is used to parse a mutex block.
+*/
+func ndMutex(p *parser, self *ASTNode) (*ASTNode, error) {
+	var block *ASTNode
+
+	err := acceptChild(p, self, TokenIDENTIFIER)
+
+	if err == nil {
+		block, err = parseInnerStatements(p, self)
+	}
+
+	return block, err
+}
+
 // Standard left denotation functions
 // ==================================
 

+ 26 - 0
parser/parser_statement_test.go

@@ -154,6 +154,32 @@ try
 	}
 }
 
+func TestMutexBlock(t *testing.T) {
+
+	input := `
+mutex foo {
+	print(1)
+	print(2)
+}
+`
+	expectedOutput := `
+mutex
+  identifier: foo
+  statements
+    identifier: print
+      funccall
+        number: 1
+    identifier: print
+      funccall
+        number: 2
+`[1:]
+
+	if res, err := UnitTestParse("mytest", input); err != nil || fmt.Sprint(res) != expectedOutput {
+		t.Error("Unexpected parser output:\n", res, "expected was:\n", expectedOutput, "Error:", err)
+		return
+	}
+}
+
 func TestLoopParsing(t *testing.T) {
 
 	input := `

+ 4 - 0
parser/prettyprinter.go

@@ -135,6 +135,10 @@ func init() {
 		// TokenTRY - Special case (handled in code)
 		// TokenEXCEPT - Special case (handled in code)
 		NodeFINALLY + "_1": template.Must(template.New(NodeFINALLY).Parse(" finally {\n{{.c1}}}\n")),
+
+		// Mutex block
+
+		NodeMUTEX + "_2": template.Must(template.New(NodeLOOP).Parse("mutex {{.c1}} {\n{{.c2}}}\n")),
 	}
 
 	bracketPrecedenceMap = map[string]bool{