distributedstorage_test.go 14 KB


  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package cluster
  11. import (
  12. "bytes"
  13. "encoding/gob"
  14. "encoding/json"
  15. "errors"
  16. "fmt"
  17. "io/ioutil"
  18. "log"
  19. "math"
  20. "testing"
  21. "devt.de/krotik/eliasdb/cluster/manager"
  22. "devt.de/krotik/eliasdb/graph/graphstorage"
  23. "devt.de/krotik/eliasdb/storage"
  24. )
  25. func TestDistributionStorageInitialisationError(t *testing.T) {
  26. gs := graphstorage.NewMemoryGraphStorage("bla")
  27. si := manager.NewMemStateInfo()
  28. // Set an unreasonable replication factor
  29. si.Put(manager.StateInfoREPFAC, 500)
  30. conf := map[string]interface{}{
  31. manager.ConfigClusterSecret: "",
  32. }
  33. // Make flush error at first
  34. storage.MsmRetFlush = errors.New("testerror")
  35. ds, err := NewDistributedStorage(gs, conf, si)
  36. if err.Error() != "testerror" || ds != nil {
  37. t.Error("Unexpected result:", ds, err)
  38. return
  39. }
  40. storage.MsmRetFlush = nil
  41. // Create a new DistributionStorage
  42. ds, err = NewDistributedStorage(gs, conf, si)
  43. if err != nil || ds == nil {
  44. t.Error("Unexpected result:", ds, err)
  45. return
  46. }
  47. // Test simple operations
  48. if res := ds.LocalName(); res != "bla" {
  49. t.Error("Unexpected local name:", res)
  50. return
  51. }
  52. if ds.FlushAll() != nil {
  53. t.Error(err)
  54. return
  55. }
  56. if ds.IsOperational() {
  57. t.Error("Cluster should not be operational at this point")
  58. return
  59. }
  60. if len(ds.MainDB()) != 0 {
  61. t.Error("MainDB should return an empty map at this point")
  62. return
  63. }
  64. ds.RollbackMain()
  65. // This returns the distTableErr
  66. if err := ds.FlushMain(); err.Error() != "Not enough members (1) for given replication factor: 500" {
  67. t.Error("Unexpected result:", err)
  68. return
  69. }
  70. if res := ds.StorageManager("test", true); res != nil {
  71. t.Error("StorageManager should return nil with missing distribution table")
  72. return
  73. }
  74. if len(ds.MainDB()) != 0 {
  75. t.Error("MainDB should return an empty map at this point")
  76. return
  77. }
  78. // This returns the mainDBErr
  79. if err := ds.FlushMain(); err.Error() != "Not enough members (1) for given replication factor: 500" {
  80. t.Error("Unexpected result:", err)
  81. return
  82. }
  83. if res := ds.StorageManager("test", true); res != nil {
  84. t.Error("StorageManager should return nil when main db is not available")
  85. return
  86. }
  87. }
  88. func TestMainDBDistribution(t *testing.T) {
  89. // Setup a cluster
  90. // Housekeeping frequency is high so we have it interfering - try to
  91. // produce dead locks, etc ...
  92. manager.FreqHousekeeping = 5
  93. defer func() { manager.FreqHousekeeping = 1000 }()
  94. cluster3, _ := createCluster(3, 2)
  95. // Debug output
  96. // manager.LogDebug = manager.LogInfo
  97. // log.SetOutput(os.Stderr)
  98. // defer func() { log.SetOutput(ioutil.Discard) }()
  99. for i, dd := range cluster3 {
  100. dd.Start()
  101. defer dd.Close()
  102. if i > 0 {
  103. err := dd.MemberManager.JoinCluster(cluster3[0].MemberManager.Name(), cluster3[0].MemberManager.NetAddr())
  104. if err != nil {
  105. t.Error(err)
  106. return
  107. }
  108. }
  109. }
  110. // Insert stuff
  111. cluster3[0].MainDB()["test1"] = "123"
  112. cluster3[0].FlushMain()
  113. cluster3[1].MainDB()["test2"] = "234"
  114. cluster3[1].FlushMain()
  115. cluster3[2].MainDB()["test3"] = "345"
  116. cluster3[2].FlushMain()
  117. mdb := cluster3[1].MainDB()
  118. if mdb["test1"] != "123" || mdb["test2"] != "234" || mdb["test3"] != "345" || len(mdb) != 3 {
  119. t.Error("Unexpected main db:", mdb)
  120. return
  121. }
  122. }
  123. func TestSimpleDataDistribution(t *testing.T) {
  124. // Set a low distribution range
  125. defaultDistributionRange = 10
  126. defer func() { defaultDistributionRange = math.MaxUint64 }()
  127. // Make sure the transfer worker is not running
  128. runTransferWorker = false
  129. defer func() { runTransferWorker = true }()
  130. // Setup a cluster
  131. manager.FreqHousekeeping = 5
  132. defer func() { manager.FreqHousekeeping = 1000 }()
  133. cluster3, ds := createCluster(3, 2)
  134. // Debug output
  135. // manager.LogDebug = manager.LogInfo
  136. // log.SetOutput(os.Stderr)
  137. // defer func() { log.SetOutput(ioutil.Discard) }()
  138. for i, dd := range cluster3 {
  139. dd.Start()
  140. defer dd.Close()
  141. if i > 0 {
  142. err := dd.MemberManager.JoinCluster(cluster3[0].MemberManager.Name(), cluster3[0].MemberManager.NetAddr())
  143. if err != nil {
  144. t.Error(err)
  145. return
  146. }
  147. }
  148. }
  149. // Make sure the replication factor is distributed in the cluster. Once set
  150. // in the configuration it becomes part of the cluster state.
  151. if rf := cluster3[2].MemberManager.StateInfo().Map()[manager.StateInfoREPFAC].(int); rf != 2 ||
  152. cluster3[1].distributionTable.repFac != 2 || cluster3[2].distributionTable.repFac != 2 {
  153. t.Error("Unexpected replication factor in the cluster:", rf)
  154. return
  155. }
  156. // Initially the storage manager should not exist
  157. if smtest := cluster3[1].StorageManager("test1", false); smtest != nil {
  158. t.Error("Not existing storage manager should be nil")
  159. return
  160. }
  161. // Simple insert requests - data is stored on all members
  162. sm := cluster3[0].StorageManager("test1", true)
  163. if res := sm.Name(); res != "DistributedStorageManager: test1" {
  164. t.Error("Unexpected name:", res)
  165. return
  166. }
  167. // Test handling of distribution table errors with existing storage manager objects
  168. cluster3[0].distributionTableError = errors.New("TestError")
  169. sm.SetRoot(1, 5)
  170. if sm.Root(1) != 0 {
  171. t.Error("All root values should be returned as 0 with distribution table errors")
  172. return
  173. }
  174. if _, err := sm.Insert("test"); err.Error() != "TestError" {
  175. t.Error("Unexpected response:", err)
  176. return
  177. }
  178. if err := sm.Update(5, "test"); err.Error() != "TestError" {
  179. t.Error("Unexpected response:", err)
  180. return
  181. }
  182. if err := sm.Free(5); err.Error() != "TestError" {
  183. t.Error("Unexpected response:", err)
  184. return
  185. }
  186. if _, err := sm.FetchCached(5); err.(*storage.ManagerError).Type != storage.ErrNotInCache {
  187. t.Error("Unexpected response:", err)
  188. return
  189. }
  190. if err := sm.Fetch(5, nil); err.Error() != "TestError" {
  191. t.Error("Unexpected response:", err)
  192. return
  193. }
  194. if err := sm.Close(); err.Error() != "TestError" {
  195. t.Error("Unexpected response:", err)
  196. return
  197. }
  198. if err := sm.Flush(); err.Error() != "TestError" {
  199. t.Error("Unexpected response:", err)
  200. return
  201. }
  202. // No effect on NOP operations
  203. if err := sm.Rollback(); err != nil {
  204. t.Error("Unexpected response:", err)
  205. return
  206. }
  207. cluster3[0].distributionTableError = nil
  208. if err := sm.Close(); err != nil {
  209. t.Error("Unexpected response:", err)
  210. return
  211. }
  212. // Even after the first creation call should the storage manage not exist
  213. if smtest := cluster3[1].StorageManager("test1", false); smtest != nil {
  214. t.Error("Not existing storage manager should be nil")
  215. return
  216. }
  217. if loc, err := sm.Insert("test1"); loc != 1 || err != nil {
  218. t.Error("Unexpected result:", loc, err)
  219. return
  220. }
  221. sm.Flush()
  222. // After the first insert the storage manage should exist
  223. if smtest := cluster3[1].StorageManager("test1", false); smtest == nil {
  224. t.Error("Existing storage manager should be not nil")
  225. return
  226. }
  227. if loc, err := sm.Insert("test2"); loc != 3 || err != nil {
  228. t.Error("Unexpected result:", loc, err)
  229. return
  230. }
  231. sm.Flush()
  232. if loc, err := sm.Insert("test3"); loc != 6 || err != nil {
  233. t.Error("Unexpected result:", loc, err)
  234. return
  235. }
  236. sm.Flush()
  237. if loc, err := sm.Insert("test4"); loc != 2 || err != nil {
  238. t.Error("Unexpected result:", loc, err)
  239. return
  240. }
  241. if loc, err := sm.Insert("test5"); loc != 4 || err != nil {
  242. t.Error("Unexpected result:", loc, err)
  243. return
  244. }
  245. // Lookup the data again
  246. var res string
  247. if err := sm.Fetch(1, &res); res != "test1" || err != nil {
  248. t.Error("Unexpected result:", res, err)
  249. return
  250. }
  251. if err := sm.Fetch(2, &res); res != "test4" || err != nil {
  252. t.Error("Unexpected result:", res, err)
  253. return
  254. }
  255. if err := sm.Fetch(3, &res); res != "test2" || err != nil {
  256. t.Error("Unexpected result:", res, err)
  257. return
  258. }
  259. if err := sm.Fetch(4, &res); res != "test5" || err != nil {
  260. t.Error("Unexpected result:", res, err)
  261. return
  262. }
  263. if err := sm.Fetch(6, &res); res != "test3" || err != nil {
  264. t.Error("Unexpected result:", res, err)
  265. return
  266. }
  267. // Update some data
  268. if err := sm.Update(1, "test11"); err != nil {
  269. t.Error("Unexpected result:", err)
  270. return
  271. }
  272. if err := sm.Update(6, "test44"); err != nil {
  273. t.Error("Unexpected result:", err)
  274. return
  275. }
  276. // Lookup the data again
  277. if err := sm.Fetch(1, &res); res != "test11" || err != nil {
  278. t.Error("Unexpected result:", res, err)
  279. return
  280. }
  281. if err := sm.Fetch(6, &res); res != "test44" || err != nil {
  282. t.Error("Unexpected result:", res, err)
  283. return
  284. }
  285. // Test sending outdated request
  286. request := &DataRequest{RTUpdate, map[DataRequestArg]interface{}{
  287. RPStoreName: "test1",
  288. RPLoc: uint64(6),
  289. RPVer: uint64(1),
  290. }, []byte("1111"), true}
  291. _, err := ds[1].ds.sendDataRequest(cluster3[2].MemberManager.Name(), request)
  292. if err != nil {
  293. t.Error("Unexpected response:", err)
  294. return
  295. }
  296. // Check that the outdated transfer request was ignored
  297. if err := sm.Fetch(6, &res); res != "test44" || err != nil {
  298. t.Error("Unexpected result:", res, err)
  299. return
  300. }
  301. // Try updating something which does not exist
  302. request = &DataRequest{RTUpdate, map[DataRequestArg]interface{}{
  303. RPStoreName: "test1",
  304. RPLoc: uint64(99),
  305. RPVer: uint64(1),
  306. }, []byte("1111"), true}
  307. _, err = ds[1].ds.sendDataRequest(cluster3[2].MemberManager.Name(), request)
  308. if err.Error() != "ClusterError: Member error (Cluster slot not found (TestClusterMember-2 - Location: 99))" {
  309. t.Error("Unexpected response:", err)
  310. return
  311. }
  312. lsm := ds[0].dataStorage("test1", false)
  313. // Destroy the gob encoding in cluster slot 0 (local slot 1)
  314. lsm.Update(1, "test11")
  315. if err := sm.Fetch(1, &res); err.Error() !=
  316. "gob: decoding into local type *[]uint8, received remote type string" {
  317. t.Error("Unexpected result:", res, err)
  318. return
  319. }
  320. // Delete some data
  321. if err := sm.Free(1); err != nil {
  322. t.Error("Unexpected result:", err)
  323. return
  324. }
  325. if err := sm.Free(6); err != nil {
  326. t.Error("Unexpected result:", err)
  327. return
  328. }
  329. // Lookup the data again
  330. res = ""
  331. if err := sm.Fetch(0, &res); res != "" || err.Error() != "Cluster slot not found (TestClusterMember-0 - Location: 0)" {
  332. t.Error("Unexpected result:", res, err)
  333. return
  334. }
  335. res = ""
  336. if err := sm.Fetch(4, &res); res != "test5" || err != nil {
  337. t.Error("Unexpected result:", res, err)
  338. return
  339. }
  340. res = ""
  341. if err := sm.Fetch(6, &res); res != "" || err.Error() != "ClusterError: Member error (Cluster slot not found (TestClusterMember-2 - Location: 6))" {
  342. t.Error("Unexpected result:", res, err)
  343. return
  344. }
  345. // Set and retrieve root values
  346. sm = cluster3[0].StorageManager("test1", true)
  347. sm.SetRoot(4, 67)
  348. sm = cluster3[1].StorageManager("test1", true)
  349. sm.SetRoot(5, 88)
  350. sm = cluster3[2].StorageManager("test1", true)
  351. sm.SetRoot(4, 22)
  352. sm = cluster3[0].StorageManager("test1", true)
  353. if root := sm.Root(4); root != 22 {
  354. t.Error("Unexpected result:", root)
  355. return
  356. }
  357. sm = cluster3[2].StorageManager("test1", true)
  358. if root := sm.Root(5); root != 88 {
  359. t.Error("Unexpected result:", root)
  360. return
  361. }
  362. // Test certain errors
  363. var RPMyRequest RequestType = "RPMyRequest"
  364. request = &DataRequest{RPMyRequest, map[DataRequestArg]interface{}{}, nil, false}
  365. _, err = ds[1].ds.sendDataRequest(cluster3[0].MemberManager.Name(), request)
  366. if err.Error() != "ClusterError: Member error (Unknown request type)" {
  367. t.Error("Unexpected response:", err)
  368. return
  369. }
  370. cluster3[0].distributionTableError = errors.New("testerror")
  371. _, err = ds[1].ds.sendDataRequest(cluster3[0].MemberManager.Name(), request)
  372. if err.Error() != "ClusterError: Member error (Storage is currently disabled on member: TestClusterMember-0 (testerror))" {
  373. t.Error("Unexpected response:", err)
  374. return
  375. }
  376. }
  377. /*
  378. createCluster creates a cluster with n members (all storage is in memory)
  379. */
  380. func createCluster(n int, rep float64) ([]*DistributedStorage, []*memberStorage) {
  381. // By default no log output
  382. log.SetOutput(ioutil.Discard)
  383. var mgs []*graphstorage.MemoryGraphStorage
  384. var dss []*DistributedStorage
  385. var mss []*memberStorage
  386. for i := 0; i < n; i++ {
  387. mgs = append(mgs, graphstorage.NewMemoryGraphStorage(fmt.Sprintf("mgs%v", i+1)).(*graphstorage.MemoryGraphStorage))
  388. }
  389. for i := 0; i < n; i++ {
  390. ds, ms, _ := newDistributedAndMemberStorage(mgs[i], map[string]interface{}{
  391. manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9020+i),
  392. manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-%v", i),
  393. manager.ConfigClusterSecret: "test123",
  394. manager.ConfigReplicationFactor: (rep + float64(i)),
  395. }, manager.NewMemStateInfo())
  396. dss = append(dss, ds)
  397. mss = append(mss, ms)
  398. }
  399. return dss, mss
  400. }
  401. /*
  402. clusterLayout returns the current storage layout in a cluster. Parameters is an
  403. array of memberStorages and a storage name.
  404. */
  405. func clusterLayout(ms []*memberStorage, smname string) string {
  406. buf := new(bytes.Buffer)
  407. for _, m := range ms {
  408. buf.WriteString(m.dump(smname))
  409. }
  410. return buf.String()
  411. }
  412. /*
  413. retrieveStringFromClusterLoc tries to retrieve a given cluster location from a given member storage.
  414. */
  415. func retrieveStringFromClusterLoc(ms *memberStorage, smname string, cloc uint64, exp string) error {
  416. var out interface{}
  417. var res string
  418. err := ms.handleFetchRequest(ms.ds.distributionTable, &DataRequest{RTFetch, map[DataRequestArg]interface{}{
  419. RPStoreName: smname,
  420. RPLoc: cloc,
  421. }, nil, false}, &out, true)
  422. if err == nil {
  423. // Decode stored value (this would be otherwise done on the receiving end)
  424. err = gob.NewDecoder(bytes.NewReader(out.([]byte))).Decode(&res)
  425. if err == nil && res != exp {
  426. err = fmt.Errorf("Unexpected cloc result: %v (expected: %v)", res, exp)
  427. }
  428. }
  429. return err
  430. }
  431. func checkStateInfo(mm *manager.MemberManager, expectedStateInfo string) error {
  432. var w bytes.Buffer
  433. ret := json.NewEncoder(&w)
  434. si := mm.StateInfo().Map()
  435. // We don't care about timestamps in this test since goroutines run
  436. // concurrently - we can't say which one will do the update first
  437. delete(si, "ts")
  438. delete(si, "tsold")
  439. ret.Encode(si)
  440. out := bytes.Buffer{}
  441. err := json.Indent(&out, w.Bytes(), "", " ")
  442. if err != nil {
  443. return err
  444. }
  445. if out.String() != expectedStateInfo {
  446. return fmt.Errorf("Unexpected state info: %v\nexpected: %v",
  447. out.String(), expectedStateInfo)
  448. }
  449. return nil
  450. }
  451. // Test network failure
  452. type testNetError struct {
  453. }
  454. func (*testNetError) Error() string {
  455. return "test.net.Error"
  456. }
  457. func (*testNetError) Timeout() bool {
  458. return false
  459. }
  460. func (*testNetError) Temporary() bool {
  461. return true
  462. }