rebalance_test.go 9.7 KB


  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package cluster
  11. import (
  12. "bytes"
  13. "encoding/gob"
  14. "math"
  15. "testing"
  16. "time"
  17. "devt.de/krotik/eliasdb/cluster/manager"
  18. )
  19. func TestRebalancing(t *testing.T) {
  20. // Set a low distribution range
  21. defaultDistributionRange = 10
  22. defer func() { defaultDistributionRange = math.MaxUint64 }()
  23. // Setup a cluster
  24. manager.FreqHousekeeping = 5
  25. defer func() { manager.FreqHousekeeping = 1000 }()
  26. // Log transfer worker runs
  27. logTransferWorker = true
  28. defer func() { logTransferWorker = false }()
  29. // Log rebalance worker runs
  30. logRebalanceWorker = true
  31. defer func() { logRebalanceWorker = false }()
  32. // Create a cluster with 3 members and a replication factor of 2
  33. cluster3, ms := createCluster(4, 2)
  34. // Debug output
  35. //manager.LogDebug = manager.LogInfo
  36. //log.SetOutput(os.Stderr)
  37. //defer func() { log.SetOutput(ioutil.Discard) }()
  38. // At first join up only 3 members
  39. for i, dd := range cluster3[:3] {
  40. dd.Start()
  41. defer dd.Close()
  42. if i > 0 {
  43. err := dd.MemberManager.JoinCluster(cluster3[0].MemberManager.Name(),
  44. cluster3[0].MemberManager.NetAddr())
  45. if err != nil {
  46. t.Error(err)
  47. return
  48. }
  49. }
  50. }
  51. sm := cluster3[1].StorageManager("test", true)
  52. // Insert two strings into the store
  53. if loc, err := sm.Insert("test1"); loc != 1 || err != nil {
  54. t.Error("Unexpected result:", loc, err)
  55. return
  56. }
  57. sm.Flush()
  58. time.Sleep(10 * time.Millisecond)
  59. if loc, err := sm.Insert("test2"); loc != 3 || err != nil {
  60. t.Error("Unexpected result:", loc, err)
  61. return
  62. }
  63. sm.Flush()
  64. time.Sleep(10 * time.Millisecond)
  65. if loc, err := sm.Insert("test3"); loc != 6 || err != nil {
  66. t.Error("Unexpected result:", loc, err)
  67. return
  68. }
  69. sm.Flush()
  70. // Ensure the transfer worker is running on all members
  71. for _, m := range ms {
  72. m.transferWorker()
  73. for m.transferRunning {
  74. time.Sleep(time.Millisecond)
  75. }
  76. }
  77. // Check that we have a certain storage layout in the cluster
  78. if res := clusterLayout(ms, "test"); res != `
  79. TestClusterMember-0 MemberStorageManager mgs1/ls_test
  80. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  81. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  82. cloc: 6 (v:1) - lloc: 2 - "\b\f\x00\x05test3"
  83. TestClusterMember-1 MemberStorageManager mgs2/ls_test
  84. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  85. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  86. cloc: 3 (v:1) - lloc: 2 - "\b\f\x00\x05test2"
  87. TestClusterMember-2 MemberStorageManager mgs3/ls_test
  88. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  89. cloc: 3 (v:1) - lloc: 1 - "\b\f\x00\x05test2"
  90. cloc: 6 (v:1) - lloc: 2 - "\b\f\x00\x05test3"
  91. `[1:] {
  92. t.Error("Unexpected cluster storage layout: ", res)
  93. return
  94. }
  95. // Join the 4th member
  96. cluster3[3].Start()
  97. defer cluster3[3].Close()
  98. err := cluster3[3].MemberManager.JoinCluster(cluster3[0].MemberManager.Name(),
  99. cluster3[0].MemberManager.NetAddr())
  100. if err != nil {
  101. t.Error(err)
  102. return
  103. }
  104. // Switch off rebalance for now
  105. runRebalanceWorker = false
  106. for ms[3].rebalanceRunning {
  107. time.Sleep(time.Millisecond)
  108. }
  109. if loc, err := sm.Insert("test4"); loc != 7 || err != nil {
  110. t.Error("Unexpected result:", loc, err)
  111. return
  112. }
  113. // Ensure the transfer worker is running on all members
  114. for _, m := range ms {
  115. m.transferWorker()
  116. for m.transferRunning {
  117. time.Sleep(time.Millisecond)
  118. }
  119. }
  120. // Check that we have a certain storage layout in the cluster
  121. if res := clusterLayout(ms, "test"); res != `
  122. TestClusterMember-0 MemberStorageManager mgs1/ls_test
  123. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  124. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  125. cloc: 6 (v:1) - lloc: 2 - "\b\f\x00\x05test3"
  126. cloc: 7 (v:1) - lloc: 3 - "\b\f\x00\x05test4"
  127. TestClusterMember-1 MemberStorageManager mgs2/ls_test
  128. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  129. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  130. cloc: 3 (v:1) - lloc: 2 - "\b\f\x00\x05test2"
  131. TestClusterMember-2 MemberStorageManager mgs3/ls_test
  132. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  133. cloc: 3 (v:1) - lloc: 1 - "\b\f\x00\x05test2"
  134. cloc: 6 (v:1) - lloc: 2 - "\b\f\x00\x05test3"
  135. TestClusterMember-3 MemberStorageManager mgs4/ls_test
  136. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  137. cloc: 7 (v:1) - lloc: 1 - "\b\f\x00\x05test4"
  138. `[1:] {
  139. t.Error("Unexpected cluster storage layout: ", res)
  140. return
  141. }
  142. // Check distribution table
  143. if dt, _ := cluster3[2].DistributionTable(); dt.String() != `
  144. Location ranges:
  145. TestClusterMember-0: 0 -> 1
  146. TestClusterMember-1: 2 -> 3
  147. TestClusterMember-2: 4 -> 5
  148. TestClusterMember-3: 6 -> 10
  149. Replicas (factor=2) :
  150. TestClusterMember-0: [TestClusterMember-1]
  151. TestClusterMember-1: [TestClusterMember-2]
  152. TestClusterMember-2: [TestClusterMember-3]
  153. TestClusterMember-3: [TestClusterMember-0]
  154. `[1:] {
  155. t.Error("Unexpected distribution table: ", dt.String())
  156. return
  157. }
  158. // Switch on rebalancing
  159. runRebalanceWorker = true
  160. // Ensure the rebalance worker is running on all members
  161. for _, m := range ms {
  162. m.rebalanceWorker(true)
  163. for m.rebalanceRunning {
  164. time.Sleep(time.Millisecond)
  165. }
  166. }
  167. // Check that the rebalancing was successful
  168. if res := clusterLayout(ms, "test"); res != `
  169. TestClusterMember-0 MemberStorageManager mgs1/ls_test
  170. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  171. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  172. cloc: 6 (v:1) - lloc: 2 - "\b\f\x00\x05test3"
  173. cloc: 7 (v:1) - lloc: 3 - "\b\f\x00\x05test4"
  174. TestClusterMember-1 MemberStorageManager mgs2/ls_test
  175. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  176. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  177. cloc: 3 (v:1) - lloc: 2 - "\b\f\x00\x05test2"
  178. TestClusterMember-2 MemberStorageManager mgs3/ls_test
  179. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  180. cloc: 3 (v:1) - lloc: 1 - "\b\f\x00\x05test2"
  181. TestClusterMember-3 MemberStorageManager mgs4/ls_test
  182. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  183. cloc: 7 (v:1) - lloc: 1 - "\b\f\x00\x05test4"
  184. cloc: 6 (v:1) - lloc: 2 - "\b\f\x00\x05test3"
  185. `[1:] {
  186. t.Error("Unexpected cluster storage layout: ", res)
  187. return
  188. }
  189. // Update a piece of data on a replica. This simulates the situation when a
  190. // primary storage member was down and an update was only received on a replica.
  191. var bb bytes.Buffer
  192. if err := gob.NewEncoder(&bb).Encode("test3_updated"); err != nil {
  193. t.Error(err)
  194. return
  195. }
  196. localsm := ms[0].gs.StorageManager(LocalStoragePrefix+"test", false)
  197. localsm.Update(2, bb.Bytes())
  198. ms[0].at.SetTransClusterLoc("test", 6, 2, 2)
  199. if res := clusterLayout(ms, "test"); res != `
  200. TestClusterMember-0 MemberStorageManager mgs1/ls_test
  201. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  202. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  203. cloc: 6 (v:2) - lloc: 2 - "\x10\f\x00\rtest3_updated"
  204. cloc: 7 (v:1) - lloc: 3 - "\b\f\x00\x05test4"
  205. TestClusterMember-1 MemberStorageManager mgs2/ls_test
  206. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  207. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  208. cloc: 3 (v:1) - lloc: 2 - "\b\f\x00\x05test2"
  209. TestClusterMember-2 MemberStorageManager mgs3/ls_test
  210. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  211. cloc: 3 (v:1) - lloc: 1 - "\b\f\x00\x05test2"
  212. TestClusterMember-3 MemberStorageManager mgs4/ls_test
  213. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  214. cloc: 7 (v:1) - lloc: 1 - "\b\f\x00\x05test4"
  215. cloc: 6 (v:1) - lloc: 2 - "\b\f\x00\x05test3"
  216. `[1:] {
  217. t.Error("Unexpected cluster storage layout: ", res)
  218. return
  219. }
  220. manager.MemberErrors = make(map[string]error)
  221. defer func() { manager.MemberErrors = nil }()
  222. // Simulate an error on member 3
  223. manager.MemberErrors[cluster3[3].MemberManager.Name()] = &testNetError{}
  224. // Ensure the transfer worker is running on all members
  225. for _, m := range ms {
  226. m.rebalanceWorker(true)
  227. }
  228. if res := clusterLayout(ms, "test"); res != `
  229. TestClusterMember-0 MemberStorageManager mgs1/ls_test
  230. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  231. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  232. cloc: 6 (v:2) - lloc: 2 - "\x10\f\x00\rtest3_updated"
  233. cloc: 7 (v:1) - lloc: 3 - "\b\f\x00\x05test4"
  234. TestClusterMember-1 MemberStorageManager mgs2/ls_test
  235. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  236. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  237. cloc: 3 (v:1) - lloc: 2 - "\b\f\x00\x05test2"
  238. TestClusterMember-2 MemberStorageManager mgs3/ls_test
  239. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  240. cloc: 3 (v:1) - lloc: 1 - "\b\f\x00\x05test2"
  241. TestClusterMember-3 MemberStorageManager mgs4/ls_test
  242. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  243. cloc: 7 (v:1) - lloc: 1 - "\b\f\x00\x05test4"
  244. cloc: 6 (v:1) - lloc: 2 - "\b\f\x00\x05test3"
  245. `[1:] {
  246. t.Error("Unexpected cluster storage layout: ", res)
  247. return
  248. }
  249. // Remove the error
  250. delete(manager.MemberErrors, cluster3[3].MemberManager.Name())
  251. // Ensure the transfer worker is running on all members
  252. for _, m := range ms {
  253. m.rebalanceWorker(true)
  254. }
  255. // Check that update has happened
  256. if res := clusterLayout(ms, "test"); res != `
  257. TestClusterMember-0 MemberStorageManager mgs1/ls_test
  258. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  259. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  260. cloc: 6 (v:2) - lloc: 2 - "\x10\f\x00\rtest3_updated"
  261. cloc: 7 (v:1) - lloc: 3 - "\b\f\x00\x05test4"
  262. TestClusterMember-1 MemberStorageManager mgs2/ls_test
  263. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  264. cloc: 1 (v:1) - lloc: 1 - "\b\f\x00\x05test1"
  265. cloc: 3 (v:1) - lloc: 2 - "\b\f\x00\x05test2"
  266. TestClusterMember-2 MemberStorageManager mgs3/ls_test
  267. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  268. cloc: 3 (v:1) - lloc: 1 - "\b\f\x00\x05test2"
  269. TestClusterMember-3 MemberStorageManager mgs4/ls_test
  270. Roots: 0=0 1=0 2=0 3=0 4=0 5=0 6=0 7=0 8=0 9=0
  271. cloc: 7 (v:1) - lloc: 1 - "\b\f\x00\x05test4"
  272. cloc: 6 (v:2) - lloc: 2 - "\x10\f\x00\rtest3_updated"
  273. `[1:] {
  274. t.Error("Unexpected cluster storage layout: ", res)
  275. return
  276. }
  277. }