housekeeping.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package manager
  11. import (
  12. "fmt"
  13. "strconv"
  14. "strings"
  15. )
  /*
Package-level settings for the housekeeping task.
*/
var (
	/*
	   runHousekeeping is the flag which controls the automatic start of
	   housekeeping - setting it to false switches the automatic start off.
	*/
	runHousekeeping = true

	/*
	   FreqHousekeeping is the frequency of running housekeeping tasks,
	   given in milliseconds.
	*/
	FreqHousekeeping float64 = 1000

	/*
	   logHousekeeping is the flag which enables a log message on every run
	   of the housekeeping task.
	*/
	logHousekeeping = false
)
  28. /*
  29. HousekeepingWorker is the background thread which handles various tasks to provide
  30. "eventual" consistency for the cluster.
  31. */
  32. func (mm *MemberManager) HousekeepingWorker() {
  33. mm.housekeepingLock.Lock()
  34. defer mm.housekeepingLock.Unlock()
  35. if mm.StopHousekeeping {
  36. return
  37. } else if logHousekeeping {
  38. LogDebug(mm.name, "(HK): Running housekeeping task")
  39. }
  40. // Special function which ensures that the given member is removed from the
  41. // failed list.
  42. removeFromFailedState := func(peer string) {
  43. mm.Client.maplock.Lock()
  44. defer mm.Client.maplock.Unlock()
  45. if _, ok := mm.Client.failed[peer]; ok {
  46. // Remove a member from the failed state list and send an update
  47. LogDebug(mm.name, "(HK): ",
  48. fmt.Sprintf("Removing %v from list of failed members", peer))
  49. delete(mm.Client.failed, peer)
  50. }
  51. }
  52. // Housekeeping will try to talk to all peers
  53. resolveConflict := false // Flag to resolve a state conflict at the end of a cycle.
  54. for peer := range mm.Client.peers {
  55. LogDebug(mm.name, "(HK): ",
  56. fmt.Sprintf("Housekeeping talking to: %v", peer))
  57. // Send a ping to the member
  58. res, err := mm.Client.SendPing(peer, "")
  59. if err != nil {
  60. LogDebug(mm.name, "(HK): ",
  61. fmt.Sprintf("Error pinging %v - %v", peer, err))
  62. continue
  63. } else if len(res) == 1 {
  64. LogDebug(mm.name, "(HK): ",
  65. fmt.Sprintf("Member %v says this instance is not part of the cluster", peer))
  66. mm.Client.maplock.Lock()
  67. mm.Client.failed[peer] = ErrNotMember.Error()
  68. mm.Client.maplock.Unlock()
  69. continue
  70. }
  71. // Check timestamp on the result and see where this member is:
  72. peerTsMember := res[1]
  73. peerTsTS, _ := strconv.ParseInt(res[2], 10, 64)
  74. peerTsOldMember := res[3]
  75. peerTsOldTS, _ := strconv.ParseInt(res[4], 10, 64)
  76. simmTS, _ := mm.stateInfo.Get(StateInfoTS)
  77. mmTS := simmTS.([]string)
  78. simmOldTS, _ := mm.stateInfo.Get(StateInfoTSOLD)
  79. mmOldTS := simmOldTS.([]string)
  80. mmTsMember := mmTS[0]
  81. mmTsTS, _ := strconv.ParseInt(mmTS[1], 10, 64)
  82. mmTsOldMember := mmOldTS[0]
  83. mmTsOldTS, _ := strconv.ParseInt(mmOldTS[1], 10, 64)
  84. LogDebug(mm.name, "(HK): ",
  85. fmt.Sprintf("TS Me : Curr:%v:%v - Old:%v:%v", mmTsMember, mmTsTS, mmTsOldMember, mmTsOldTS))
  86. LogDebug(mm.name, "(HK): ",
  87. fmt.Sprintf("TS Peer: Curr:%v:%v - Old:%v:%v", peerTsMember, peerTsTS, peerTsOldMember, peerTsOldTS))
  88. if peerTsTS > mmTsTS || peerTsMember != mmTsMember {
  89. // Peer has a newer version
  90. if peerTsMember == mmTsMember && peerTsOldMember == mmTsMember && peerTsOldTS == mmTsTS {
  91. // Peer has the next state info version - update the local state info
  92. sf, err := mm.Client.SendStateInfoRequest(peer)
  93. if err == nil {
  94. LogDebug(mm.name, ": Updating state info of member")
  95. mm.applyStateInfo(sf)
  96. }
  97. } else {
  98. // Peer has a different version - potential conflict send a
  99. // state update at the end of the cycle
  100. if sf, err := mm.Client.SendStateInfoRequest(peer); err == nil {
  101. LogDebug(mm.name, ": Merging members in state infos")
  102. // Add any newly known cluster members
  103. mm.applyStateInfoPeers(sf, false)
  104. resolveConflict = true
  105. }
  106. }
  107. // Remove the member from the failed state list if it is on there
  108. removeFromFailedState(peer)
  109. } else if peerTsTS == mmTsTS && peerTsMember == mmTsMember {
  110. // Peer is up-to-date - check if it is in a failed state list
  111. removeFromFailedState(peer)
  112. }
  113. // We do nothing with members using an outdated cluster state
  114. // they should update eventually through their own housekeeping
  115. }
  116. // Check if there is a new failed members list
  117. sfFailed, _ := mm.stateInfo.Get(StateInfoFAILED)
  118. if len(sfFailed.([]string))/2 != len(mm.Client.failed) || resolveConflict {
  119. LogDebug(mm.name, "(HK): ",
  120. fmt.Sprintf("Updating other members with current failed members list: %v",
  121. strings.Join(mm.Client.FailedPeerErrors(), ", ")))
  122. if err := mm.UpdateClusterStateInfo(); err != nil {
  123. // Just update local state info if we could not update the peers
  124. LogDebug(mm.name, "(HK): ",
  125. fmt.Sprintf("Could not update cluster state: %v", err.Error()))
  126. mm.updateStateInfo(true)
  127. }
  128. }
  129. // Notify others that housekeeping has finished
  130. mm.notifyHouseKeeping()
  131. }