graphmanager_cluster_test.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package graph
  11. import (
  12. "fmt"
  13. "io/ioutil"
  14. "log"
  15. "testing"
  16. "devt.de/krotik/eliasdb/cluster"
  17. "devt.de/krotik/eliasdb/cluster/manager"
  18. "devt.de/krotik/eliasdb/graph/data"
  19. "devt.de/krotik/eliasdb/graph/graphstorage"
  20. "devt.de/krotik/eliasdb/hash"
  21. )
  22. func TestClusterWithPhysicalStorage(t *testing.T) {
  23. log.SetOutput(ioutil.Discard)
  24. dgs1, err := graphstorage.NewDiskGraphStorage(GraphManagerTestDBDir5, false)
  25. if err != nil {
  26. t.Error(err)
  27. return
  28. }
  29. ds1, _ := cluster.NewDistributedStorage(dgs1, map[string]interface{}{
  30. manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9021),
  31. manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-1"),
  32. manager.ConfigClusterSecret: "test123",
  33. }, manager.NewMemStateInfo())
  34. ds1.Start()
  35. defer ds1.Close()
  36. dgs2, err := graphstorage.NewDiskGraphStorage(GraphManagerTestDBDir6, false)
  37. if err != nil {
  38. t.Error(err)
  39. return
  40. }
  41. ds2, _ := cluster.NewDistributedStorage(dgs2, map[string]interface{}{
  42. manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9022),
  43. manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-2"),
  44. manager.ConfigClusterSecret: "test123",
  45. }, manager.NewMemStateInfo())
  46. ds2.Start()
  47. defer ds2.Close()
  48. err = ds2.MemberManager.JoinCluster(ds1.MemberManager.Name(),
  49. ds1.MemberManager.NetAddr())
  50. if err != nil {
  51. t.Error(err)
  52. return
  53. }
  54. sm := ds1.StorageManager("foo", true)
  55. sm2 := ds2.StorageManager("foo", true)
  56. loc, err := sm.Insert("test123")
  57. if loc != 1 || err != nil {
  58. t.Error("Unexpected result:", loc, err)
  59. return
  60. }
  61. loc, err = sm2.Insert("test456")
  62. if loc != 2 || err != nil {
  63. t.Error("Unexpected result:", loc, err)
  64. return
  65. }
  66. res := ""
  67. if err := sm2.Fetch(1, &res); err != nil {
  68. t.Error(err)
  69. return
  70. }
  71. if res != "test123" {
  72. t.Error("Unexpected result:", res)
  73. return
  74. }
  75. if err := sm2.Fetch(2, &res); err != nil {
  76. t.Error(err)
  77. return
  78. }
  79. if res != "test456" {
  80. t.Error("Unexpected result:", res)
  81. return
  82. }
  83. // *** HTree storage
  84. // Use a HTree to insert to and fetch from a storage manager
  85. sm = ds1.StorageManager("foo2", true)
  86. sm2 = ds2.StorageManager("foo2", true)
  87. htree, err := hash.NewHTree(sm)
  88. if err != nil {
  89. t.Error(err)
  90. return
  91. }
  92. if valres, err := htree.Put([]byte("123"), "Test1"); err != nil || valres != nil {
  93. t.Error("Unexpected result:", valres, err)
  94. return
  95. }
  96. if valres, err := htree.Put([]byte("123"), "Test2"); err != nil || valres != "Test1" {
  97. t.Error("Unexpected result:", valres, err)
  98. return
  99. }
  100. // Try to retrieve the item again
  101. cluster.WaitForTransfer()
  102. if val, err := htree.Get([]byte("123")); err != nil || val != "Test2" {
  103. t.Error("Unexpected result:", val, err)
  104. return
  105. }
  106. htree2, _ := hash.LoadHTree(sm2, 1)
  107. if val, err := htree2.Get([]byte("123")); err != nil || val != "Test2" {
  108. t.Error("Unexpected result:", val, err)
  109. return
  110. }
  111. // *** GraphManager storage
  112. gm1 := NewGraphManager(ds1)
  113. if err := gm1.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
  114. "key": "123",
  115. "kind": "testnode",
  116. "foo": "bar",
  117. })); err != nil {
  118. t.Error("Unexpected result:", err)
  119. return
  120. }
  121. cluster.WaitForTransfer()
  122. if node, err := gm1.FetchNode("main", "123", "testnode"); err != nil ||
  123. node.String() != `GraphNode:
  124. key : 123
  125. kind : testnode
  126. foo : bar
  127. ` {
  128. t.Error("Unexpected result:", node, err)
  129. return
  130. }
  131. gm2 := NewGraphManager(ds2)
  132. if node, err := gm2.FetchNode("main", "123", "testnode"); err != nil ||
  133. node.String() != `GraphNode:
  134. key : 123
  135. kind : testnode
  136. foo : bar
  137. ` {
  138. t.Error("Unexpected result:", node, err)
  139. return
  140. }
  141. }
  142. func TestClusterStorage(t *testing.T) {
  143. cluster2 := createCluster(2)
  144. joinCluster(cluster2, t)
  145. // *** Direct storage
  146. // Insert something into a storage manager and wait for the transfer
  147. sm := cluster2[0].StorageManager("foo", true)
  148. sm2 := cluster2[1].StorageManager("foo", true)
  149. loc, err := sm.Insert("test123")
  150. if loc != 1 || err != nil {
  151. t.Error("Unexpected result:", loc, err)
  152. return
  153. }
  154. cluster.WaitForTransfer()
  155. // Try to retrieve the item again
  156. // fmt.Println(cluster.DumpMemoryClusterLayout("foo"))
  157. var res string
  158. if err := sm.Fetch(1, &res); err != nil {
  159. t.Error(err)
  160. return
  161. }
  162. if res != "test123" {
  163. t.Error("Unexpected result:", res)
  164. return
  165. }
  166. res = ""
  167. if err := sm2.Fetch(1, &res); err != nil {
  168. t.Error(err)
  169. return
  170. }
  171. if res != "test123" {
  172. t.Error("Unexpected result:", res)
  173. return
  174. }
  175. // *** HTree storage
  176. // Use a HTree to insert to and fetch from a storage manager
  177. sm = cluster2[0].StorageManager("foo2", true)
  178. sm2 = cluster2[1].StorageManager("foo2", true)
  179. htree, err := hash.NewHTree(sm)
  180. if err != nil {
  181. t.Error(err)
  182. return
  183. }
  184. if valres, err := htree.Put([]byte("123"), "Test1"); err != nil || valres != nil {
  185. t.Error("Unexpected result:", valres, err)
  186. return
  187. }
  188. if valres, err := htree.Put([]byte("123"), "Test2"); err != nil || valres != "Test1" {
  189. t.Error("Unexpected result:", valres, err)
  190. return
  191. }
  192. // Try to retrieve the item again
  193. cluster.WaitForTransfer()
  194. if val, err := htree.Get([]byte("123")); err != nil || val != "Test2" {
  195. t.Error("Unexpected result:", val, err)
  196. return
  197. }
  198. htree2, _ := hash.LoadHTree(sm2, 1)
  199. if val, err := htree2.Get([]byte("123")); err != nil || val != "Test2" {
  200. t.Error("Unexpected result:", val, err)
  201. return
  202. }
  203. // *** GraphManager storage
  204. gm1 := NewGraphManager(cluster2[0])
  205. if err := gm1.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
  206. "key": "123",
  207. "kind": "testnode",
  208. "foo": "bar",
  209. })); err != nil {
  210. t.Error("Unexpected result:", err)
  211. return
  212. }
  213. cluster.WaitForTransfer()
  214. if node, err := gm1.FetchNode("main", "123", "testnode"); err != nil ||
  215. node.String() != `GraphNode:
  216. key : 123
  217. kind : testnode
  218. foo : bar
  219. ` {
  220. t.Error("Unexpected result:", node, err)
  221. return
  222. }
  223. gm2 := NewGraphManager(cluster2[1])
  224. if node, err := gm2.FetchNode("main", "123", "testnode"); err != nil ||
  225. node.String() != `GraphNode:
  226. key : 123
  227. kind : testnode
  228. foo : bar
  229. ` {
  230. t.Error("Unexpected result:", node, err)
  231. return
  232. }
  233. }
  234. /*
  235. Create a cluster with n members (all storage is in memory)
  236. */
  237. func createCluster(n int) []*cluster.DistributedStorage {
  238. // By default no log output
  239. log.SetOutput(ioutil.Discard)
  240. var mgs []*graphstorage.MemoryGraphStorage
  241. var cs []*cluster.DistributedStorage
  242. cluster.ClearMSMap()
  243. for i := 0; i < n; i++ {
  244. mgs = append(mgs, graphstorage.NewMemoryGraphStorage(fmt.Sprintf("mgs%v", i+1)).(*graphstorage.MemoryGraphStorage))
  245. }
  246. for i := 0; i < n; i++ {
  247. ds, _ := cluster.NewDistributedStorage(mgs[i], map[string]interface{}{
  248. manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9020+i),
  249. manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-%v", i),
  250. manager.ConfigClusterSecret: "test123",
  251. }, manager.NewMemStateInfo())
  252. cs = append(cs, ds)
  253. }
  254. return cs
  255. }
  256. /*
  257. joinCluster joins up a given cluster.
  258. */
  259. func joinCluster(cluster []*cluster.DistributedStorage, t *testing.T) {
  260. for i, dd := range cluster {
  261. dd.Start()
  262. defer dd.Close()
  263. if i > 0 {
  264. err := dd.MemberManager.JoinCluster(cluster[0].MemberManager.Name(),
  265. cluster[0].MemberManager.NetAddr())
  266. if err != nil {
  267. t.Error(err)
  268. return
  269. }
  270. }
  271. }
  272. }