eventpump.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. /*
  2. * ECAL
  3. *
  4. * Copyright 2020 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. /*
  11. Package pubsub contains a pub/sub event handling implementation.
  12. */
  13. package pubsub
  14. import "sync"
/*
EventPump implements the observer pattern. Observers can subscribe to receive
notifications on certain events. Observed objects can send notifications.
*/
type EventPump struct {
	// eventsObservers maps an event name to the observers registered for it.
	// The inner map is keyed by event source and holds the callbacks in
	// registration order. When events are posted, the empty event name and
	// the nil source act as wildcards.
	eventsObservers map[string]map[interface{}][]EventCallback
	// eventsObserversLock is the mutex which the methods of EventPump take
	// before they read or modify eventsObservers.
	eventsObserversLock *sync.Mutex
}
/*
EventCallback is the function type of an observer. It is called when a posted
event matches the observer's subscription. The event argument is the name
under which the observer was looked up (the empty string for observers which
subscribed to all events) and eventSource is the source which posted the event.
*/
type EventCallback func(event string, eventSource interface{})
  27. /*
  28. NewEventPump creates a new event pump.
  29. */
  30. func NewEventPump() *EventPump {
  31. return &EventPump{make(map[string]map[interface{}][]EventCallback), &sync.Mutex{}}
  32. }
  33. /*
  34. AddObserver adds a new observer to the event pump. An observer can subscribe to
  35. a given event from a given event source. If the event is an empty string then
  36. the observer subscribes to all events from the event source. If the
  37. eventSource is nil then the observer subscribes to all event sources.
  38. */
  39. func (ep *EventPump) AddObserver(event string, eventSource interface{}, callback EventCallback) {
  40. // Ignore requests with non-existent callbacks
  41. if callback == nil {
  42. return
  43. }
  44. ep.eventsObserversLock.Lock()
  45. defer ep.eventsObserversLock.Unlock()
  46. sources, ok := ep.eventsObservers[event]
  47. if !ok {
  48. sources = make(map[interface{}][]EventCallback)
  49. ep.eventsObservers[event] = sources
  50. }
  51. callbacks, ok := sources[eventSource]
  52. if !ok {
  53. callbacks = []EventCallback{callback}
  54. sources[eventSource] = callbacks
  55. } else {
  56. sources[eventSource] = append(callbacks, callback)
  57. }
  58. }
  59. /*
  60. PostEvent posts an event to this event pump from a given event source.
  61. */
  62. func (ep *EventPump) PostEvent2(event string, eventSource interface{}) {
  63. if event == "" || eventSource == nil {
  64. panic("Posting an event requires the event and its source")
  65. }
  66. ep.eventsObserversLock.Lock()
  67. defer ep.eventsObserversLock.Unlock()
  68. postEvent := func(event string, eventSource interface{}) {
  69. if sources, ok := ep.eventsObservers[event]; ok {
  70. for source, callbacks := range sources {
  71. if source == eventSource || source == nil {
  72. for _, callback := range callbacks {
  73. ep.eventsObserversLock.Unlock()
  74. callback(event, eventSource)
  75. ep.eventsObserversLock.Lock()
  76. }
  77. }
  78. }
  79. }
  80. }
  81. postEvent(event, eventSource)
  82. postEvent("", eventSource)
  83. }
  84. /*
  85. PostEvent posts an event to this event pump from a given event source.
  86. */
  87. func (ep *EventPump) PostEvent(event string, eventSource interface{}) {
  88. if event == "" || eventSource == nil {
  89. panic("Posting an event requires the event and its source")
  90. }
  91. postEvent := func(event string, eventSource interface{}) {
  92. ep.eventsObserversLock.Lock()
  93. sources, ok := ep.eventsObservers[event]
  94. if ok {
  95. // Create a local copy of sources before executing the callbacks
  96. origsources := sources
  97. sources = make(map[interface{}][]EventCallback)
  98. for source, callbacks := range origsources {
  99. sources[source] = callbacks
  100. }
  101. }
  102. ep.eventsObserversLock.Unlock()
  103. if ok {
  104. for source, callbacks := range sources {
  105. if source == eventSource || source == nil {
  106. for _, callback := range callbacks {
  107. callback(event, eventSource)
  108. }
  109. }
  110. }
  111. }
  112. }
  113. postEvent(event, eventSource)
  114. postEvent("", eventSource)
  115. }
  116. /*
  117. RemoveObservers removes observers from the event pump. If the event is an
  118. empty string then the observer is removed from all events. If the
  119. eventSource is nil then all observers of the event are dropped.
  120. */
  121. func (ep *EventPump) RemoveObservers(event string, eventSource interface{}) {
  122. ep.eventsObserversLock.Lock()
  123. defer ep.eventsObserversLock.Unlock()
  124. // Clear everything
  125. if event == "" && eventSource == nil {
  126. ep.eventsObservers = make(map[string]map[interface{}][]EventCallback)
  127. } else if eventSource == nil {
  128. delete(ep.eventsObservers, event)
  129. } else if event == "" {
  130. for _, sources := range ep.eventsObservers {
  131. delete(sources, eventSource)
  132. }
  133. } else {
  134. if sources, ok := ep.eventsObservers[event]; ok {
  135. delete(sources, eventSource)
  136. }
  137. }
  138. }