cluster_test.go 8.8 KB


  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package v1
  11. import (
  12. "bytes"
  13. "encoding/json"
  14. "errors"
  15. "fmt"
  16. "io/ioutil"
  17. "log"
  18. "os"
  19. "strings"
  20. "testing"
  21. "devt.de/krotik/common/datautil"
  22. "devt.de/krotik/eliasdb/api"
  23. "devt.de/krotik/eliasdb/cluster"
  24. "devt.de/krotik/eliasdb/cluster/manager"
  25. "devt.de/krotik/eliasdb/graph"
  26. "devt.de/krotik/eliasdb/graph/graphstorage"
  27. )
  28. func TestClusterStorage(t *testing.T) {
  29. cluster2 := createCluster(2)
  30. oldGM := api.GM
  31. oldGS := api.GS
  32. api.GS = cluster2[0]
  33. api.GM = graph.NewGraphManager(cluster2[0])
  34. api.DD = cluster2[0]
  35. api.DDLog = datautil.NewRingBuffer(10)
  36. defer func() {
  37. api.GM = oldGM
  38. api.GS = oldGS
  39. api.DD = nil
  40. api.DDLog = nil
  41. }()
  42. }
  43. func TestClusterQuery(t *testing.T) {
  44. queryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
  45. st, _, res := sendTestRequest(queryURL, "GET", nil)
  46. // We should get a failure back if clustering is not available
  47. if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
  48. t.Error("Unexpected response:", st, res)
  49. return
  50. }
  51. st, _, res = sendTestRequest(queryURL, "DELETE", nil)
  52. // We should get a failure back if clustering is not available
  53. if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
  54. t.Error("Unexpected response:", st, res)
  55. return
  56. }
  57. // Create now a small cluster
  58. cluster2 := createCluster(2)
  59. oldGM := api.GM
  60. oldGS := api.GS
  61. api.GS = cluster2[0]
  62. api.GM = graph.NewGraphManager(cluster2[0])
  63. api.DD = cluster2[0]
  64. api.DDLog = datautil.NewRingBuffer(10)
  65. defer func() {
  66. api.GM = oldGM
  67. api.GS = oldGS
  68. api.DD = nil
  69. api.DDLog = nil
  70. }()
  71. // We should now get back a state
  72. st, _, res = sendTestRequest(queryURL, "GET", nil)
  73. if st != "200 OK" || res != `
  74. {
  75. "failed": null,
  76. "members": [
  77. "TestClusterMember-0",
  78. "localhost:9020"
  79. ],
  80. "replication": 1,
  81. "ts": [
  82. "TestClusterMember-0",
  83. "1"
  84. ],
  85. "tsold": [
  86. "",
  87. "0"
  88. ]
  89. }`[1:] {
  90. t.Error("Unexpected response:", st, res)
  91. return
  92. }
  93. st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
  94. if st != "200 OK" || res != `
  95. {
  96. "TestClusterMember-0": {}
  97. }`[1:] {
  98. t.Error("Unexpected response:", st, res)
  99. return
  100. }
  101. api.DDLog.Add("test cluster message1")
  102. api.DDLog.Add("test cluster message2")
  103. st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
  104. if st != "200 OK" || res != `
  105. [
  106. "test cluster message1",
  107. "test cluster message2"
  108. ]`[1:] {
  109. t.Error("Unexpected response:", st, res)
  110. return
  111. }
  112. st, _, _ = sendTestRequest(queryURL+"log", "DELETE", nil)
  113. if st != "200 OK" {
  114. t.Error("Unexpected response:", st)
  115. return
  116. }
  117. st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
  118. if st != "200 OK" || res != `
  119. []`[1:] {
  120. t.Error("Unexpected response:", st, res)
  121. return
  122. }
  123. st, _, _ = sendTestRequest(queryURL+"bla", "DELETE", nil)
  124. if st != "400 Bad Request" {
  125. t.Error("Unexpected response:", st)
  126. return
  127. }
  128. log.SetOutput(ioutil.Discard)
  129. cluster2[0].MemberManager.Start()
  130. cluster2[1].MemberManager.Start()
  131. defer func() {
  132. cluster2[0].MemberManager.Shutdown()
  133. cluster2[1].MemberManager.Shutdown()
  134. log.SetOutput(os.Stdout)
  135. }()
  136. jsonString, err := json.Marshal(map[string]interface{}{
  137. "name": cluster2[1].MemberManager.Name(),
  138. "netaddr": cluster2[1].MemberManager.NetAddr(),
  139. })
  140. if err != nil {
  141. t.Error(err)
  142. return
  143. }
  144. st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
  145. if st != "200 OK" || res != `[
  146. "Pong"
  147. ]` {
  148. t.Error("Unexpected response:", st, res)
  149. return
  150. }
  151. manager.MemberErrors = make(map[string]error)
  152. manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
  153. sendTestRequest(queryURL+"eject", "PUT", jsonString)
  154. st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
  155. if st != "403 Forbidden" || res != "Ping returned an error: ClusterError: Member error (testerror)" {
  156. t.Error("Unexpected response:", st, res)
  157. return
  158. }
  159. manager.MemberErrors = nil
  160. st, _, res = sendTestRequest(queryURL, "PUT", nil)
  161. if st != "400 Bad Request" || res != "Need a command either: join or eject" {
  162. t.Error("Unexpected response:", st, res)
  163. return
  164. }
  165. jsonString, err = json.Marshal(map[string]interface{}{
  166. "name": "bla",
  167. })
  168. if err != nil {
  169. t.Error(err)
  170. return
  171. }
  172. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  173. if st != "400 Bad Request" || res != "Required argument netaddr missing in body arguments" {
  174. t.Error("Unexpected response:", st, res)
  175. return
  176. }
  177. jsonString, err = json.Marshal(map[string]interface{}{
  178. "name": "bla",
  179. "netaddr": cluster2[1].MemberManager.NetAddr(),
  180. })
  181. if err != nil {
  182. t.Error(err)
  183. return
  184. }
  185. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  186. if st != "403 Forbidden" || res != "Could not join the cluster: ClusterError: Member error (Unknown target member)" {
  187. t.Error("Unexpected response:", st, res)
  188. return
  189. }
  190. jsonString, err = json.Marshal(map[string]interface{}{
  191. "name": cluster2[1].MemberManager.Name(),
  192. "netaddr": cluster2[1].MemberManager.NetAddr(),
  193. })
  194. if err != nil {
  195. t.Error(err)
  196. return
  197. }
  198. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  199. if st != "200 OK" || res != "" {
  200. t.Error("Unexpected response:", st, res)
  201. return
  202. }
  203. // Check state info
  204. if err := checkStateInfo(cluster2[1].MemberManager, `
  205. {
  206. "failed": null,
  207. "members": [
  208. "TestClusterMember-1",
  209. "localhost:9021",
  210. "TestClusterMember-0",
  211. "localhost:9020"
  212. ],
  213. "replication": 1,
  214. "ts": [
  215. "TestClusterMember-1",
  216. "2"
  217. ],
  218. "tsold": [
  219. "TestClusterMember-1",
  220. "1"
  221. ]
  222. }
  223. `[1:]); err != nil {
  224. t.Error(err)
  225. return
  226. }
  227. // Set some member info and read it back
  228. cluster2[1].MemberManager.MemberInfo()["test123"] = "123"
  229. st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
  230. if st != "200 OK" || res != `
  231. {
  232. "TestClusterMember-0": {},
  233. "TestClusterMember-1": {
  234. "test123": "123"
  235. }
  236. }`[1:] {
  237. t.Error("Unexpected response:", st, res)
  238. return
  239. }
  240. // Eject member from cluster
  241. jsonString, err = json.Marshal(map[string]interface{}{
  242. "name": cluster2[0].MemberManager.Name(),
  243. })
  244. if err != nil {
  245. t.Error(err)
  246. return
  247. }
  248. manager.MemberErrors = make(map[string]error)
  249. manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
  250. st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
  251. if st != "403 Forbidden" || res != "Could not eject TestClusterMember-0 from cluster: ClusterError: Member error (testerror)" {
  252. t.Error("Unexpected response:", st, res)
  253. return
  254. }
  255. manager.MemberErrors = nil
  256. jsonString, err = json.Marshal(map[string]interface{}{
  257. "name": cluster2[0].MemberManager.Name(),
  258. })
  259. if err != nil {
  260. t.Error(err)
  261. return
  262. }
  263. st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
  264. if st != "200 OK" || res != "" {
  265. t.Error("Unexpected response:", st, res)
  266. return
  267. }
  268. if err := checkStateInfo(cluster2[1].MemberManager, `
  269. {
  270. "failed": null,
  271. "members": [
  272. "TestClusterMember-1",
  273. "localhost:9021"
  274. ],
  275. "replication": 1,
  276. "ts": [
  277. "TestClusterMember-1",
  278. "3"
  279. ],
  280. "tsold": [
  281. "TestClusterMember-1",
  282. "2"
  283. ]
  284. }
  285. `[1:]); err != nil {
  286. t.Error(err)
  287. return
  288. }
  289. // Some error cases
  290. st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString[2:])
  291. if st != "400 Bad Request" || !strings.HasPrefix(res, "Could not decode arguments") {
  292. t.Error("Unexpected response:", st, res)
  293. return
  294. }
  295. st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString)
  296. if st != "400 Bad Request" || res != "Unknown command: bla" {
  297. t.Error("Unexpected response:", st, res)
  298. return
  299. }
  300. }
  301. /*
  302. Create a cluster with n members (all storage is in memory)
  303. */
  304. func createCluster(n int) []*cluster.DistributedStorage {
  305. var mgs []*graphstorage.MemoryGraphStorage
  306. var cs []*cluster.DistributedStorage
  307. for i := 0; i < n; i++ {
  308. mgs = append(mgs, graphstorage.NewMemoryGraphStorage(fmt.Sprintf("mgs%v", i+1)).(*graphstorage.MemoryGraphStorage))
  309. }
  310. for i := 0; i < n; i++ {
  311. ds, _ := cluster.NewDistributedStorage(mgs[i], map[string]interface{}{
  312. manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9020+i),
  313. manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-%v", i),
  314. manager.ConfigClusterSecret: "test123",
  315. }, manager.NewMemStateInfo())
  316. cs = append(cs, ds)
  317. }
  318. return cs
  319. }
  320. func checkStateInfo(mm *manager.MemberManager, expectedStateInfo string) error {
  321. var w bytes.Buffer
  322. ret := json.NewEncoder(&w)
  323. ret.Encode(mm.StateInfo().Map())
  324. out := bytes.Buffer{}
  325. err := json.Indent(&out, w.Bytes(), "", " ")
  326. if err != nil {
  327. return err
  328. }
  329. if out.String() != expectedStateInfo {
  330. return fmt.Errorf("Unexpected state info: %v\nexpected: %v",
  331. out.String(), expectedStateInfo)
  332. }
  333. return nil
  334. }