  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package cluster
  11. import (
  12. "fmt"
  13. "strconv"
  14. "strings"
  15. "devt.de/krotik/eliasdb/cluster/manager"
  16. "devt.de/krotik/eliasdb/hash"
  17. )
/*
MaxSizeRebalanceLists is the maximum size for rebalancing lists within one rebalance request.
*/
const MaxSizeRebalanceLists = 100

/*
runRebalanceWorker flag to switch off automatic rebalancing
*/
var runRebalanceWorker = true

/*
logRebalanceWorker flag to write a log message every time automatic rebalancing is running
*/
var logRebalanceWorker = false

/*
rebalanceHousekeepingInterval defines how often housekeeping needs to run before
a rebalance task is run.
*/
var rebalanceHousekeepingInterval = 180
  35. /*
  36. rebalanceWorker is the background thread which handles automatic rebalancing
  37. when the configuration of the cluster changes or to autocorrect certain errors.
  38. */
  39. func (ms *memberStorage) rebalanceWorker(forceRun bool) {
  40. // Make sure only one transfer task is running at a time and that
  41. // subsequent requests are not queued up
  42. ms.rebalanceLock.Lock()
  43. if !runRebalanceWorker || ms.rebalanceRunning {
  44. ms.rebalanceLock.Unlock()
  45. return
  46. }
  47. // Make sure rebalancing only runs every rebalanceHousekeepingInterval
  48. if !forceRun && ms.rebalanceCounter > 0 {
  49. ms.rebalanceCounter--
  50. ms.rebalanceLock.Unlock()
  51. return
  52. }
  53. ms.rebalanceCounter = rebalanceHousekeepingInterval
  54. ms.rebalanceRunning = true
  55. ms.rebalanceLock.Unlock()
  56. defer func() {
  57. ms.rebalanceLock.Lock()
  58. ms.rebalanceRunning = false
  59. ms.rebalanceLock.Unlock()
  60. }()
  61. if logRebalanceWorker {
  62. manager.LogDebug(ms.ds.Name(), "(RB): Running rebalance worker task")
  63. }
  64. distTable, err := ms.ds.DistributionTable()
  65. if err != nil {
  66. manager.LogDebug(ms.ds.Name(), "(RB): Cannot rebalance not operational cluster: ",
  67. err.Error())
  68. return
  69. }
  70. // Go through all maintained stuff and collect storage name, location and version
  71. it := hash.NewHTreeIterator(ms.at.translation)
  72. for it.HasNext() {
  73. chunks := MaxSizeRebalanceLists
  74. maintLocs := make([]uint64, 0, MaxSizeRebalanceLists)
  75. maintVers := make([]uint64, 0, MaxSizeRebalanceLists)
  76. maintMgmts := make([]string, 0, MaxSizeRebalanceLists)
  77. for it.HasNext() || chunks <= 0 {
  78. key, val := it.Next()
  79. if tr, ok := val.(*translationRec); ok {
  80. smname := strings.Split(string(key[len(transPrefix):]), "#")[0]
  81. cloc, _ := strconv.ParseUint(string(key[len(fmt.Sprint(transPrefix, smname, "#")):]), 10, 64)
  82. maintMgmts = append(maintMgmts, smname)
  83. maintLocs = append(maintLocs, cloc)
  84. maintVers = append(maintVers, tr.Ver)
  85. }
  86. }
  87. // Send info about maintained stuff to all relevant members
  88. receiverMap := make(map[string]string)
  89. for _, cloc := range maintLocs {
  90. primary, replicas := distTable.LocationHome(cloc)
  91. members := make([]string, 0, len(replicas)+1)
  92. members = append(members, primary)
  93. members = append(members, replicas...)
  94. for _, member := range members {
  95. _, ok := receiverMap[member]
  96. if member == ms.ds.MemberManager.Name() || ok {
  97. continue
  98. }
  99. receiverMap[member] = ""
  100. request := &DataRequest{RTRebalance, map[DataRequestArg]interface{}{
  101. RPStoreName: maintMgmts,
  102. RPLoc: maintLocs,
  103. RPVer: maintVers,
  104. RPSrc: ms.ds.MemberManager.Name(),
  105. }, nil, false}
  106. _, err := ms.ds.sendDataRequest(member, request)
  107. if err != nil {
  108. manager.LogDebug(ms.ds.Name(), "(RB): ",
  109. fmt.Sprintf("Member %v Error: %v", member, err))
  110. }
  111. }
  112. }
  113. }
  114. }