|
@@ -14,7 +14,10 @@ import (
|
|
|
"fmt"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
+ "time"
|
|
|
|
|
|
+ "devt.de/krotik/common/errorutil"
|
|
|
+ "devt.de/krotik/common/timeutil"
|
|
|
"devt.de/krotik/ecal/engine"
|
|
|
"devt.de/krotik/ecal/parser"
|
|
|
"devt.de/krotik/ecal/scope"
|
|
@@ -37,6 +40,7 @@ var InbuildFuncMap = map[string]util.ECALFunction{
|
|
|
"raise": &raise{&inbuildBaseFunc{}},
|
|
|
"addEvent": &addevent{&inbuildBaseFunc{}},
|
|
|
"addEventAndWait": &addeventandwait{&addevent{&inbuildBaseFunc{}}},
|
|
|
+ "setCronTrigger": &setCronTrigger{&inbuildBaseFunc{}},
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -743,3 +747,66 @@ func (rf *addeventandwait) DocString() (string, error) {
|
|
|
return "AddEventAndWait adds an event to trigger sinks. This function will " +
|
|
|
"return once the event cascade has finished.", nil
|
|
|
}
|
|
|
+
|
|
|
+// setCronTrigger
|
|
|
+// ==============
|
|
|
+
|
|
|
+/*
|
|
|
+setCronTrigger adds a periodic cron job which fires events.
|
|
|
+*/
|
|
|
+type setCronTrigger struct {
|
|
|
+ *inbuildBaseFunc
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+Run executes this function.
|
|
|
+*/
|
|
|
+func (ct *setCronTrigger) Run(instanceID string, vs parser.Scope, is map[string]interface{}, args []interface{}) (interface{}, error) {
|
|
|
+ var res interface{}
|
|
|
+ err := fmt.Errorf("Need a cronspec, an event name and an event scope as parameters")
|
|
|
+
|
|
|
+ if len(args) > 2 {
|
|
|
+ var cs *timeutil.CronSpec
|
|
|
+
|
|
|
+ cronspec := fmt.Sprint(args[0])
|
|
|
+ eventname := fmt.Sprint(args[1])
|
|
|
+ eventkind := strings.Split(fmt.Sprint(args[2]), ".")
|
|
|
+
|
|
|
+ erp := is["erp"].(*ECALRuntimeProvider)
|
|
|
+ proc := erp.Processor
|
|
|
+
|
|
|
+ if proc.Stopped() {
|
|
|
+ proc.Start()
|
|
|
+ }
|
|
|
+
|
|
|
+ if cs, err = timeutil.NewCronSpec(cronspec); err == nil {
|
|
|
+ res = cs.String()
|
|
|
+
|
|
|
+ tick := 0
|
|
|
+
|
|
|
+ erp.Cron.RegisterSpec(cs, func() {
|
|
|
+ tick += 1
|
|
|
+ now := erp.Cron.NowFunc()
|
|
|
+ event := engine.NewEvent(eventname, eventkind, map[interface{}]interface{}{
|
|
|
+ "time": now,
|
|
|
+ "timestamp": fmt.Sprintf("%d", now.UnixNano()/int64(time.Millisecond)),
|
|
|
+ "tick": tick,
|
|
|
+ })
|
|
|
+ monitor := proc.NewRootMonitor(nil, nil)
|
|
|
+ _, err := proc.AddEvent(event, monitor)
|
|
|
+ errorutil.AssertTrue(err == nil,
|
|
|
+ fmt.Sprintf("Could not add cron event for trigger %v %v %v: %v",
|
|
|
+ cronspec, eventname, eventkind, err))
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return res, err
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+DocString returns a descriptive string.
|
|
|
+*/
|
|
|
+func (ct *setCronTrigger) DocString() (string, error) {
|
|
|
+ return "setCronTrigger adds a periodic cron job which fires events.", nil
|
|
|
+}
|