eventbridge.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. /*
  11. Package ecal contains the main API for the event condition action language (ECAL).
  12. */
  13. package ecal
  14. import (
  15. "fmt"
  16. "strings"
  17. "devt.de/krotik/common/errorutil"
  18. "devt.de/krotik/ecal/engine"
  19. "devt.de/krotik/ecal/scope"
  20. "devt.de/krotik/ecal/util"
  21. "devt.de/krotik/eliasdb/graph"
  22. "devt.de/krotik/eliasdb/graph/data"
  23. )
  24. /*
  25. EventMapping is a mapping between EliasDB event types to EliasDB specific event kinds in ECAL.
  26. */
  27. var EventMapping = map[int]string{
  28. /*
  29. EventNodeCreated is thrown when a node was created.
  30. Parameters: partition of created node, created node
  31. */
  32. graph.EventNodeCreated: "db.node.created",
  33. /*
  34. EventNodeUpdated is thrown when a node was updated.
  35. Parameters: partition of updated node, updated node, old node
  36. */
  37. graph.EventNodeUpdated: "db.node.updated",
  38. /*
  39. EventNodeDeleted is thrown when a node was deleted.
  40. Parameters: partition of deleted node, deleted node
  41. */
  42. graph.EventNodeDeleted: "db.node.deleted",
  43. /*
  44. EventEdgeCreated is thrown when an edge was created.
  45. Parameters: partition of created edge, created edge
  46. */
  47. graph.EventEdgeCreated: "db.edge.created",
  48. /*
  49. EventEdgeUpdated is thrown when an edge was updated.
  50. Parameters: partition of updated edge, updated edge, old edge
  51. */
  52. graph.EventEdgeUpdated: "db.edge.updated",
  53. /*
  54. EventEdgeDeleted is thrown when an edge was deleted.
  55. Parameters: partition of deleted edge, deleted edge
  56. */
  57. graph.EventEdgeDeleted: "db.edge.deleted",
  58. /*
  59. EventNodeStore is thrown before a node is stored (always overwriting existing values).
  60. Parameters: partition of node to store, node to store
  61. */
  62. graph.EventNodeStore: "db.node.store",
  63. /*
  64. EventNodeUpdate is thrown before a node is updated.
  65. Parameters: partition of node to update, node to update
  66. */
  67. graph.EventNodeUpdate: "db.node.update",
  68. /*
  69. EventNodeDelete is thrown before a node is deleted.
  70. Parameters: partition of node to delete, key of node to delete, kind of node to delete
  71. */
  72. graph.EventNodeDelete: "db.node.delete",
  73. /*
  74. EventEdgeStore is thrown before an edge is stored (always overwriting existing values).
  75. Parameters: partition of stored edge, stored edge
  76. */
  77. graph.EventEdgeStore: "db.edge.store",
  78. /*
  79. EventEdgeDelete is thrown before an edge is deleted.
  80. Parameters: partition of deleted edge, deleted edge
  81. */
  82. graph.EventEdgeDelete: "db.edge.delete",
  83. }
  84. /*
  85. EventBridge is a rule for a graph manager to forward all graph events to ECAL.
  86. */
  87. type EventBridge struct {
  88. Processor engine.Processor
  89. Logger util.Logger
  90. }
  91. /*
  92. Name returns the name of the rule.
  93. */
  94. func (eb *EventBridge) Name() string {
  95. return "ecal.eventbridge"
  96. }
  97. /*
  98. Handles returns a list of events which are handled by this rule.
  99. */
  100. func (eb *EventBridge) Handles() []int {
  101. return []int{
  102. graph.EventNodeCreated,
  103. graph.EventNodeUpdated,
  104. graph.EventNodeDeleted,
  105. graph.EventEdgeCreated,
  106. graph.EventEdgeUpdated,
  107. graph.EventEdgeDeleted,
  108. graph.EventNodeStore,
  109. graph.EventNodeUpdate,
  110. graph.EventNodeDelete,
  111. graph.EventEdgeStore,
  112. graph.EventEdgeDelete,
  113. }
  114. }
  115. /*
  116. Handle handles an event.
  117. */
  118. func (eb *EventBridge) Handle(gm *graph.Manager, trans graph.Trans, event int, ed ...interface{}) error {
  119. var err error
  120. if name, ok := EventMapping[event]; ok {
  121. eventName := fmt.Sprintf("EliasDB: %v", name)
  122. eventKind := strings.Split(name, ".")
  123. // Construct an event which can be used to check if any rule will trigger.
  124. // This is to avoid the relative costly state construction below for events
  125. // which would not trigger any rules.
  126. triggerCheckEvent := engine.NewEvent(eventName, eventKind, nil)
  127. if !eb.Processor.IsTriggering(triggerCheckEvent) {
  128. return nil
  129. }
  130. // Build up state
  131. state := map[interface{}]interface{}{
  132. "part": fmt.Sprint(ed[0]),
  133. "trans": trans,
  134. }
  135. // Include the right arguments into the state
  136. switch event {
  137. case graph.EventNodeCreated, graph.EventNodeUpdate, graph.EventNodeDeleted, graph.EventNodeStore:
  138. state["node"] = scope.ConvertJSONToECALObject(ed[1].(data.Node).Data())
  139. case graph.EventNodeUpdated:
  140. state["node"] = scope.ConvertJSONToECALObject(ed[1].(data.Node).Data())
  141. state["old_node"] = scope.ConvertJSONToECALObject(ed[2].(data.Node).Data())
  142. case graph.EventEdgeCreated, graph.EventEdgeDeleted, graph.EventEdgeStore:
  143. state["edge"] = scope.ConvertJSONToECALObject(ed[1].(data.Edge).Data())
  144. case graph.EventEdgeUpdated:
  145. state["edge"] = scope.ConvertJSONToECALObject(ed[1].(data.Edge).Data())
  146. state["old_edge"] = scope.ConvertJSONToECALObject(ed[2].(data.Edge).Data())
  147. case graph.EventNodeDelete, graph.EventEdgeDelete:
  148. state["key"] = fmt.Sprint(ed[1])
  149. state["kind"] = fmt.Sprint(ed[2])
  150. }
  151. // Try to inject the event
  152. event := engine.NewEvent(fmt.Sprintf("EliasDB: %v", name), strings.Split(name, "."), state)
  153. var m engine.Monitor
  154. m, err = eb.Processor.AddEventAndWait(event, nil)
  155. if err == nil {
  156. // If there was no direct error adding the event then check if an error was
  157. // raised in a sink
  158. if errs := m.(*engine.RootMonitor).AllErrors(); len(errs) > 0 {
  159. var errList []error
  160. for _, e := range errs {
  161. addError := true
  162. for _, se := range e.ErrorMap {
  163. // Check if the sink returned a special graph.ErrEventHandled error
  164. if re, ok := se.(*util.RuntimeErrorWithDetail); ok && re.Detail == graph.ErrEventHandled.Error() {
  165. addError = false
  166. }
  167. }
  168. if addError {
  169. errList = append(errList, e)
  170. }
  171. }
  172. if len(errList) > 0 {
  173. err = &errorutil.CompositeError{Errors: errList}
  174. } else {
  175. err = graph.ErrEventHandled
  176. }
  177. }
  178. }
  179. if err != nil {
  180. eb.Logger.LogDebug(fmt.Sprintf("EliasDB event %v was handled by ECAL and returned: %v", name, err))
  181. }
  182. }
  183. return err
  184. }