func_provider.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package interpreter
  11. import (
  12. "fmt"
  13. "math/rand"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "devt.de/krotik/common/errorutil"
  18. "devt.de/krotik/common/timeutil"
  19. "devt.de/krotik/ecal/engine"
  20. "devt.de/krotik/ecal/parser"
  21. "devt.de/krotik/ecal/scope"
  22. "devt.de/krotik/ecal/stdlib"
  23. "devt.de/krotik/ecal/util"
  24. )
  25. /*
  26. InbuildFuncMap contains the mapping of inbuild functions.
  27. */
  28. var InbuildFuncMap = map[string]util.ECALFunction{
  29. "range": &rangeFunc{&inbuildBaseFunc{}},
  30. "new": &newFunc{&inbuildBaseFunc{}},
  31. "type": &typeFunc{&inbuildBaseFunc{}},
  32. "len": &lenFunc{&inbuildBaseFunc{}},
  33. "del": &delFunc{&inbuildBaseFunc{}},
  34. "add": &addFunc{&inbuildBaseFunc{}},
  35. "concat": &concatFunc{&inbuildBaseFunc{}},
  36. "now": &nowFunc{&inbuildBaseFunc{}},
  37. "rand": &randFunc{&inbuildBaseFunc{}},
  38. "timestamp": &timestampFunc{&inbuildBaseFunc{}},
  39. "dumpenv": &dumpenvFunc{&inbuildBaseFunc{}},
  40. "doc": &docFunc{&inbuildBaseFunc{}},
  41. "sleep": &sleepFunc{&inbuildBaseFunc{}},
  42. "raise": &raise{&inbuildBaseFunc{}},
  43. "addEvent": &addevent{&inbuildBaseFunc{}},
  44. "addEventAndWait": &addeventandwait{&addevent{&inbuildBaseFunc{}}},
  45. "setCronTrigger": &setCronTrigger{&inbuildBaseFunc{}},
  46. "setPulseTrigger": &setPulseTrigger{&inbuildBaseFunc{}},
  47. }
  /*
  inbuildBaseFunc is the common base of all inbuild functions. It carries no
  state and only offers helper functions to validate function parameters.
  */
  type inbuildBaseFunc struct{}

  /*
  AssertNumParam returns a given parameter value as a number. A float64 value is
  returned unchanged, any other value is parsed from its string representation.
  The given index is only used to identify the parameter in the error message.
  */
  func (ibf *inbuildBaseFunc) AssertNumParam(index int, val interface{}) (float64, error) {
  	if num, isNum := val.(float64); isNum {
  		return num, nil
  	}

  	// Anything which is not a float64 gets a chance via its string form

  	num, parseErr := strconv.ParseFloat(fmt.Sprint(val), 64)
  	if parseErr != nil {
  		return num, fmt.Errorf("Parameter %v should be a number", index)
  	}

  	return num, nil
  }

  /*
  AssertMapParam returns a given parameter value as a map. An error which
  mentions the given parameter index is returned if the value is not a map.
  */
  func (ibf *inbuildBaseFunc) AssertMapParam(index int, val interface{}) (map[interface{}]interface{}, error) {
  	valMap, isMap := val.(map[interface{}]interface{})
  	if !isMap {
  		return nil, fmt.Errorf("Parameter %v should be a map", index)
  	}

  	return valMap, nil
  }

  /*
  AssertListParam returns a given parameter value as a list. An error which
  mentions the given parameter index is returned if the value is not a list.
  */
  func (ibf *inbuildBaseFunc) AssertListParam(index int, val interface{}) ([]interface{}, error) {
  	valList, isList := val.([]interface{})
  	if !isList {
  		return nil, fmt.Errorf("Parameter %v should be a list", index)
  	}

  	return valList, nil
  }
  88. // Range
  89. // =====
  90. /*
  91. rangeFunc is an interator function which returns a range of numbers.
  92. */
  93. type rangeFunc struct {
  94. *inbuildBaseFunc
  95. }
  96. /*
  97. Run executes this function.
  98. */
  99. func (rf *rangeFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  100. var currVal, to float64
  101. var err error
  102. lenargs := len(args)
  103. from := 0.
  104. step := 1.
  105. if lenargs == 0 {
  106. err = fmt.Errorf("Need at least an end range as first parameter")
  107. }
  108. if err == nil {
  109. if stepVal, ok := is[instanceID+"step"]; ok {
  110. step = stepVal.(float64)
  111. from = is[instanceID+"from"].(float64)
  112. to = is[instanceID+"to"].(float64)
  113. currVal = is[instanceID+"currVal"].(float64)
  114. is[instanceID+"currVal"] = currVal + step
  115. // Check for end of iteration
  116. if (from < to && currVal > to) || (from > to && currVal < to) || from == to {
  117. err = util.ErrEndOfIteration
  118. }
  119. } else {
  120. if lenargs == 1 {
  121. to, err = rf.AssertNumParam(1, args[0])
  122. } else {
  123. from, err = rf.AssertNumParam(1, args[0])
  124. if err == nil {
  125. to, err = rf.AssertNumParam(2, args[1])
  126. }
  127. if err == nil && lenargs > 2 {
  128. step, err = rf.AssertNumParam(3, args[2])
  129. }
  130. }
  131. if err == nil {
  132. is[instanceID+"from"] = from
  133. is[instanceID+"to"] = to
  134. is[instanceID+"step"] = step
  135. is[instanceID+"currVal"] = from
  136. currVal = from
  137. }
  138. }
  139. }
  140. if err == nil {
  141. err = util.ErrIsIterator // Identify as iterator
  142. }
  143. return currVal, err
  144. }
  145. /*
  146. DocString returns a descriptive string.
  147. */
  148. func (rf *rangeFunc) DocString() (string, error) {
  149. return "Iterates over number ranges. Parameters are start, end and step.", nil
  150. }
  151. // New
  152. // ===
  153. /*
  154. newFunc instantiates a new object.
  155. */
  156. type newFunc struct {
  157. *inbuildBaseFunc
  158. }
  159. /*
  160. Run executes this function.
  161. */
  162. func (rf *newFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  163. var res interface{}
  164. err := fmt.Errorf("Need a map as first parameter")
  165. if len(args) > 0 {
  166. var argMap map[interface{}]interface{}
  167. if argMap, err = rf.AssertMapParam(1, args[0]); err == nil {
  168. obj := make(map[interface{}]interface{})
  169. res = obj
  170. _, err = rf.addSuperClasses(vs, is, obj, argMap)
  171. if initObj, ok := obj["init"]; ok {
  172. if initFunc, ok := initObj.(*function); ok {
  173. initvs := scope.NewScope(fmt.Sprintf("newfunc: %v", instanceID))
  174. initis := make(map[string]interface{})
  175. _, err = initFunc.Run(instanceID, initvs, initis, tid, args[1:])
  176. }
  177. }
  178. }
  179. }
  180. return res, err
  181. }
  182. /*
  183. addSuperClasses adds super class functions to a given object.
  184. */
  185. func (rf *newFunc) addSuperClasses(vs parser.Scope, is map[string]interface{},
  186. obj map[interface{}]interface{}, template map[interface{}]interface{}) (interface{}, error) {
  187. var err error
  188. var initFunc interface{}
  189. var initSuperList []interface{}
  190. // First loop into the base classes (i.e. top-most classes)
  191. if super, ok := template["super"]; ok {
  192. if superList, ok := super.([]interface{}); ok {
  193. for _, superObj := range superList {
  194. var superInit interface{}
  195. if superTemplate, ok := superObj.(map[interface{}]interface{}); ok {
  196. superInit, err = rf.addSuperClasses(vs, is, obj, superTemplate)
  197. initSuperList = append(initSuperList, superInit) // Build up the list of super functions
  198. }
  199. }
  200. } else {
  201. err = fmt.Errorf("Property _super must be a list of super classes")
  202. }
  203. }
  204. // Copy all properties from template to obj
  205. for k, v := range template {
  206. // Save previous init function
  207. if funcVal, ok := v.(*function); ok {
  208. newFunction := &function{funcVal.name, nil, obj, funcVal.declaration, funcVal.declarationVS}
  209. if k == "init" {
  210. newFunction.super = initSuperList
  211. initFunc = newFunction
  212. }
  213. obj[k] = newFunction
  214. } else {
  215. obj[k] = v
  216. }
  217. }
  218. return initFunc, err
  219. }
  220. /*
  221. DocString returns a descriptive string.
  222. */
  223. func (rf *newFunc) DocString() (string, error) {
  224. return "Creates a new object instance.", nil
  225. }
  226. // Type
  227. // =====
  228. /*
  229. typeFunc returns the underlying types and values of an object.
  230. */
  231. type typeFunc struct {
  232. *inbuildBaseFunc
  233. }
  234. /*
  235. Run executes this function.
  236. */
  237. func (rf *typeFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  238. var res interface{}
  239. err := fmt.Errorf("Need a value as first parameter")
  240. if len(args) > 0 {
  241. res = fmt.Sprintf("%#v", args[0])
  242. err = nil
  243. }
  244. return res, err
  245. }
  246. /*
  247. DocString returns a descriptive string.
  248. */
  249. func (rf *typeFunc) DocString() (string, error) {
  250. return "Returns the underlying types and values of an object.", nil
  251. }
  252. // Len
  253. // ===
  254. /*
  255. lenFunc returns the size of a list or map.
  256. */
  257. type lenFunc struct {
  258. *inbuildBaseFunc
  259. }
  260. /*
  261. Run executes this function.
  262. */
  263. func (rf *lenFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  264. var res float64
  265. err := fmt.Errorf("Need a list or a map as first parameter")
  266. if len(args) > 0 {
  267. argList, ok1 := args[0].([]interface{})
  268. argMap, ok2 := args[0].(map[interface{}]interface{})
  269. if ok1 {
  270. res = float64(len(argList))
  271. err = nil
  272. } else if ok2 {
  273. res = float64(len(argMap))
  274. err = nil
  275. }
  276. }
  277. return res, err
  278. }
  279. /*
  280. DocString returns a descriptive string.
  281. */
  282. func (rf *lenFunc) DocString() (string, error) {
  283. return "Returns the size of a list or map.", nil
  284. }
  285. // Del
  286. // ===
  287. /*
  288. delFunc removes an element from a list or map.
  289. */
  290. type delFunc struct {
  291. *inbuildBaseFunc
  292. }
  293. /*
  294. Run executes this function.
  295. */
  296. func (rf *delFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  297. var res interface{}
  298. err := fmt.Errorf("Need a list or a map as first parameter and an index or key as second parameter")
  299. if len(args) == 2 {
  300. if argList, ok1 := args[0].([]interface{}); ok1 {
  301. var index float64
  302. index, err = rf.AssertNumParam(2, args[1])
  303. if err == nil {
  304. res = append(argList[:int(index)], argList[int(index+1):]...)
  305. }
  306. }
  307. if argMap, ok2 := args[0].(map[interface{}]interface{}); ok2 {
  308. key := fmt.Sprint(args[1])
  309. delete(argMap, key)
  310. res = argMap
  311. err = nil
  312. }
  313. }
  314. return res, err
  315. }
  316. /*
  317. DocString returns a descriptive string.
  318. */
  319. func (rf *delFunc) DocString() (string, error) {
  320. return "Removes an item from a list or map.", nil
  321. }
  322. // Add
  323. // ===
  324. /*
  325. addFunc adds an element to a list.
  326. */
  327. type addFunc struct {
  328. *inbuildBaseFunc
  329. }
  330. /*
  331. Run executes this function.
  332. */
  333. func (rf *addFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  334. var res interface{}
  335. err := fmt.Errorf("Need a list as first parameter and a value as second parameter")
  336. if len(args) > 1 {
  337. var argList []interface{}
  338. if argList, err = rf.AssertListParam(1, args[0]); err == nil {
  339. if len(args) == 3 {
  340. var index float64
  341. if index, err = rf.AssertNumParam(3, args[2]); err == nil {
  342. argList = append(argList, 0)
  343. copy(argList[int(index+1):], argList[int(index):])
  344. argList[int(index)] = args[1]
  345. res = argList
  346. }
  347. } else {
  348. res = append(argList, args[1])
  349. }
  350. }
  351. }
  352. return res, err
  353. }
  354. /*
  355. DocString returns a descriptive string.
  356. */
  357. func (rf *addFunc) DocString() (string, error) {
  358. return "Adds an item to a list. The item is added at the optionally given index or at the end if no index is specified.", nil
  359. }
  360. // Concat
  361. // ======
  362. /*
  363. concatFunc joins one or more lists together.
  364. */
  365. type concatFunc struct {
  366. *inbuildBaseFunc
  367. }
  368. /*
  369. Run executes this function.
  370. */
  371. func (rf *concatFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  372. var res interface{}
  373. err := fmt.Errorf("Need at least two lists as parameters")
  374. if len(args) > 1 {
  375. var argList []interface{}
  376. resList := make([]interface{}, 0)
  377. err = nil
  378. for _, a := range args {
  379. if err == nil {
  380. if argList, err = rf.AssertListParam(1, a); err == nil {
  381. resList = append(resList, argList...)
  382. }
  383. }
  384. }
  385. if err == nil {
  386. res = resList
  387. }
  388. }
  389. return res, err
  390. }
  391. /*
  392. DocString returns a descriptive string.
  393. */
  394. func (rf *concatFunc) DocString() (string, error) {
  395. return "Joins one or more lists together. The result is a new list.", nil
  396. }
  397. // dumpenv
  398. // =======
  399. /*
  400. dumpenvFunc returns the current variable environment as a string.
  401. */
  402. type dumpenvFunc struct {
  403. *inbuildBaseFunc
  404. }
  405. /*
  406. Run executes this function.
  407. */
  408. func (rf *dumpenvFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  409. return vs.String(), nil
  410. }
  411. /*
  412. DocString returns a descriptive string.
  413. */
  414. func (rf *dumpenvFunc) DocString() (string, error) {
  415. return "Returns the current variable environment as a string.", nil
  416. }
  417. // now
  418. // ===
  419. /*
  420. nowFunc returns the current time in microseconds from 1st of January 1970 UTC.
  421. */
  422. type nowFunc struct {
  423. *inbuildBaseFunc
  424. }
  425. /*
  426. Run executes this function.
  427. */
  428. func (rf *nowFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  429. t := time.Now().UnixNano() / int64(time.Microsecond)
  430. return float64(t), nil
  431. }
  432. /*
  433. DocString returns a descriptive string.
  434. */
  435. func (rf *nowFunc) DocString() (string, error) {
  436. return "Returns the current time in microseconds from 1st of January 1970 UTC.", nil
  437. }
  438. // rand
  439. // ====
  440. /*
  441. randFunc returns a pseudo-random number between 0 and 1 from the default source.
  442. */
  443. type randFunc struct {
  444. *inbuildBaseFunc
  445. }
  446. /*
  447. Run executes this function.
  448. */
  449. func (rf *randFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  450. return rand.Float64(), nil
  451. }
  452. /*
  453. DocString returns a descriptive string.
  454. */
  455. func (rf *randFunc) DocString() (string, error) {
  456. return "Returns a pseudo-random number between 0 and 1 from the default source.", nil
  457. }
  458. // timestamp
  459. // ===
  460. /*
  461. timestampFunc returns a human readable time stamp string from a given number of microseconds since posix epoch time.
  462. */
  463. type timestampFunc struct {
  464. *inbuildBaseFunc
  465. }
  466. /*
  467. Run executes this function.
  468. */
  469. func (rf *timestampFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  470. var ret string
  471. var err error
  472. micros := float64(time.Now().UnixNano() / int64(time.Microsecond))
  473. loc := "UTC"
  474. if len(args) > 0 {
  475. micros, err = rf.AssertNumParam(1, args[0])
  476. if len(args) > 1 {
  477. loc = fmt.Sprint(args[1])
  478. }
  479. }
  480. if err == nil {
  481. var l *time.Location
  482. tsTime := time.Unix(0, int64(micros*1000))
  483. if l, err = time.LoadLocation(loc); err == nil {
  484. ret = tsTime.In(l).Format("2006-01-02T15:04:05.999999Z07:MST")
  485. }
  486. }
  487. return ret, err
  488. }
  489. /*
  490. DocString returns a descriptive string.
  491. */
  492. func (rf *timestampFunc) DocString() (string, error) {
  493. return "Returns a human readable time stamp string from a given number of microseconds since posix epoch time.", nil
  494. }
  495. // doc
  496. // ===
  497. /*
  498. docFunc returns the docstring of a function.
  499. */
  500. type docFunc struct {
  501. *inbuildBaseFunc
  502. }
  503. /*
  504. Run executes this function.
  505. */
  506. func (rf *docFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  507. var res interface{}
  508. err := fmt.Errorf("Need a function as parameter")
  509. if len(args) > 0 {
  510. funcObj, ok := args[0].(util.ECALFunction)
  511. if args[0] == nil {
  512. // Try to lookup by the given identifier
  513. c := is["astnode"].(*parser.ASTNode).Children[0].Children[0]
  514. astring := c.Token.Val
  515. if len(c.Children) > 0 {
  516. astring = fmt.Sprintf("%v.%v", astring, c.Children[0].Token.Val)
  517. }
  518. // Check for stdlib function
  519. if funcObj, ok = stdlib.GetStdlibFunc(astring); !ok {
  520. // Check for inbuild function
  521. funcObj, ok = InbuildFuncMap[astring]
  522. }
  523. }
  524. if ok {
  525. res, err = funcObj.DocString()
  526. }
  527. }
  528. return res, err
  529. }
  530. /*
  531. DocString returns a descriptive string.
  532. */
  533. func (rf *docFunc) DocString() (string, error) {
  534. return "Returns the docstring of a function.", nil
  535. }
  536. // sleep
  537. // =====
  538. /*
  539. sleepFunc pauses the current thread for a number of micro seconds.
  540. */
  541. type sleepFunc struct {
  542. *inbuildBaseFunc
  543. }
  544. /*
  545. Run executes this function.
  546. */
  547. func (rf *sleepFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  548. var res interface{}
  549. err := fmt.Errorf("Need number of micro seconds as parameter")
  550. if len(args) > 0 {
  551. var micros float64
  552. micros, err = rf.AssertNumParam(1, args[0])
  553. if err == nil {
  554. time.Sleep(time.Duration(micros) * time.Microsecond)
  555. }
  556. }
  557. return res, err
  558. }
  559. /*
  560. DocString returns a descriptive string.
  561. */
  562. func (rf *sleepFunc) DocString() (string, error) {
  563. return "Pauses the current thread for a number of micro seconds.", nil
  564. }
  565. // raise
  566. // =====
  567. /*
  568. raise returns an error. Outside of sinks this will stop the code execution
  569. if the error is not handled by try / except. Inside a sink only the specific sink
  570. will fail. This error can be used to break trigger sequences of sinks if
  571. FailOnFirstErrorInTriggerSequence is set.
  572. */
  573. type raise struct {
  574. *inbuildBaseFunc
  575. }
  576. /*
  577. Run executes this function.
  578. */
  579. func (rf *raise) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  580. var err error
  581. var detailMsg string
  582. var detail interface{}
  583. if len(args) > 0 {
  584. err = fmt.Errorf("%v", args[0])
  585. if len(args) > 1 {
  586. if args[1] != nil {
  587. detailMsg = fmt.Sprint(args[1])
  588. }
  589. if len(args) > 2 {
  590. detail = args[2]
  591. }
  592. }
  593. }
  594. erp := is["erp"].(*ECALRuntimeProvider)
  595. node := is["astnode"].(*parser.ASTNode)
  596. return nil, &util.RuntimeErrorWithDetail{
  597. RuntimeError: erp.NewRuntimeError(err, detailMsg, node).(*util.RuntimeError),
  598. Environment: vs,
  599. Data: detail,
  600. }
  601. }
  602. /*
  603. DocString returns a descriptive string.
  604. */
  605. func (rf *raise) DocString() (string, error) {
  606. return "Raise an error which stops the execution unless it is handled by a try/except block.", nil
  607. }
  608. // addEvent
  609. // ========
  610. /*
  611. addevent adds an event to trigger sinks. This function will return immediately
  612. and not wait for the event cascade to finish. Use this function for event cascades.
  613. */
  614. type addevent struct {
  615. *inbuildBaseFunc
  616. }
  617. /*
  618. Run executes this function.
  619. */
  620. func (rf *addevent) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  621. return rf.addEvent(func(proc engine.Processor, event *engine.Event, scope *engine.RuleScope) (interface{}, error) {
  622. var monitor engine.Monitor
  623. parentMonitor, ok := is["monitor"]
  624. if scope != nil || !ok {
  625. monitor = proc.NewRootMonitor(nil, scope)
  626. } else {
  627. monitor = parentMonitor.(engine.Monitor).NewChildMonitor(0)
  628. }
  629. _, err := proc.AddEvent(event, monitor)
  630. return nil, err
  631. }, is, args)
  632. }
  633. func (rf *addevent) addEvent(addFunc func(engine.Processor, *engine.Event, *engine.RuleScope) (interface{}, error),
  634. is map[string]interface{}, args []interface{}) (interface{}, error) {
  635. var res interface{}
  636. var stateMap map[interface{}]interface{}
  637. erp := is["erp"].(*ECALRuntimeProvider)
  638. proc := erp.Processor
  639. if proc.Stopped() {
  640. proc.Start()
  641. }
  642. err := fmt.Errorf("Need at least three parameters: name, kind and state")
  643. if len(args) > 2 {
  644. if stateMap, err = rf.AssertMapParam(3, args[2]); err == nil {
  645. var scope *engine.RuleScope
  646. event := engine.NewEvent(
  647. fmt.Sprint(args[0]),
  648. strings.Split(fmt.Sprint(args[1]), "."),
  649. stateMap,
  650. )
  651. if len(args) > 3 {
  652. var scopeMap map[interface{}]interface{}
  653. // Add optional scope - if not specified it is { "": true }
  654. if scopeMap, err = rf.AssertMapParam(4, args[3]); err == nil {
  655. var scopeData = map[string]bool{}
  656. for k, v := range scopeMap {
  657. b, _ := strconv.ParseBool(fmt.Sprint(v))
  658. scopeData[fmt.Sprint(k)] = b
  659. }
  660. scope = engine.NewRuleScope(scopeData)
  661. }
  662. }
  663. if err == nil {
  664. res, err = addFunc(proc, event, scope)
  665. }
  666. }
  667. }
  668. return res, err
  669. }
  670. /*
  671. DocString returns a descriptive string.
  672. */
  673. func (rf *addevent) DocString() (string, error) {
  674. return "Adds an event to trigger sinks. This function will return " +
  675. "immediately and not wait for the event cascade to finish.", nil
  676. }
  677. // addEventAndWait
  678. // ===============
  679. /*
  680. addeventandwait adds an event to trigger sinks. This function will return once
  681. the event cascade has finished and return all errors.
  682. */
  683. type addeventandwait struct {
  684. *addevent
  685. }
  686. /*
  687. Run executes this function.
  688. */
  689. func (rf *addeventandwait) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  690. return rf.addEvent(func(proc engine.Processor, event *engine.Event, scope *engine.RuleScope) (interface{}, error) {
  691. var res []interface{}
  692. rm := proc.NewRootMonitor(nil, scope)
  693. m, err := proc.AddEventAndWait(event, rm)
  694. if m != nil {
  695. allErrors := m.(*engine.RootMonitor).AllErrors()
  696. for _, e := range allErrors {
  697. errors := map[interface{}]interface{}{}
  698. for k, v := range e.ErrorMap {
  699. // Note: The variable scope of the sink (se.environment)
  700. // was also captured - for now it is not exposed to the
  701. // language environment
  702. errorItem := map[interface{}]interface{}{
  703. "error": v.Error(),
  704. }
  705. if se, ok := v.(*util.RuntimeErrorWithDetail); ok {
  706. errorItem["type"] = se.Type.Error()
  707. errorItem["detail"] = se.Detail
  708. errorItem["data"] = se.Data
  709. }
  710. errors[k] = errorItem
  711. }
  712. item := map[interface{}]interface{}{
  713. "event": map[interface{}]interface{}{
  714. "name": e.Event.Name(),
  715. "kind": strings.Join(e.Event.Kind(), "."),
  716. "state": e.Event.State(),
  717. },
  718. "errors": errors,
  719. }
  720. res = append(res, item)
  721. }
  722. }
  723. return res, err
  724. }, is, args)
  725. }
  726. /*
  727. DocString returns a descriptive string.
  728. */
  729. func (rf *addeventandwait) DocString() (string, error) {
  730. return "Adds an event to trigger sinks. This function will " +
  731. "return once the event cascade has finished.", nil
  732. }
  733. // setCronTrigger
  734. // ==============
  735. /*
  736. setCronTrigger adds a periodic cron job which fires events.
  737. */
  738. type setCronTrigger struct {
  739. *inbuildBaseFunc
  740. }
  741. /*
  742. Run executes this function.
  743. */
  744. func (ct *setCronTrigger) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  745. var res interface{}
  746. err := fmt.Errorf("Need a cronspec, an event name and an event scope as parameters")
  747. if len(args) > 2 {
  748. var cs *timeutil.CronSpec
  749. cronspec := fmt.Sprint(args[0])
  750. eventname := fmt.Sprint(args[1])
  751. eventkind := strings.Split(fmt.Sprint(args[2]), ".")
  752. erp := is["erp"].(*ECALRuntimeProvider)
  753. proc := erp.Processor
  754. if proc.Stopped() {
  755. proc.Start()
  756. }
  757. if cs, err = timeutil.NewCronSpec(cronspec); err == nil {
  758. res = cs.String()
  759. tick := 0
  760. erp.Cron.RegisterSpec(cs, func() {
  761. tick++
  762. now := erp.Cron.NowFunc()
  763. event := engine.NewEvent(eventname, eventkind, map[interface{}]interface{}{
  764. "time": now,
  765. "timestamp": fmt.Sprintf("%d", now.UnixNano()/int64(time.Millisecond)),
  766. "tick": float64(tick),
  767. })
  768. monitor := proc.NewRootMonitor(nil, nil)
  769. _, err := proc.AddEvent(event, monitor)
  770. if status := proc.Status(); status != "Stopped" && status != "Stopping" {
  771. errorutil.AssertTrue(err == nil,
  772. fmt.Sprintf("Could not add cron event for trigger %v %v %v: %v",
  773. cronspec, eventname, eventkind, err))
  774. }
  775. })
  776. }
  777. }
  778. return res, err
  779. }
  780. /*
  781. DocString returns a descriptive string.
  782. */
  783. func (ct *setCronTrigger) DocString() (string, error) {
  784. return "Adds a periodic cron job which fires events.", nil
  785. }
  786. // setPulseTrigger
  787. // ==============
  788. /*
  789. setPulseTrigger adds recurring events in very short intervals.
  790. */
  791. type setPulseTrigger struct {
  792. *inbuildBaseFunc
  793. }
  794. /*
  795. Run executes this function.
  796. */
  797. func (pt *setPulseTrigger) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
  798. err := fmt.Errorf("Need micro second interval, an event name and an event scope as parameters")
  799. if len(args) > 2 {
  800. var micros float64
  801. micros, err = pt.AssertNumParam(1, args[0])
  802. if err == nil {
  803. eventname := fmt.Sprint(args[1])
  804. eventkind := strings.Split(fmt.Sprint(args[2]), ".")
  805. erp := is["erp"].(*ECALRuntimeProvider)
  806. proc := erp.Processor
  807. if proc.Stopped() {
  808. proc.Start()
  809. }
  810. tick := 0
  811. go func() {
  812. var lastmicros int64
  813. for {
  814. time.Sleep(time.Duration(micros) * time.Microsecond)
  815. tick++
  816. now := time.Now()
  817. micros := now.UnixNano() / int64(time.Microsecond)
  818. event := engine.NewEvent(eventname, eventkind, map[interface{}]interface{}{
  819. "currentMicros": float64(micros),
  820. "lastMicros": float64(lastmicros),
  821. "timestamp": fmt.Sprintf("%d", now.UnixNano()/int64(time.Microsecond)),
  822. "tick": float64(tick),
  823. })
  824. lastmicros = micros
  825. monitor := proc.NewRootMonitor(nil, nil)
  826. _, err := proc.AddEventAndWait(event, monitor)
  827. if status := proc.Status(); status == "Stopped" || status == "Stopping" {
  828. break
  829. }
  830. errorutil.AssertTrue(err == nil,
  831. fmt.Sprintf("Could not add pulse event for trigger %v %v %v: %v",
  832. micros, eventname, eventkind, err))
  833. }
  834. }()
  835. }
  836. }
  837. return nil, err
  838. }
  839. /*
  840. DocString returns a descriptive string.
  841. */
  842. func (pt *setPulseTrigger) DocString() (string, error) {
  843. return "Adds recurring events in microsecond intervals.", nil
  844. }