processor_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package engine
  11. import (
  12. "bytes"
  13. "errors"
  14. "fmt"
  15. "os"
  16. "sync"
  17. "testing"
  18. "time"
  19. "devt.de/krotik/common/errorutil"
  20. "devt.de/krotik/common/pools"
  21. )
  22. func TestProcessorSimpleCascade(t *testing.T) {
  23. UnitTestResetIDs()
  24. // Add debug logging
  25. var debugBuffer bytes.Buffer
  26. EventTracer.out = &debugBuffer
  27. EventTracer.MonitorEvent("core.*", map[interface{}]interface{}{
  28. "foo": "bar",
  29. "foo2": nil,
  30. })
  31. EventTracer.MonitorEvent("core.*", map[interface{}]interface{}{
  32. "foo2": "test",
  33. })
  34. defer func() {
  35. EventTracer.Reset()
  36. EventTracer.out = os.Stdout
  37. }()
  38. // Do the normal testing
  39. var log bytes.Buffer
  40. proc := NewProcessor(1)
  41. if res := fmt.Sprint(proc); res != "RumbleProcessor 1 (workers:1)" {
  42. t.Error("Unexpected result:", res)
  43. return
  44. }
  45. // Add rules to the processor
  46. rule1 := &Rule{
  47. "TestRule1", // Name
  48. "", // Description
  49. []string{"core.main.event1"}, // Kind match
  50. []string{"data"}, // Match on event cascade scope
  51. nil, // No state match
  52. 2, // Priority of the rule
  53. []string{"TestRule3", "TestRule3Copy"}, // List of suppressed rules by this rule
  54. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  55. log.WriteString("TestRule1\n")
  56. // Add another event
  57. p.AddEvent(&Event{
  58. "InitialEvent",
  59. []string{"core", "main", "event2"},
  60. map[interface{}]interface{}{
  61. "foo": "bar",
  62. "foo2": "bla",
  63. },
  64. }, m.NewChildMonitor(1))
  65. return nil
  66. },
  67. }
  68. rule2 := &Rule{
  69. "TestRule2", // Name
  70. "", // Description
  71. []string{"core.main.*"}, // Kind match
  72. []string{"data.read"}, // Match on event cascade scope
  73. nil, // No state match
  74. 5, // Priority of the rule
  75. nil, // List of suppressed rules by this rule
  76. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  77. log.WriteString("TestRule2\n")
  78. return nil
  79. },
  80. }
  81. rule3 := &Rule{
  82. "TestRule3", // Name
  83. "", // Description
  84. []string{"core.main.*"}, // Kind match
  85. []string{"data.read"}, // Match on event cascade scope
  86. nil, // No state match
  87. 0, // Priority of the rule
  88. nil, // List of suppressed rules by this rule
  89. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  90. log.WriteString("TestRule3\n")
  91. return nil
  92. },
  93. }
  94. proc.AddRule(rule1)
  95. proc.AddRule(rule2)
  96. proc.AddRule(rule3)
  97. if r := len(proc.Rules()); r != 3 {
  98. t.Error("Unexpected rule number:", r)
  99. return
  100. }
  101. // Start the processor
  102. proc.Start()
  103. // Push a root event
  104. e := NewEvent(
  105. "InitialEvent",
  106. []string{"core", "main", "event1"},
  107. map[interface{}]interface{}{
  108. "foo": "bar",
  109. "foo2": "bla",
  110. },
  111. )
  112. if e.Name() != e.name || e.Kind() == nil || e.State() == nil {
  113. t.Error("Unepxected getter result:", e)
  114. return
  115. }
  116. rootm := proc.NewRootMonitor(nil, nil)
  117. rootm.SetFinishHandler(func(p Processor) {
  118. log.WriteString("finished!")
  119. })
  120. proc.AddEvent(e, rootm)
  121. if err := proc.AddRule(rule3); err.Error() != "Cannot add rule if the processor has not stopped" {
  122. t.Error("Unexpected error:", err)
  123. return
  124. }
  125. if err := proc.Reset(); err.Error() != "Cannot reset processor if it has not stopped" {
  126. t.Error("Unexpected error:", err)
  127. return
  128. }
  129. // Finish the processor
  130. proc.Finish()
  131. // Finish the processor
  132. // Rule 1, 2 and 3 trigger on event1 but rule 3 is suppressed by rule 1
  133. // Rule 1 adds a new event which triggers only rule 2 and 3
  134. // Rule 3 comes first since it has the higher priority
  135. if log.String() != `TestRule1
  136. TestRule2
  137. TestRule3
  138. TestRule2
  139. finished!` {
  140. t.Error("Unexpected result:", log.String())
  141. return
  142. }
  143. log.Reset()
  144. if err := proc.AddRule(rule3.CopyAs("TestRule3Copy")); err != nil {
  145. t.Error("Unexpected error:", err)
  146. return
  147. }
  148. // Start the processor
  149. proc.Start()
  150. // Push a root event
  151. proc.AddEvent(&Event{
  152. "InitialEvent",
  153. []string{"core", "main", "event1"},
  154. nil,
  155. }, nil)
  156. // Finish the processor
  157. proc.Finish()
  158. if log.String() != `TestRule1
  159. TestRule2
  160. TestRule3
  161. TestRule3
  162. TestRule2
  163. ` {
  164. t.Error("Unexpected result:", log.String())
  165. return
  166. }
  167. // Test the case when the event is pointless
  168. log.Reset()
  169. proc.Start()
  170. proc.AddEvent(&Event{
  171. "InitialEventFoo",
  172. []string{"core", "foo", "event1"},
  173. nil,
  174. }, nil)
  175. rm := proc.NewRootMonitor(nil, nil)
  176. proc.AddEvent(&Event{
  177. "InitialEventFoo",
  178. []string{"core", "foo", "event1"},
  179. nil,
  180. }, rm)
  181. if !rm.IsFinished() {
  182. t.Error("Monitor which monitored a non-triggering event should still finished")
  183. return
  184. }
  185. proc.Finish()
  186. if log.String() != "" {
  187. t.Error("Unexpected result:", log.String())
  188. return
  189. }
  190. proc.Reset()
  191. if r := len(proc.Rules()); r != 0 {
  192. t.Error("Unexpected rule number:", r)
  193. return
  194. }
  195. if debugBuffer.String() == "" {
  196. t.Error("Nothing was recorded in the debug buffer")
  197. return
  198. }
  199. }
  200. func TestProcessorSimplePriorities(t *testing.T) {
  201. UnitTestResetIDs()
  202. var logLock = sync.Mutex{}
  203. var log bytes.Buffer
  204. testPriorities := func(p1, p2 int) int {
  205. proc := NewProcessor(2)
  206. // Add rules to the processor
  207. rule1 := &Rule{
  208. "TestRule1", // Name
  209. "", // Description
  210. []string{"core.main.event1"}, // Kind match
  211. []string{"data"}, // Match on event cascade scope
  212. nil, // No state match
  213. 0, // Priority of the rule
  214. nil, // List of suppressed rules by this rule
  215. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  216. logLock.Lock()
  217. log.WriteString("TestRule1\n")
  218. logLock.Unlock()
  219. time.Sleep(2 * time.Millisecond)
  220. return nil
  221. },
  222. }
  223. rule2 := &Rule{
  224. "TestRule2", // Name
  225. "", // Description
  226. []string{"core.main.event2"}, // Kind match
  227. []string{"data"}, // Match on event cascade scope
  228. nil, // No state match
  229. 0, // Priority of the rule
  230. nil, // List of suppressed rules by this rule
  231. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  232. logLock.Lock()
  233. log.WriteString("TestRule2\n")
  234. logLock.Unlock()
  235. time.Sleep(2 * time.Millisecond)
  236. return nil
  237. },
  238. }
  239. proc.AddRule(rule1)
  240. proc.AddRule(rule2)
  241. proc.Start()
  242. m := proc.NewRootMonitor(nil, nil)
  243. // Push a root event
  244. for i := 0; i < 3; i++ {
  245. proc.AddEvent(&Event{
  246. "InitialEvent1",
  247. []string{"core", "main", "event1"},
  248. nil,
  249. }, m.NewChildMonitor(p1))
  250. }
  251. proc.AddEvent(&Event{
  252. "InitialEvent2",
  253. []string{"core", "main", "event2"},
  254. nil,
  255. }, m.NewChildMonitor(p2))
  256. proc.AddEvent(&Event{
  257. "InitialEvent1",
  258. []string{"core", "main", "event1"},
  259. nil,
  260. }, m.NewChildMonitor(p1))
  261. hp := m.HighestPriority()
  262. // Finish the processor
  263. proc.Finish()
  264. errorutil.AssertTrue(m.HighestPriority() == -1,
  265. "Highest priority should be -1 once a monitor has finished")
  266. return hp
  267. }
  268. // Since rule 1 has the higher priority it is more likely to be
  269. // executed
  270. if res := testPriorities(3, 5); res != 3 {
  271. t.Error("Unexpected highest priority:", res)
  272. return
  273. }
  274. if log.String() != `TestRule1
  275. TestRule1
  276. TestRule1
  277. TestRule1
  278. TestRule2
  279. ` && log.String() != `TestRule1
  280. TestRule1
  281. TestRule1
  282. TestRule2
  283. TestRule1
  284. ` {
  285. t.Error("Unexpected result:", log.String())
  286. return
  287. }
  288. log.Reset()
  289. // Since rule 2 has the higher priority it is more likely to be
  290. // executed
  291. if res := testPriorities(5, 2); res != 2 {
  292. t.Error("Unexpected highest priority:", res)
  293. return
  294. }
  295. if log.String() != `TestRule2
  296. TestRule1
  297. TestRule1
  298. TestRule1
  299. TestRule1
  300. ` && log.String() != `TestRule1
  301. TestRule2
  302. TestRule1
  303. TestRule1
  304. TestRule1
  305. ` && log.String() != `TestRule1
  306. TestRule1
  307. TestRule2
  308. TestRule1
  309. TestRule1
  310. ` {
  311. t.Error("Unexpected result:", log.String())
  312. return
  313. }
  314. }
  315. func TestProcessorScopeHandling(t *testing.T) {
  316. UnitTestResetIDs()
  317. var logLock = sync.Mutex{}
  318. var log bytes.Buffer
  319. proc := NewProcessor(10)
  320. // Add rules to the processor
  321. rule1 := &Rule{
  322. "TestRule1", // Name
  323. "", // Description
  324. []string{"core.main.*"}, // Kind match
  325. []string{"data.write"}, // Match on event cascade scope
  326. nil, // No state match
  327. 0, // Priority of the rule
  328. nil, // List of suppressed rules by this rule
  329. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  330. logLock.Lock()
  331. log.WriteString("TestRule1\n")
  332. logLock.Unlock()
  333. time.Sleep(2 * time.Millisecond)
  334. return nil
  335. },
  336. }
  337. rule2 := &Rule{
  338. "TestRule2", // Name
  339. "", // Description
  340. []string{"core.main.*"}, // Kind match
  341. []string{"data"}, // Match on event cascade scope
  342. nil, // No state match
  343. 0, // Priority of the rule
  344. nil, // List of suppressed rules by this rule
  345. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  346. logLock.Lock()
  347. log.WriteString("TestRule2\n")
  348. logLock.Unlock()
  349. time.Sleep(2 * time.Millisecond)
  350. return nil
  351. },
  352. }
  353. proc.AddRule(rule1)
  354. proc.AddRule(rule2)
  355. if proc.Status() != pools.StatusStopped || !proc.Stopped() {
  356. t.Error("Unexpected status:", proc.Status(), proc.Stopped())
  357. return
  358. }
  359. proc.Start()
  360. if proc.Status() != pools.StatusRunning || proc.Stopped() {
  361. t.Error("Unexpected status:", proc.Status(), proc.Stopped())
  362. return
  363. }
  364. scope1 := NewRuleScope(map[string]bool{
  365. "data": true,
  366. "data.read": true,
  367. "data.write": false,
  368. })
  369. m := proc.NewRootMonitor(nil, scope1)
  370. // Push a root event
  371. proc.AddEvent(&Event{
  372. "InitialEvent",
  373. []string{"core", "main", "event1"},
  374. nil,
  375. }, m)
  376. // Finish the processor
  377. proc.Finish()
  378. // Only rule 2 should trigger since the monitor has only access
  379. // to data and data.read
  380. if log.String() != `TestRule2
  381. ` {
  382. t.Error("Unexpected result:", log.String())
  383. return
  384. }
  385. log.Reset()
  386. proc.Start()
  387. scope2 := NewRuleScope(map[string]bool{
  388. "data": true,
  389. "data.read": true,
  390. "data.write": true,
  391. })
  392. m = proc.NewRootMonitor(nil, scope2)
  393. // Push a root event
  394. proc.AddEvent(&Event{
  395. "InitialEvent",
  396. []string{"core", "main", "event1"},
  397. nil,
  398. }, m)
  399. // Finish the processor
  400. proc.Finish()
  401. // Now both rules should trigger
  402. if log.String() != `TestRule1
  403. TestRule2
  404. ` {
  405. t.Error("Unexpected result:", log.String())
  406. return
  407. }
  408. }
  409. func TestProcessorStateMatching(t *testing.T) {
  410. UnitTestResetIDs()
  411. var logLock = sync.Mutex{}
  412. var log bytes.Buffer
  413. proc := NewProcessor(10)
  414. if res := proc.Workers(); res != 10 {
  415. t.Error("Unexpected number of workers:", res)
  416. return
  417. }
  418. // Add rules to the processor
  419. rule1 := &Rule{
  420. "TestRule1", // Name
  421. "", // Description
  422. []string{"core.main.*"}, // Kind match
  423. []string{"data"}, // Match on event cascade scope
  424. map[string]interface{}{"name": nil, "test": 1}, // Simple state match
  425. 0, // Priority of the rule
  426. nil, // List of suppressed rules by this rule
  427. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  428. logLock.Lock()
  429. log.WriteString("TestRule1\n")
  430. logLock.Unlock()
  431. time.Sleep(2 * time.Millisecond)
  432. return nil
  433. },
  434. }
  435. rule2 := &Rule{
  436. "TestRule2", // Name
  437. "", // Description
  438. []string{"core.main.*"}, // Kind match
  439. []string{"data"}, // Match on event cascade scope
  440. map[string]interface{}{"name": nil, "test": "123"}, // Simple state match
  441. 0, // Priority of the rule
  442. nil, // List of suppressed rules by this rule
  443. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  444. logLock.Lock()
  445. log.WriteString("TestRule2\n")
  446. logLock.Unlock()
  447. time.Sleep(2 * time.Millisecond)
  448. return nil
  449. },
  450. }
  451. proc.AddRule(rule1)
  452. proc.AddRule(rule2)
  453. proc.Start()
  454. // Push a root event
  455. proc.AddEvent(&Event{
  456. "InitialEvent",
  457. []string{"core", "main", "event1"},
  458. map[interface{}]interface{}{"name": "foo", "test": "123"},
  459. }, nil)
  460. proc.Finish()
  461. if log.String() != `TestRule2
  462. ` {
  463. t.Error("Unexpected result:", log.String())
  464. return
  465. }
  466. proc.Start()
  467. proc.AddEvent(&Event{
  468. "InitialEvent",
  469. []string{"core", "main", "event1"},
  470. map[interface{}]interface{}{"name": nil, "test": 1, "foobar": 123},
  471. }, nil)
  472. proc.AddEvent(&Event{
  473. "InitialEvent",
  474. []string{"core", "main", "event1"},
  475. map[interface{}]interface{}{"name": "bar", "test": 1},
  476. }, nil)
  477. // The following rule should not trigger as it is missing name
  478. proc.AddEvent(&Event{
  479. "InitialEvent",
  480. []string{"core", "main", "event1"},
  481. map[interface{}]interface{}{"foobar": nil, "test": "123"},
  482. }, nil)
  483. proc.Finish()
  484. if log.String() != `TestRule2
  485. TestRule1
  486. TestRule1
  487. ` {
  488. t.Error("Unexpected result:", log.String())
  489. return
  490. }
  491. }
  492. func TestProcessorSimpleErrorHandling(t *testing.T) {
  493. UnitTestResetIDs()
  494. proc := NewProcessor(10)
  495. // Add rules to the processor
  496. rule1 := &Rule{
  497. "TestRule1", // Name
  498. "", // Description
  499. []string{"core.main.event1"}, // Kind match
  500. []string{"data"}, // Match on event cascade scope
  501. nil,
  502. 0, // Priority of the rule
  503. nil, // List of suppressed rules by this rule
  504. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  505. p.AddEvent(&Event{
  506. "event2",
  507. []string{"core", "main", "event2"},
  508. nil,
  509. }, m.NewChildMonitor(1))
  510. return errors.New("testerror")
  511. },
  512. }
  513. rule2 := &Rule{
  514. "TestRule2", // Name
  515. "", // Description
  516. []string{"core.main.event2"}, // Kind match
  517. []string{"data"}, // Match on event cascade scope
  518. nil,
  519. 0, // Priority of the rule
  520. nil, // List of suppressed rules by this rule
  521. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  522. p.AddEvent(&Event{
  523. "event3",
  524. []string{"core", "main", "event3"},
  525. nil,
  526. }, m.NewChildMonitor(1))
  527. return nil
  528. },
  529. }
  530. rule3 := &Rule{
  531. "TestRule3", // Name
  532. "", // Description
  533. []string{"core.main.event3", "core.main.event1"}, // Kind match
  534. []string{"data"}, // Match on event cascade scope
  535. nil,
  536. 0, // Priority of the rule
  537. nil, // List of suppressed rules by this rule
  538. func(p Processor, m Monitor, e *Event) error { // Action of the rule
  539. return errors.New("testerror2")
  540. },
  541. }
  542. // Add rule 1 twice
  543. proc.AddRule(rule1)
  544. proc.AddRule(rule1.CopyAs("TestRule1Copy"))
  545. proc.AddRule(rule2)
  546. proc.AddRule(rule3)
  547. recordedErrors := 0
  548. proc.SetRootMonitorErrorObserver(func(rm *RootMonitor) {
  549. recordedErrors = len(rm.AllErrors()[0].ErrorMap)
  550. })
  551. proc.Start()
  552. // Push a root event
  553. mon, err := proc.AddEvent(&Event{
  554. "InitialEvent",
  555. []string{"core", "main", "event1"},
  556. map[interface{}]interface{}{"name": "foo", "test": "123"},
  557. }, nil)
  558. rmon, ok := mon.(*RootMonitor)
  559. if !ok {
  560. t.Error("Root monitor expected:", mon, err)
  561. return
  562. }
  563. proc.Finish()
  564. if fmt.Sprint(mon) != "Monitor 1 (parent: <nil> priority: 0 activated: true finished: true)" {
  565. t.Error("Unexpected result:", mon)
  566. return
  567. }
  568. _, err = proc.AddEvent(&Event{}, nil)
  569. if err.Error() != "Cannot add event if the processor is not running" {
  570. t.Error("Unexpected error", err)
  571. return
  572. }
  573. // Two errors should have been collected
  574. errs := rmon.AllErrors()
  575. if len(errs) != 3 {
  576. t.Error("Unexpected number of errors:", len(errs))
  577. return
  578. }
  579. if recordedErrors != 3 {
  580. t.Error("Unexpected number of recorded errors:", recordedErrors)
  581. return
  582. }
  583. if fmt.Sprint(errs) != `[Taskerrors:
  584. InitialEvent -> TestRule1 : testerror
  585. InitialEvent -> TestRule1Copy : testerror
  586. InitialEvent -> TestRule3 : testerror2 Taskerror:
  587. InitialEvent -> event2 -> event3 -> TestRule3 : testerror2 Taskerror:
  588. InitialEvent -> event2 -> event3 -> TestRule3 : testerror2]` {
  589. t.Error("Unexpected errors:", errs)
  590. return
  591. }
  592. }