Browse Source

feat: Adding pulse trigger function

Matthias Ladkau 3 years ago
parent
commit
701de856f5
3 changed files with 134 additions and 3 deletions
  1. 18 1
      ecal.md
  2. 78 1
      interpreter/func_provider.go
  3. 38 1
      interpreter/func_provider_test.go

+ 18 - 1
ecal.md

@@ -495,8 +495,25 @@ cronspec  | The cron job specification string
 eventname | Event name for the cron triggered events
 eventkind | Event kind for the cron triggered events
 
-#### `setPulseTrigger() : string`
+Example:
+```
+# at second 1 of minute 1 of every 10th hour every day
+setCronTrigger("1 1 *%10 * * *", "cronevent", "foo.bar")
+```
+
+#### `setPulseTrigger(micros, eventname, eventkind)`
+Adds recurring events in very short intervals.
+
+Parameter | Description
+-|-
+micros    | Microsecond interval between events
+eventname | Event name for the triggered events
+eventkind | Event kind for the triggered events
 
+Example:
+```
+setPulseTrigger(100, "foo", "bar")
+```
 
 Logging Functions
 --

+ 78 - 1
interpreter/func_provider.go

@@ -41,6 +41,7 @@ var InbuildFuncMap = map[string]util.ECALFunction{
 	"addEvent":        &addevent{&inbuildBaseFunc{}},
 	"addEventAndWait": &addeventandwait{&addevent{&inbuildBaseFunc{}}},
 	"setCronTrigger":  &setCronTrigger{&inbuildBaseFunc{}},
+	"setPulseTrigger": &setPulseTrigger{&inbuildBaseFunc{}},
 }
 
 /*
@@ -790,7 +791,7 @@ func (ct *setCronTrigger) Run(instanceID string, vs parser.Scope, is map[string]
 				event := engine.NewEvent(eventname, eventkind, map[interface{}]interface{}{
 					"time":      now,
 					"timestamp": fmt.Sprintf("%d", now.UnixNano()/int64(time.Millisecond)),
-					"tick":      tick,
+					"tick":      float64(tick),
 				})
 				monitor := proc.NewRootMonitor(nil, nil)
 				_, err := proc.AddEvent(event, monitor)
@@ -810,3 +811,79 @@ DocString returns a descriptive string.
 func (ct *setCronTrigger) DocString() (string, error) {
 	return "setCronTrigger adds a periodic cron job which fires events.", nil
 }
+
+// setPulseTrigger
+// ==============
+
+/*
+setPulseTrigger adds recurring events in very short intervals.
+*/
+type setPulseTrigger struct {
+	*inbuildBaseFunc
+}
+
+/*
+Run executes this function.
+*/
+func (pt *setPulseTrigger) Run(instanceID string, vs parser.Scope, is map[string]interface{}, args []interface{}) (interface{}, error) {
+	err := fmt.Errorf("Need micro second interval, an event name and an event scope as parameters")
+
+	if len(args) > 2 {
+		var micros float64
+
+		micros, err = pt.AssertNumParam(1, args[0])
+
+		if err == nil {
+			eventname := fmt.Sprint(args[1])
+			eventkind := strings.Split(fmt.Sprint(args[2]), ".")
+
+			erp := is["erp"].(*ECALRuntimeProvider)
+			proc := erp.Processor
+
+			if proc.Stopped() {
+				proc.Start()
+			}
+
+			tick := 0
+
+			go func() {
+				var lastmicros int64
+
+				for {
+					time.Sleep(time.Duration(micros) * time.Microsecond)
+
+					tick += 1
+					now := time.Now()
+					micros := now.UnixNano() / int64(time.Microsecond)
+					event := engine.NewEvent(eventname, eventkind, map[interface{}]interface{}{
+						"currentMicros": float64(micros),
+						"lastMicros":    float64(lastmicros),
+						"timestamp":     fmt.Sprintf("%d", now.UnixNano()/int64(time.Microsecond)),
+						"tick":          float64(tick),
+					})
+					lastmicros = micros
+
+					monitor := proc.NewRootMonitor(nil, nil)
+					_, err := proc.AddEvent(event, monitor)
+
+					if proc.Stopped() {
+						break
+					}
+
+					errorutil.AssertTrue(err == nil,
+						fmt.Sprintf("Could not add pulse event for trigger %v %v %v: %v",
+							micros, eventname, eventkind, err))
+				}
+			}()
+		}
+	}
+
+	return nil, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (pt *setPulseTrigger) DocString() (string, error) {
+	return "setPulseTrigger adds recurring events in microsecond intervals.", nil
+}

+ 38 - 1
interpreter/func_provider_test.go

@@ -13,6 +13,7 @@ package interpreter
 import (
 	"fmt"
 	"reflect"
+	"strings"
 	"testing"
 	"time"
 
@@ -345,7 +346,7 @@ log("Cron:", setCronTrigger("1 1 *%10 * * *", "cronevent", "foo.bar"))
 	}
 
 	testcron.Start()
-	time.Sleep(100 * time.Millisecond)
+	time.Sleep(200 * time.Millisecond)
 
 	if testlogger.String() != `
 Cron:at second 1 of minute 1 of every 10th hour every day
@@ -379,7 +380,43 @@ test rule - Handling request: {
 		t.Error("Unexpected result:", testlogger.String())
 		return
 	}
+}
+
+func TestPulseTrigger(t *testing.T) {
+
+	res, err := UnitTestEval(
+		`setPulseTrigger("test", "foo", "bar")`, nil)
+
+	if err == nil ||
+		err.Error() != "ECAL error in ECALTestRuntime: Runtime error (Parameter 1 should be a number) (Line:1 Pos:1)" {
+		t.Error("Unexpected result: ", res, err)
+		return
+	}
+
+	res, err = UnitTestEval(
+		`
+sink test
+  kindmatch [ "foo.*" ],
+{
+	log("test rule - Handling request: ", event)
+	log("Duration: ", event.state.currentMicros - event.state.lastMicros," us (microsecond)")
+}
+
+setPulseTrigger(100, "pulseevent", "foo.bar")
+`, nil)
+
+	if err != nil {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	time.Sleep(100 * time.Millisecond)
+	testprocessor.Finish()
 
+	if !strings.Contains(testlogger.String(), "Handling request") {
+		t.Error("Unexpected result:", testlogger.String())
+		return
+	}
 }
 
 func TestDocstrings(t *testing.T) {