eventpump.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. /*
  2. * Public Domain Software
  3. *
  4. * I (Matthias Ladkau) am the author of the source code in this file.
  5. * I have placed the source code in this file in the public domain.
  6. *
  7. * For further information see: http://creativecommons.org/publicdomain/zero/1.0/
  8. */
  9. /*
  10. Package flowutil contains utilities to manage control flow.
  11. */
  12. package flowutil
  13. import "sync"
  /*
  EventPump implements the observer pattern. Observers can subscribe to receive
  notifications on certain events. Observed objects can send notifications.

  The zero value is not usable: the observer map and the lock pointer must be
  initialized, as NewEventPump does.
  */
  type EventPump struct {
  	// eventsObservers maps event name -> event source -> registered callbacks.
  	// AddObserver stores "all events" subscriptions under the "" event key and
  	// "all sources" subscriptions under the nil source key.
  	eventsObservers map[string]map[interface{}][]EventCallback
  	// eventsObserversLock is locked by AddObserver, PostEvent and
  	// RemoveObservers while they access eventsObservers.
  	eventsObserversLock *sync.Mutex
  }
  /*
  EventCallback is the callback function which is called when an event was observed.
  Both arguments are supplied by PostEvent when it invokes the callback.
  */
  type EventCallback func(event string, eventSource interface{})
  26. /*
  27. NewEventPump creates a new event pump.
  28. */
  29. func NewEventPump() *EventPump {
  30. return &EventPump{make(map[string]map[interface{}][]EventCallback), &sync.Mutex{}}
  31. }
  32. /*
  33. AddObserver adds a new observer to the event pump. An observer can subscribe to
  34. a given event from a given event source. If the event is an empty string then
  35. the observer subscribes to all events from the event source. If the
  36. eventSource is nil then the observer subscribes to all event sources.
  37. */
  38. func (ep *EventPump) AddObserver(event string, eventSource interface{}, callback EventCallback) {
  39. // Ignore requests with non-existent callbacks
  40. if callback == nil {
  41. return
  42. }
  43. ep.eventsObserversLock.Lock()
  44. defer ep.eventsObserversLock.Unlock()
  45. sources, ok := ep.eventsObservers[event]
  46. if !ok {
  47. sources = make(map[interface{}][]EventCallback)
  48. ep.eventsObservers[event] = sources
  49. }
  50. callbacks, ok := sources[eventSource]
  51. if !ok {
  52. callbacks = []EventCallback{callback}
  53. sources[eventSource] = callbacks
  54. } else {
  55. sources[eventSource] = append(callbacks, callback)
  56. }
  57. }
  58. /*
  59. PostEvent posts an event to this event pump from a given event source.
  60. */
  61. func (ep *EventPump) PostEvent(event string, eventSource interface{}) {
  62. if event == "" || eventSource == nil {
  63. panic("Posting an event requires the event and its source")
  64. }
  65. ep.eventsObserversLock.Lock()
  66. defer ep.eventsObserversLock.Unlock()
  67. postEvent := func(event string, eventSource interface{}) {
  68. if sources, ok := ep.eventsObservers[event]; ok {
  69. for source, callbacks := range sources {
  70. if source == eventSource || source == nil {
  71. for _, callback := range callbacks {
  72. ep.eventsObserversLock.Unlock()
  73. callback(event, eventSource)
  74. ep.eventsObserversLock.Lock()
  75. }
  76. }
  77. }
  78. }
  79. }
  80. postEvent(event, eventSource)
  81. postEvent("", eventSource)
  82. }
  83. /*
  84. RemoveObservers removes observers from the event pump. If the event is an
  85. empty string then the observer is removed from all events. If the
  86. eventSource is nil then all observers of the event are dropped.
  87. */
  88. func (ep *EventPump) RemoveObservers(event string, eventSource interface{}) {
  89. ep.eventsObserversLock.Lock()
  90. defer ep.eventsObserversLock.Unlock()
  91. // Clear everything
  92. if event == "" && eventSource == nil {
  93. ep.eventsObservers = make(map[string]map[interface{}][]EventCallback)
  94. } else if eventSource == nil {
  95. delete(ep.eventsObservers, event)
  96. } else if event == "" {
  97. for _, sources := range ep.eventsObservers {
  98. delete(sources, eventSource)
  99. }
  100. } else {
  101. if sources, ok := ep.eventsObservers[event]; ok {
  102. delete(sources, eventSource)
  103. }
  104. }
  105. }