graphmanager_cluster_test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package graph
  11. import (
  12. "fmt"
  13. "io/ioutil"
  14. "log"
  15. "testing"
  16. "devt.de/krotik/eliasdb/cluster"
  17. "devt.de/krotik/eliasdb/cluster/manager"
  18. "devt.de/krotik/eliasdb/graph/data"
  19. "devt.de/krotik/eliasdb/graph/graphstorage"
  20. "devt.de/krotik/eliasdb/hash"
  21. )
  22. func TestClusterStorage(t *testing.T) {
  23. cluster2 := createCluster(2)
  24. joinCluster(cluster2, t)
  25. // *** Direct storage
  26. // Insert something into a storage manager and wait for the transfer
  27. sm := cluster2[0].StorageManager("foo", true)
  28. sm2 := cluster2[1].StorageManager("foo", true)
  29. loc, err := sm.Insert("test123")
  30. if loc != 1 || err != nil {
  31. t.Error("Unexpected result:", loc, err)
  32. return
  33. }
  34. cluster.WaitForTransfer()
  35. // Try to retrieve the item again
  36. // fmt.Println(cluster.DumpMemoryClusterLayout("foo"))
  37. var res string
  38. if err := sm.Fetch(1, &res); err != nil {
  39. t.Error(err)
  40. return
  41. }
  42. if res != "test123" {
  43. t.Error("Unexpected result:", res)
  44. return
  45. }
  46. res = ""
  47. if err := sm2.Fetch(1, &res); err != nil {
  48. t.Error(err)
  49. return
  50. }
  51. if res != "test123" {
  52. t.Error("Unexpected result:", res)
  53. return
  54. }
  55. // *** HTree storage
  56. // Use a HTree to insert to and fetch from a storage manager
  57. sm = cluster2[0].StorageManager("foo2", true)
  58. sm2 = cluster2[1].StorageManager("foo2", true)
  59. htree, err := hash.NewHTree(sm)
  60. if err != nil {
  61. t.Error(err)
  62. return
  63. }
  64. if valres, err := htree.Put([]byte("123"), "Test1"); err != nil || valres != nil {
  65. t.Error("Unexpected result:", valres, err)
  66. return
  67. }
  68. if valres, err := htree.Put([]byte("123"), "Test2"); err != nil || valres != "Test1" {
  69. t.Error("Unexpected result:", valres, err)
  70. return
  71. }
  72. // Try to retrieve the item again
  73. cluster.WaitForTransfer()
  74. if val, err := htree.Get([]byte("123")); err != nil || val != "Test2" {
  75. t.Error("Unexpected result:", val, err)
  76. return
  77. }
  78. htree2, err := hash.LoadHTree(sm2, 1)
  79. if val, err := htree2.Get([]byte("123")); err != nil || val != "Test2" {
  80. t.Error("Unexpected result:", val, err)
  81. return
  82. }
  83. // *** GraphManager storage
  84. gm1 := NewGraphManager(cluster2[0])
  85. if err := gm1.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
  86. "key": "123",
  87. "kind": "testnode",
  88. "foo": "bar",
  89. })); err != nil {
  90. t.Error("Unexpected result:", err)
  91. return
  92. }
  93. cluster.WaitForTransfer()
  94. if node, err := gm1.FetchNode("main", "123", "testnode"); err != nil ||
  95. node.String() != `GraphNode:
  96. key : 123
  97. kind : testnode
  98. foo : bar
  99. ` {
  100. t.Error("Unexpected result:", node, err)
  101. return
  102. }
  103. gm2 := NewGraphManager(cluster2[1])
  104. if node, err := gm2.FetchNode("main", "123", "testnode"); err != nil ||
  105. node.String() != `GraphNode:
  106. key : 123
  107. kind : testnode
  108. foo : bar
  109. ` {
  110. t.Error("Unexpected result:", node, err)
  111. return
  112. }
  113. }
  114. /*
  115. Create a cluster with n members (all storage is in memory)
  116. */
  117. func createCluster(n int) []*cluster.DistributedStorage {
  118. // By default no log output
  119. log.SetOutput(ioutil.Discard)
  120. var mgs []*graphstorage.MemoryGraphStorage
  121. var cs []*cluster.DistributedStorage
  122. cluster.ClearMSMap()
  123. for i := 0; i < n; i++ {
  124. mgs = append(mgs, graphstorage.NewMemoryGraphStorage(fmt.Sprintf("mgs%v", i+1)).(*graphstorage.MemoryGraphStorage))
  125. }
  126. for i := 0; i < n; i++ {
  127. ds, _ := cluster.NewDistributedStorage(mgs[i], map[string]interface{}{
  128. manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9020+i),
  129. manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-%v", i),
  130. manager.ConfigClusterSecret: "test123",
  131. }, manager.NewMemStateInfo())
  132. cs = append(cs, ds)
  133. }
  134. return cs
  135. }
  136. /*
  137. joinCluster joins up a given cluster.
  138. */
  139. func joinCluster(cluster []*cluster.DistributedStorage, t *testing.T) {
  140. for i, dd := range cluster {
  141. dd.Start()
  142. defer dd.Close()
  143. if i > 0 {
  144. err := dd.MemberManager.JoinCluster(cluster[0].MemberManager.Name(),
  145. cluster[0].MemberManager.NetAddr())
  146. if err != nil {
  147. t.Error(err)
  148. return
  149. }
  150. }
  151. }
  152. }