cluster_test.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package v1
  11. import (
  12. "bytes"
  13. "encoding/json"
  14. "errors"
  15. "fmt"
  16. "io/ioutil"
  17. "log"
  18. "os"
  19. "strings"
  20. "testing"
  21. "devt.de/krotik/common/datautil"
  22. "devt.de/krotik/eliasdb/api"
  23. "devt.de/krotik/eliasdb/cluster"
  24. "devt.de/krotik/eliasdb/cluster/manager"
  25. "devt.de/krotik/eliasdb/graph"
  26. "devt.de/krotik/eliasdb/graph/graphstorage"
  27. )
  28. func TestClusterStorage(t *testing.T) {
  29. clusterQueryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
  30. graphURL := "http://localhost" + TESTPORT + EndpointGraph
  31. cluster2 := createCluster(2)
  32. oldGM := api.GM
  33. oldGS := api.GS
  34. api.GS = cluster2[0]
  35. api.GM = graph.NewGraphManager(cluster2[0])
  36. api.DD = cluster2[0]
  37. api.DDLog = datautil.NewRingBuffer(10)
  38. defer func() {
  39. api.GM = oldGM
  40. api.GS = oldGS
  41. api.DD = nil
  42. api.DDLog = nil
  43. }()
  44. // We should now get back a state
  45. st, _, res := sendTestRequest(clusterQueryURL, "GET", nil)
  46. if st != "200 OK" || res != `
  47. {
  48. "failed": null,
  49. "members": [
  50. "TestClusterMember-0",
  51. "localhost:9020"
  52. ],
  53. "replication": 1,
  54. "ts": [
  55. "TestClusterMember-0",
  56. "1"
  57. ],
  58. "tsold": [
  59. "",
  60. "0"
  61. ]
  62. }`[1:] {
  63. t.Error("Unexpected response:", st, res)
  64. return
  65. }
  66. // Insert some data
  67. st, _, res = sendTestRequest(graphURL+"i41health/n", "POST", []byte(`
  68. [{
  69. "key":"3",
  70. "kind":"Upload",
  71. "parcel": "12345"
  72. }]
  73. `[1:]))
  74. cluster.WaitForTransfer()
  75. fmt.Println(cluster.DumpMemoryClusterLayout("i41healthUpload.nodes"))
  76. n, err := api.GM.FetchNode("i41health", "3", "Upload")
  77. fmt.Println("res:", n, err)
  78. }
  79. func TestClusterQuery(t *testing.T) {
  80. queryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
  81. st, _, res := sendTestRequest(queryURL, "GET", nil)
  82. // We should get a failure back if clustering is not available
  83. if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
  84. t.Error("Unexpected response:", st, res)
  85. return
  86. }
  87. st, _, res = sendTestRequest(queryURL, "DELETE", nil)
  88. // We should get a failure back if clustering is not available
  89. if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
  90. t.Error("Unexpected response:", st, res)
  91. return
  92. }
  93. // Create now a small cluster
  94. cluster2 := createCluster(2)
  95. oldGM := api.GM
  96. oldGS := api.GS
  97. api.GS = cluster2[0]
  98. api.GM = graph.NewGraphManager(cluster2[0])
  99. api.DD = cluster2[0]
  100. api.DDLog = datautil.NewRingBuffer(10)
  101. defer func() {
  102. api.GM = oldGM
  103. api.GS = oldGS
  104. api.DD = nil
  105. api.DDLog = nil
  106. }()
  107. // We should now get back a state
  108. st, _, res = sendTestRequest(queryURL, "GET", nil)
  109. if st != "200 OK" || res != `
  110. {
  111. "failed": null,
  112. "members": [
  113. "TestClusterMember-0",
  114. "localhost:9020"
  115. ],
  116. "replication": 1,
  117. "ts": [
  118. "TestClusterMember-0",
  119. "1"
  120. ],
  121. "tsold": [
  122. "",
  123. "0"
  124. ]
  125. }`[1:] {
  126. t.Error("Unexpected response:", st, res)
  127. return
  128. }
  129. st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
  130. if st != "200 OK" || res != `
  131. {
  132. "TestClusterMember-0": {}
  133. }`[1:] {
  134. t.Error("Unexpected response:", st, res)
  135. return
  136. }
  137. api.DDLog.Add("test cluster message1")
  138. api.DDLog.Add("test cluster message2")
  139. st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
  140. if st != "200 OK" || res != `
  141. [
  142. "test cluster message1",
  143. "test cluster message2"
  144. ]`[1:] {
  145. t.Error("Unexpected response:", st, res)
  146. return
  147. }
  148. st, _, _ = sendTestRequest(queryURL+"log", "DELETE", nil)
  149. if st != "200 OK" {
  150. t.Error("Unexpected response:", st)
  151. return
  152. }
  153. st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
  154. if st != "200 OK" || res != `
  155. []`[1:] {
  156. t.Error("Unexpected response:", st, res)
  157. return
  158. }
  159. st, _, _ = sendTestRequest(queryURL+"bla", "DELETE", nil)
  160. if st != "400 Bad Request" {
  161. t.Error("Unexpected response:", st)
  162. return
  163. }
  164. log.SetOutput(ioutil.Discard)
  165. cluster2[0].MemberManager.Start()
  166. cluster2[1].MemberManager.Start()
  167. defer func() {
  168. cluster2[0].MemberManager.Shutdown()
  169. cluster2[1].MemberManager.Shutdown()
  170. log.SetOutput(os.Stdout)
  171. }()
  172. jsonString, err := json.Marshal(map[string]interface{}{
  173. "name": cluster2[1].MemberManager.Name(),
  174. "netaddr": cluster2[1].MemberManager.NetAddr(),
  175. })
  176. if err != nil {
  177. t.Error(err)
  178. return
  179. }
  180. st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
  181. if st != "200 OK" || res != `[
  182. "Pong"
  183. ]` {
  184. t.Error("Unexpected response:", st, res)
  185. return
  186. }
  187. manager.MemberErrors = make(map[string]error)
  188. manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
  189. sendTestRequest(queryURL+"eject", "PUT", jsonString)
  190. st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
  191. if st != "403 Forbidden" || res != "Ping returned an error: ClusterError: Member error (testerror)" {
  192. t.Error("Unexpected response:", st, res)
  193. return
  194. }
  195. manager.MemberErrors = nil
  196. st, _, res = sendTestRequest(queryURL, "PUT", nil)
  197. if st != "400 Bad Request" || res != "Need a command either: join or eject" {
  198. t.Error("Unexpected response:", st, res)
  199. return
  200. }
  201. jsonString, err = json.Marshal(map[string]interface{}{
  202. "name": "bla",
  203. })
  204. if err != nil {
  205. t.Error(err)
  206. return
  207. }
  208. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  209. if st != "400 Bad Request" || res != "Required argument netaddr missing in body arguments" {
  210. t.Error("Unexpected response:", st, res)
  211. return
  212. }
  213. jsonString, err = json.Marshal(map[string]interface{}{
  214. "name": "bla",
  215. "netaddr": cluster2[1].MemberManager.NetAddr(),
  216. })
  217. if err != nil {
  218. t.Error(err)
  219. return
  220. }
  221. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  222. if st != "403 Forbidden" || res != "Could not join the cluster: ClusterError: Member error (Unknown target member)" {
  223. t.Error("Unexpected response:", st, res)
  224. return
  225. }
  226. jsonString, err = json.Marshal(map[string]interface{}{
  227. "name": cluster2[1].MemberManager.Name(),
  228. "netaddr": cluster2[1].MemberManager.NetAddr(),
  229. })
  230. if err != nil {
  231. t.Error(err)
  232. return
  233. }
  234. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  235. if st != "200 OK" || res != "" {
  236. t.Error("Unexpected response:", st, res)
  237. return
  238. }
  239. // Check state info
  240. if err := checkStateInfo(cluster2[1].MemberManager, `
  241. {
  242. "failed": null,
  243. "members": [
  244. "TestClusterMember-1",
  245. "localhost:9021",
  246. "TestClusterMember-0",
  247. "localhost:9020"
  248. ],
  249. "replication": 1,
  250. "ts": [
  251. "TestClusterMember-1",
  252. "2"
  253. ],
  254. "tsold": [
  255. "TestClusterMember-1",
  256. "1"
  257. ]
  258. }
  259. `[1:]); err != nil {
  260. t.Error(err)
  261. return
  262. }
  263. // Set some member info and read it back
  264. cluster2[1].MemberManager.MemberInfo()["test123"] = "123"
  265. st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
  266. if st != "200 OK" || res != `
  267. {
  268. "TestClusterMember-0": {},
  269. "TestClusterMember-1": {
  270. "test123": "123"
  271. }
  272. }`[1:] {
  273. t.Error("Unexpected response:", st, res)
  274. return
  275. }
  276. // Eject member from cluster
  277. jsonString, err = json.Marshal(map[string]interface{}{
  278. "name": cluster2[0].MemberManager.Name(),
  279. })
  280. if err != nil {
  281. t.Error(err)
  282. return
  283. }
  284. manager.MemberErrors = make(map[string]error)
  285. manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
  286. st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
  287. if st != "403 Forbidden" || res != "Could not eject TestClusterMember-0 from cluster: ClusterError: Member error (testerror)" {
  288. t.Error("Unexpected response:", st, res)
  289. return
  290. }
  291. manager.MemberErrors = nil
  292. jsonString, err = json.Marshal(map[string]interface{}{
  293. "name": cluster2[0].MemberManager.Name(),
  294. })
  295. if err != nil {
  296. t.Error(err)
  297. return
  298. }
  299. st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
  300. if st != "200 OK" || res != "" {
  301. t.Error("Unexpected response:", st, res)
  302. return
  303. }
  304. if err := checkStateInfo(cluster2[1].MemberManager, `
  305. {
  306. "failed": null,
  307. "members": [
  308. "TestClusterMember-1",
  309. "localhost:9021"
  310. ],
  311. "replication": 1,
  312. "ts": [
  313. "TestClusterMember-1",
  314. "3"
  315. ],
  316. "tsold": [
  317. "TestClusterMember-1",
  318. "2"
  319. ]
  320. }
  321. `[1:]); err != nil {
  322. t.Error(err)
  323. return
  324. }
  325. // Some error cases
  326. st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString[2:])
  327. if st != "400 Bad Request" || !strings.HasPrefix(res, "Could not decode arguments") {
  328. t.Error("Unexpected response:", st, res)
  329. return
  330. }
  331. st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString)
  332. if st != "400 Bad Request" || res != "Unknown command: bla" {
  333. t.Error("Unexpected response:", st, res)
  334. return
  335. }
  336. }
  337. /*
  338. Create a cluster with n members (all storage is in memory)
  339. */
  340. func createCluster(n int) []*cluster.DistributedStorage {
  341. var mgs []*graphstorage.MemoryGraphStorage
  342. var cs []*cluster.DistributedStorage
  343. cluster.ClearMSMap()
  344. for i := 0; i < n; i++ {
  345. mgs = append(mgs, graphstorage.NewMemoryGraphStorage(fmt.Sprintf("mgs%v", i+1)).(*graphstorage.MemoryGraphStorage))
  346. }
  347. for i := 0; i < n; i++ {
  348. ds, _ := cluster.NewDistributedStorage(mgs[i], map[string]interface{}{
  349. manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9020+i),
  350. manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-%v", i),
  351. manager.ConfigClusterSecret: "test123",
  352. }, manager.NewMemStateInfo())
  353. cs = append(cs, ds)
  354. }
  355. return cs
  356. }
  357. func checkStateInfo(mm *manager.MemberManager, expectedStateInfo string) error {
  358. var w bytes.Buffer
  359. ret := json.NewEncoder(&w)
  360. ret.Encode(mm.StateInfo().Map())
  361. out := bytes.Buffer{}
  362. err := json.Indent(&out, w.Bytes(), "", " ")
  363. if err != nil {
  364. return err
  365. }
  366. if out.String() != expectedStateInfo {
  367. return fmt.Errorf("Unexpected state info: %v\nexpected: %v",
  368. out.String(), expectedStateInfo)
  369. }
  370. return nil
  371. }