processor_test.go 19 KB


  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package engine
  11. import (
  12. "bytes"
  13. "errors"
  14. "fmt"
  15. "os"
  16. "sync"
  17. "testing"
  18. "time"
  19. "devt.de/krotik/common/errorutil"
  20. "devt.de/krotik/ecal/engine/pool"
  21. )
  22. func TestProcessorSimpleCascade(t *testing.T) {
  23. UnitTestResetIDs()
  24. // Add debug logging
  25. var debugBuffer bytes.Buffer
  26. EventTracer.out = &debugBuffer
  27. EventTracer.MonitorEvent("core.*", map[interface{}]interface{}{
  28. "foo": "bar",
  29. "foo2": nil,
  30. })
  31. EventTracer.MonitorEvent("core.*", map[interface{}]interface{}{
  32. "foo2": "test",
  33. })
  34. defer func() {
  35. EventTracer.Reset()
  36. EventTracer.out = os.Stdout
  37. }()
  38. // Do the normal testing
  39. var log bytes.Buffer
  40. proc := NewProcessor(1)
  41. if res := fmt.Sprint(proc); res != "RumbleProcessor 1 (workers:1)" {
  42. t.Error("Unexpected result:", res)
  43. return
  44. }
  45. // Add rules to the processor
  46. rule1 := &Rule{
  47. "TestRule1", // Name
  48. "", // Description
  49. []string{"core.main.event1"}, // Kind match
  50. []string{"data"}, // Match on event cascade scope
  51. nil, // No state match
  52. 2, // Priority of the rule
  53. []string{"TestRule3", "TestRule3Copy"}, // List of suppressed rules by this rule
  54. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  55. log.WriteString("TestRule1\n")
  56. // Add another event
  57. p.AddEvent(&Event{
  58. "InitialEvent",
  59. []string{"core", "main", "event2"},
  60. map[interface{}]interface{}{
  61. "foo": "bar",
  62. "foo2": "bla",
  63. },
  64. }, m.NewChildMonitor(1))
  65. return nil
  66. },
  67. }
  68. rule2 := &Rule{
  69. "TestRule2", // Name
  70. "", // Description
  71. []string{"core.main.*"}, // Kind match
  72. []string{"data.read"}, // Match on event cascade scope
  73. nil, // No state match
  74. 5, // Priority of the rule
  75. nil, // List of suppressed rules by this rule
  76. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  77. log.WriteString("TestRule2\n")
  78. return nil
  79. },
  80. }
  81. rule3 := &Rule{
  82. "TestRule3", // Name
  83. "", // Description
  84. []string{"core.main.*"}, // Kind match
  85. []string{"data.read"}, // Match on event cascade scope
  86. nil, // No state match
  87. 0, // Priority of the rule
  88. nil, // List of suppressed rules by this rule
  89. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  90. log.WriteString("TestRule3\n")
  91. return nil
  92. },
  93. }
  94. proc.AddRule(rule1)
  95. proc.AddRule(rule2)
  96. proc.AddRule(rule3)
  97. if r := len(proc.Rules()); r != 3 {
  98. t.Error("Unexpected rule number:", r)
  99. return
  100. }
  101. // Start the processor
  102. proc.Start()
  103. // Push a root event
  104. e := NewEvent(
  105. "InitialEvent",
  106. []string{"core", "main", "event1"},
  107. map[interface{}]interface{}{
  108. "foo": "bar",
  109. "foo2": "bla",
  110. },
  111. )
  112. if e.Name() != e.name || e.Kind() == nil || e.State() == nil {
  113. t.Error("Unepxected getter result:", e)
  114. return
  115. }
  116. rootm := proc.NewRootMonitor(nil, nil)
  117. rootm.SetFinishHandler(func(p Processor) {
  118. log.WriteString("finished!")
  119. })
  120. proc.AddEvent(e, rootm)
  121. if err := proc.AddRule(rule3); err.Error() != "Cannot add rule if the processor has not stopped" {
  122. t.Error("Unexpected error:", err)
  123. return
  124. }
  125. if err := proc.Reset(); err.Error() != "Cannot reset processor if it has not stopped" {
  126. t.Error("Unexpected error:", err)
  127. return
  128. }
  129. // Finish the processor
  130. proc.Finish()
  131. // Finish the processor
  132. // Rule 1, 2 and 3 trigger on event1 but rule 3 is suppressed by rule 1
  133. // Rule 1 adds a new event which triggers only rule 2 and 3
  134. // Rule 3 comes first since it has the higher priority
  135. if log.String() != `TestRule1
  136. TestRule2
  137. TestRule3
  138. TestRule2
  139. finished!` {
  140. t.Error("Unexpected result:", log.String())
  141. return
  142. }
  143. log.Reset()
  144. if err := proc.AddRule(rule3.CopyAs("TestRule3Copy")); err != nil {
  145. t.Error("Unexpected error:", err)
  146. return
  147. }
  148. // Start the processor
  149. proc.Start()
  150. // Push a root event
  151. proc.AddEvent(&Event{
  152. "InitialEvent",
  153. []string{"core", "main", "event1"},
  154. nil,
  155. }, nil)
  156. // Finish the processor
  157. proc.Finish()
  158. if log.String() != `TestRule1
  159. TestRule2
  160. TestRule3
  161. TestRule3
  162. TestRule2
  163. ` {
  164. t.Error("Unexpected result:", log.String())
  165. return
  166. }
  167. // Test the case when the event is pointless
  168. log.Reset()
  169. proc.Start()
  170. proc.AddEvent(&Event{
  171. "InitialEventFoo",
  172. []string{"core", "foo", "event1"},
  173. nil,
  174. }, nil)
  175. rm := proc.NewRootMonitor(nil, nil)
  176. proc.AddEvent(&Event{
  177. "InitialEventFoo",
  178. []string{"core", "foo", "event1"},
  179. nil,
  180. }, rm)
  181. if !rm.IsFinished() {
  182. t.Error("Monitor which monitored a non-triggering event should still finished")
  183. return
  184. }
  185. proc.Finish()
  186. if log.String() != "" {
  187. t.Error("Unexpected result:", log.String())
  188. return
  189. }
  190. proc.Reset()
  191. if r := len(proc.Rules()); r != 0 {
  192. t.Error("Unexpected rule number:", r)
  193. return
  194. }
  195. if debugBuffer.String() == "" {
  196. t.Error("Nothing was recorded in the debug buffer")
  197. return
  198. }
  199. }
  200. func TestProcessorSimplePriorities(t *testing.T) {
  201. UnitTestResetIDs()
  202. var logLock = sync.Mutex{}
  203. var log bytes.Buffer
  204. testPriorities := func(p1, p2 int) int {
  205. proc := NewProcessor(2)
  206. // Add rules to the processor
  207. rule1 := &Rule{
  208. "TestRule1", // Name
  209. "", // Description
  210. []string{"core.main.event1"}, // Kind match
  211. []string{"data"}, // Match on event cascade scope
  212. nil, // No state match
  213. 0, // Priority of the rule
  214. nil, // List of suppressed rules by this rule
  215. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  216. logLock.Lock()
  217. log.WriteString("TestRule1\n")
  218. logLock.Unlock()
  219. time.Sleep(2 * time.Millisecond)
  220. return nil
  221. },
  222. }
  223. rule2 := &Rule{
  224. "TestRule2", // Name
  225. "", // Description
  226. []string{"core.main.event2"}, // Kind match
  227. []string{"data"}, // Match on event cascade scope
  228. nil, // No state match
  229. 0, // Priority of the rule
  230. nil, // List of suppressed rules by this rule
  231. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  232. logLock.Lock()
  233. log.WriteString("TestRule2\n")
  234. logLock.Unlock()
  235. time.Sleep(2 * time.Millisecond)
  236. return nil
  237. },
  238. }
  239. proc.AddRule(rule1)
  240. proc.AddRule(rule2)
  241. proc.Start()
  242. m := proc.NewRootMonitor(nil, nil)
  243. // Push a root event
  244. for i := 0; i < 3; i++ {
  245. proc.AddEvent(&Event{
  246. "InitialEvent1",
  247. []string{"core", "main", "event1"},
  248. nil,
  249. }, m.NewChildMonitor(p1))
  250. }
  251. proc.AddEvent(&Event{
  252. "InitialEvent2",
  253. []string{"core", "main", "event2"},
  254. nil,
  255. }, m.NewChildMonitor(p2))
  256. proc.AddEvent(&Event{
  257. "InitialEvent1",
  258. []string{"core", "main", "event1"},
  259. nil,
  260. }, m.NewChildMonitor(p1))
  261. hp := m.HighestPriority()
  262. // Finish the processor
  263. proc.Finish()
  264. errorutil.AssertTrue(m.HighestPriority() == -1,
  265. "Highest priority should be -1 once a monitor has finished")
  266. return hp
  267. }
  268. // Since rule 1 has the higher priority it is more likely to be
  269. // executed
  270. if res := testPriorities(3, 5); res != 3 {
  271. t.Error("Unexpected highest priority:", res)
  272. return
  273. }
  274. if log.String() != `TestRule1
  275. TestRule1
  276. TestRule1
  277. TestRule1
  278. TestRule2
  279. ` && log.String() != `TestRule1
  280. TestRule1
  281. TestRule1
  282. TestRule2
  283. TestRule1
  284. ` {
  285. t.Error("Unexpected result:", log.String())
  286. return
  287. }
  288. log.Reset()
  289. // Since rule 2 has the higher priority it is more likely to be
  290. // executed
  291. if res := testPriorities(5, 2); res != 2 {
  292. t.Error("Unexpected highest priority:", res)
  293. return
  294. }
  295. if log.String() != `TestRule2
  296. TestRule1
  297. TestRule1
  298. TestRule1
  299. TestRule1
  300. ` && log.String() != `TestRule1
  301. TestRule2
  302. TestRule1
  303. TestRule1
  304. TestRule1
  305. ` && log.String() != `TestRule1
  306. TestRule1
  307. TestRule2
  308. TestRule1
  309. TestRule1
  310. ` {
  311. t.Error("Unexpected result:", log.String())
  312. return
  313. }
  314. }
  315. func TestProcessorScopeHandling(t *testing.T) {
  316. UnitTestResetIDs()
  317. var logLock = sync.Mutex{}
  318. var log bytes.Buffer
  319. proc := NewProcessor(10)
  320. // Add rules to the processor
  321. rule1 := &Rule{
  322. "TestRule1", // Name
  323. "", // Description
  324. []string{"core.main.*"}, // Kind match
  325. []string{"data.write"}, // Match on event cascade scope
  326. nil, // No state match
  327. 0, // Priority of the rule
  328. nil, // List of suppressed rules by this rule
  329. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  330. logLock.Lock()
  331. log.WriteString("TestRule1\n")
  332. logLock.Unlock()
  333. time.Sleep(2 * time.Millisecond)
  334. return nil
  335. },
  336. }
  337. rule2 := &Rule{
  338. "TestRule2", // Name
  339. "", // Description
  340. []string{"core.main.*"}, // Kind match
  341. []string{"data"}, // Match on event cascade scope
  342. nil, // No state match
  343. 0, // Priority of the rule
  344. nil, // List of suppressed rules by this rule
  345. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  346. logLock.Lock()
  347. log.WriteString("TestRule2\n")
  348. logLock.Unlock()
  349. time.Sleep(2 * time.Millisecond)
  350. return nil
  351. },
  352. }
  353. proc.AddRule(rule1)
  354. proc.AddRule(rule2)
  355. if proc.Status() != pool.StatusStopped || !proc.Stopped() {
  356. t.Error("Unexpected status:", proc.Status(), proc.Stopped())
  357. return
  358. }
  359. proc.Start()
  360. if proc.Status() != pool.StatusRunning || proc.Stopped() {
  361. t.Error("Unexpected status:", proc.Status(), proc.Stopped())
  362. return
  363. }
  364. scope1 := NewRuleScope(map[string]bool{
  365. "data": true,
  366. "data.read": true,
  367. "data.write": false,
  368. })
  369. m := proc.NewRootMonitor(nil, scope1)
  370. // Push a root event
  371. proc.AddEvent(&Event{
  372. "InitialEvent",
  373. []string{"core", "main", "event1"},
  374. nil,
  375. }, m)
  376. // Finish the processor
  377. proc.Finish()
  378. // Only rule 2 should trigger since the monitor has only access
  379. // to data and data.read
  380. if log.String() != `TestRule2
  381. ` {
  382. t.Error("Unexpected result:", log.String())
  383. return
  384. }
  385. log.Reset()
  386. proc.Start()
  387. scope2 := NewRuleScope(map[string]bool{
  388. "data": true,
  389. "data.read": true,
  390. "data.write": true,
  391. })
  392. m = proc.NewRootMonitor(nil, scope2)
  393. // Push a root event
  394. proc.AddEvent(&Event{
  395. "InitialEvent",
  396. []string{"core", "main", "event1"},
  397. nil,
  398. }, m)
  399. // Finish the processor
  400. proc.Finish()
  401. // Now both rules should trigger
  402. if log.String() != `TestRule1
  403. TestRule2
  404. ` {
  405. t.Error("Unexpected result:", log.String())
  406. return
  407. }
  408. }
  409. func TestProcessorStateMatching(t *testing.T) {
  410. UnitTestResetIDs()
  411. var logLock = sync.Mutex{}
  412. var log bytes.Buffer
  413. proc := NewProcessor(10)
  414. if res := proc.Workers(); res != 10 {
  415. t.Error("Unexpected number of workers:", res)
  416. return
  417. }
  418. // Add rules to the processor
  419. rule1 := &Rule{
  420. "TestRule1", // Name
  421. "", // Description
  422. []string{"core.main.*"}, // Kind match
  423. []string{"data"}, // Match on event cascade scope
  424. map[string]interface{}{"name": nil, "test": 1}, // Simple state match
  425. 0, // Priority of the rule
  426. nil, // List of suppressed rules by this rule
  427. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  428. logLock.Lock()
  429. log.WriteString("TestRule1\n")
  430. logLock.Unlock()
  431. time.Sleep(2 * time.Millisecond)
  432. return nil
  433. },
  434. }
  435. rule2 := &Rule{
  436. "TestRule2", // Name
  437. "", // Description
  438. []string{"core.main.*"}, // Kind match
  439. []string{"data"}, // Match on event cascade scope
  440. map[string]interface{}{"name": nil, "test": "123"}, // Simple state match
  441. 0, // Priority of the rule
  442. nil, // List of suppressed rules by this rule
  443. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  444. logLock.Lock()
  445. log.WriteString("TestRule2\n")
  446. logLock.Unlock()
  447. time.Sleep(2 * time.Millisecond)
  448. return nil
  449. },
  450. }
  451. proc.AddRule(rule1)
  452. proc.AddRule(rule2)
  453. proc.Start()
  454. // Push a root event
  455. proc.AddEvent(&Event{
  456. "InitialEvent",
  457. []string{"core", "main", "event1"},
  458. map[interface{}]interface{}{"name": "foo", "test": "123"},
  459. }, nil)
  460. proc.Finish()
  461. if log.String() != `TestRule2
  462. ` {
  463. t.Error("Unexpected result:", log.String())
  464. return
  465. }
  466. proc.Start()
  467. proc.AddEvent(&Event{
  468. "InitialEvent",
  469. []string{"core", "main", "event1"},
  470. map[interface{}]interface{}{"name": nil, "test": 1, "foobar": 123},
  471. }, nil)
  472. proc.AddEvent(&Event{
  473. "InitialEvent",
  474. []string{"core", "main", "event1"},
  475. map[interface{}]interface{}{"name": "bar", "test": 1},
  476. }, nil)
  477. // The following rule should not trigger as it is missing name
  478. proc.AddEvent(&Event{
  479. "InitialEvent",
  480. []string{"core", "main", "event1"},
  481. map[interface{}]interface{}{"foobar": nil, "test": "123"},
  482. }, nil)
  483. proc.Finish()
  484. if log.String() != `TestRule2
  485. TestRule1
  486. TestRule1
  487. ` {
  488. t.Error("Unexpected result:", log.String())
  489. return
  490. }
  491. }
  492. func TestProcessorSimpleErrorHandling(t *testing.T) {
  493. UnitTestResetIDs()
  494. proc := NewProcessor(10)
  495. if proc.ThreadPool() == nil {
  496. t.Error("Should have a thread pool")
  497. return
  498. }
  499. // Add rules to the processor
  500. rule1 := &Rule{
  501. "TestRule1", // Name
  502. "", // Description
  503. []string{"core.main.event1"}, // Kind match
  504. []string{"data"}, // Match on event cascade scope
  505. nil,
  506. 0, // Priority of the rule
  507. nil, // List of suppressed rules by this rule
  508. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  509. p.AddEvent(&Event{
  510. "event2",
  511. []string{"core", "main", "event2"},
  512. nil,
  513. }, m.NewChildMonitor(1))
  514. return errors.New("testerror")
  515. },
  516. }
  517. rule2 := &Rule{
  518. "TestRule2", // Name
  519. "", // Description
  520. []string{"core.main.event2"}, // Kind match
  521. []string{"data"}, // Match on event cascade scope
  522. nil,
  523. 0, // Priority of the rule
  524. nil, // List of suppressed rules by this rule
  525. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  526. p.AddEvent(&Event{
  527. "event3",
  528. []string{"core", "main", "event3"},
  529. nil,
  530. }, m.NewChildMonitor(1))
  531. return nil
  532. },
  533. }
  534. rule3 := &Rule{
  535. "TestRule3", // Name
  536. "", // Description
  537. []string{"core.main.event3", "core.main.event1"}, // Kind match
  538. []string{"data"}, // Match on event cascade scope
  539. nil,
  540. 0, // Priority of the rule
  541. nil, // List of suppressed rules by this rule
  542. func(p Processor, m Monitor, e *Event, tid uint64) error { // Action of the rule
  543. return errors.New("testerror2")
  544. },
  545. }
  546. // Add rule 1 twice
  547. proc.AddRule(rule1)
  548. proc.AddRule(rule1.CopyAs("TestRule1Copy"))
  549. proc.AddRule(rule2)
  550. proc.AddRule(rule3)
  551. recordedErrors := 0
  552. proc.SetRootMonitorErrorObserver(func(rm *RootMonitor) {
  553. recordedErrors = len(rm.AllErrors()[0].ErrorMap)
  554. })
  555. // First test will always execute all rules and collect all errors
  556. proc.SetFailOnFirstErrorInTriggerSequence(false)
  557. proc.Start()
  558. // Push a root event
  559. mon, err := proc.AddEvent(&Event{
  560. "InitialEvent",
  561. []string{"core", "main", "event1"},
  562. map[interface{}]interface{}{"name": "foo", "test": "123"},
  563. }, nil)
  564. rmon, ok := mon.(*RootMonitor)
  565. if !ok {
  566. t.Error("Root monitor expected:", mon, err)
  567. return
  568. }
  569. proc.Finish()
  570. if fmt.Sprint(mon) != "Monitor 1 (parent: <nil> priority: 0 activated: true finished: true)" {
  571. t.Error("Unexpected result:", mon)
  572. return
  573. }
  574. _, err = proc.AddEvent(&Event{}, nil)
  575. if err.Error() != "Cannot add event if the processor is not running" {
  576. t.Error("Unexpected error", err)
  577. return
  578. }
  579. // Two errors should have been collected
  580. errs := rmon.AllErrors()
  581. if len(errs) != 3 {
  582. t.Error("Unexpected number of errors:", len(errs))
  583. return
  584. }
  585. if recordedErrors != 3 {
  586. t.Error("Unexpected number of recorded errors:", recordedErrors)
  587. return
  588. }
  589. if fmt.Sprint(errs) != `[Taskerrors:
  590. InitialEvent -> TestRule1 : testerror
  591. InitialEvent -> TestRule1Copy : testerror
  592. InitialEvent -> TestRule3 : testerror2 Taskerror:
  593. InitialEvent -> event2 -> event3 -> TestRule3 : testerror2 Taskerror:
  594. InitialEvent -> event2 -> event3 -> TestRule3 : testerror2]` {
  595. t.Error("Unexpected errors:", errs)
  596. return
  597. }
  598. // Second test will fail on the first failed rule in an event trigger sequence
  599. proc.SetFailOnFirstErrorInTriggerSequence(true)
  600. proc.Start()
  601. mon, err = proc.AddEvent(&Event{
  602. "InitialEvent",
  603. []string{"core", "main", "event1"},
  604. map[interface{}]interface{}{"name": "foo", "test": "123"},
  605. }, nil)
  606. rmon, ok = mon.(*RootMonitor)
  607. if !ok {
  608. t.Error("Root monitor expected:", mon, err)
  609. return
  610. }
  611. proc.Finish()
  612. errs = rmon.AllErrors()
  613. if len(errs) != 2 {
  614. t.Error("Unexpected number of errors:", len(errs))
  615. return
  616. }
  617. if fmt.Sprint(errs) != `[Taskerror:
  618. InitialEvent -> TestRule1 : testerror Taskerror:
  619. InitialEvent -> event2 -> event3 -> TestRule3 : testerror2]` {
  620. t.Error("Unexpected errors:", errs)
  621. return
  622. }
  623. if recordedErrors != 1 {
  624. t.Error("Unexpected number of recorded errors:", recordedErrors)
  625. return
  626. }
  627. // Now test AddEventAndWait
  628. proc.SetFailOnFirstErrorInTriggerSequence(false)
  629. proc.Start()
  630. mon, err = proc.AddEventAndWait(&Event{
  631. "InitialEvent1",
  632. []string{"core", "main", "event5"},
  633. map[interface{}]interface{}{"name": "foo", "test": "123"},
  634. }, nil)
  635. if mon != nil {
  636. t.Error("Nothing should have triggered")
  637. return
  638. }
  639. // Push a root event
  640. mon, err = proc.AddEventAndWait(&Event{
  641. "InitialEvent",
  642. []string{"core", "main", "event1"},
  643. map[interface{}]interface{}{"name": "foo", "test": "123"},
  644. }, nil)
  645. rmon, ok = mon.(*RootMonitor)
  646. if !ok {
  647. t.Error("Root monitor expected:", mon, err)
  648. return
  649. }
  650. if fmt.Sprint(mon) != "Monitor 10 (parent: <nil> priority: 0 activated: true finished: true)" {
  651. t.Error("Unexpected result:", mon)
  652. return
  653. }
  654. if proc.Stopped() {
  655. t.Error("Processor should not be stopped at this point")
  656. return
  657. }
  658. errs = rmon.AllErrors()
  659. if len(errs) != 3 {
  660. t.Error("Unexpected number of errors:", len(errs))
  661. return
  662. }
  663. if recordedErrors != 3 {
  664. t.Error("Unexpected number of recorded errors:", recordedErrors)
  665. return
  666. }
  667. if fmt.Sprint(errs) != `[Taskerrors:
  668. InitialEvent -> TestRule1 : testerror
  669. InitialEvent -> TestRule1Copy : testerror
  670. InitialEvent -> TestRule3 : testerror2 Taskerror:
  671. InitialEvent -> event2 -> event3 -> TestRule3 : testerror2 Taskerror:
  672. InitialEvent -> event2 -> event3 -> TestRule3 : testerror2]` {
  673. t.Error("Unexpected errors:", errs)
  674. return
  675. }
  676. proc.Finish()
  677. }