cluster_test.go 13 KB


  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package v1
  11. import (
  12. "bytes"
  13. "encoding/json"
  14. "errors"
  15. "fmt"
  16. "io/ioutil"
  17. "log"
  18. "os"
  19. "strings"
  20. "testing"
  21. "devt.de/krotik/common/datautil"
  22. "devt.de/krotik/eliasdb/api"
  23. "devt.de/krotik/eliasdb/cluster"
  24. "devt.de/krotik/eliasdb/cluster/manager"
  25. "devt.de/krotik/eliasdb/graph"
  26. "devt.de/krotik/eliasdb/graph/graphstorage"
  27. )
  28. func TestClusterStorage(t *testing.T) {
  29. clusterQueryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
  30. graphURL := "http://localhost" + TESTPORT + EndpointGraph
  31. cluster2 := createCluster(2)
  32. joinCluster(cluster2, t)
  33. oldGM := api.GM
  34. oldGS := api.GS
  35. api.GS = cluster2[0]
  36. api.GM = graph.NewGraphManager(cluster2[0])
  37. api.DD = cluster2[0]
  38. api.DDLog = datautil.NewRingBuffer(10)
  39. defer func() {
  40. api.GM = oldGM
  41. api.GS = oldGS
  42. api.DD = nil
  43. api.DDLog = nil
  44. }()
  45. // We should now get back a state
  46. st, _, res := sendTestRequest(clusterQueryURL, "GET", nil)
  47. if st != "200 OK" || res != `
  48. {
  49. "failed": null,
  50. "members": [
  51. "TestClusterMember-0",
  52. "localhost:9020",
  53. "TestClusterMember-1",
  54. "localhost:9021"
  55. ],
  56. "replication": 1,
  57. "ts": [
  58. "TestClusterMember-0",
  59. "2"
  60. ],
  61. "tsold": [
  62. "TestClusterMember-0",
  63. "1"
  64. ]
  65. }`[1:] {
  66. t.Error("Unexpected response:", st, res)
  67. return
  68. }
  69. // Insert some data
  70. st, _, res = sendTestRequest(graphURL+"i41health/n", "POST", []byte(`
  71. [{
  72. "key":"3",
  73. "kind":"Upload",
  74. "parcel": "12345"
  75. }]
  76. `[1:]))
  77. cluster.WaitForTransfer()
  78. n, err := api.GM.FetchNode("i41health", "3", "Upload")
  79. if err != nil || n.String() != `GraphNode:
  80. key : 3
  81. kind : Upload
  82. parcel : 12345
  83. ` {
  84. t.Error("Unexpected result:", n, err)
  85. return
  86. }
  87. }
  88. func TestClusterQuery(t *testing.T) {
  89. queryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
  90. st, _, res := sendTestRequest(queryURL, "GET", nil)
  91. // We should get a failure back if clustering is not available
  92. if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
  93. t.Error("Unexpected response:", st, res)
  94. return
  95. }
  96. st, _, res = sendTestRequest(queryURL, "DELETE", nil)
  97. // We should get a failure back if clustering is not available
  98. if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
  99. t.Error("Unexpected response:", st, res)
  100. return
  101. }
  102. // Create now a small cluster
  103. cluster2 := createCluster(2)
  104. oldGM := api.GM
  105. oldGS := api.GS
  106. api.GS = cluster2[0]
  107. api.GM = graph.NewGraphManager(cluster2[0])
  108. api.DD = cluster2[0]
  109. api.DDLog = datautil.NewRingBuffer(10)
  110. defer func() {
  111. api.GM = oldGM
  112. api.GS = oldGS
  113. api.DD = nil
  114. api.DDLog = nil
  115. }()
  116. // We should now get back a state
  117. st, _, res = sendTestRequest(queryURL, "GET", nil)
  118. if st != "200 OK" || res != `
  119. {
  120. "failed": null,
  121. "members": [
  122. "TestClusterMember-0",
  123. "localhost:9020"
  124. ],
  125. "replication": 1,
  126. "ts": [
  127. "TestClusterMember-0",
  128. "1"
  129. ],
  130. "tsold": [
  131. "",
  132. "0"
  133. ]
  134. }`[1:] {
  135. t.Error("Unexpected response:", st, res)
  136. return
  137. }
  138. st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
  139. if st != "200 OK" || res != `
  140. {
  141. "TestClusterMember-0": {}
  142. }`[1:] {
  143. t.Error("Unexpected response:", st, res)
  144. return
  145. }
  146. api.DDLog.Add("test cluster message1")
  147. api.DDLog.Add("test cluster message2")
  148. st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
  149. if st != "200 OK" || res != `
  150. [
  151. "test cluster message1",
  152. "test cluster message2"
  153. ]`[1:] {
  154. t.Error("Unexpected response:", st, res)
  155. return
  156. }
  157. st, _, _ = sendTestRequest(queryURL+"log", "DELETE", nil)
  158. if st != "200 OK" {
  159. t.Error("Unexpected response:", st)
  160. return
  161. }
  162. st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
  163. if st != "200 OK" || res != `
  164. []`[1:] {
  165. t.Error("Unexpected response:", st, res)
  166. return
  167. }
  168. st, _, _ = sendTestRequest(queryURL+"bla", "DELETE", nil)
  169. if st != "400 Bad Request" {
  170. t.Error("Unexpected response:", st)
  171. return
  172. }
  173. log.SetOutput(ioutil.Discard)
  174. cluster2[0].MemberManager.Start()
  175. cluster2[1].MemberManager.Start()
  176. defer func() {
  177. cluster2[0].MemberManager.Shutdown()
  178. cluster2[1].MemberManager.Shutdown()
  179. log.SetOutput(os.Stdout)
  180. }()
  181. jsonString, err := json.Marshal(map[string]interface{}{
  182. "name": cluster2[1].MemberManager.Name(),
  183. "netaddr": cluster2[1].MemberManager.NetAddr(),
  184. })
  185. if err != nil {
  186. t.Error(err)
  187. return
  188. }
  189. st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
  190. if st != "200 OK" || res != `[
  191. "Pong"
  192. ]` {
  193. t.Error("Unexpected response:", st, res)
  194. return
  195. }
  196. manager.MemberErrors = make(map[string]error)
  197. manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
  198. sendTestRequest(queryURL+"eject", "PUT", jsonString)
  199. st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
  200. if st != "403 Forbidden" || res != "Ping returned an error: ClusterError: Member error (testerror)" {
  201. t.Error("Unexpected response:", st, res)
  202. return
  203. }
  204. manager.MemberErrors = nil
  205. st, _, res = sendTestRequest(queryURL, "PUT", nil)
  206. if st != "400 Bad Request" || res != "Need a command either: join or eject" {
  207. t.Error("Unexpected response:", st, res)
  208. return
  209. }
  210. jsonString, err = json.Marshal(map[string]interface{}{
  211. "name": "bla",
  212. })
  213. if err != nil {
  214. t.Error(err)
  215. return
  216. }
  217. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  218. if st != "400 Bad Request" || res != "Required argument netaddr missing in body arguments" {
  219. t.Error("Unexpected response:", st, res)
  220. return
  221. }
  222. jsonString, err = json.Marshal(map[string]interface{}{
  223. "name": "bla",
  224. "netaddr": cluster2[1].MemberManager.NetAddr(),
  225. })
  226. if err != nil {
  227. t.Error(err)
  228. return
  229. }
  230. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  231. if st != "403 Forbidden" || res != "Could not join the cluster: ClusterError: Member error (Unknown target member)" {
  232. t.Error("Unexpected response:", st, res)
  233. return
  234. }
  235. jsonString, err = json.Marshal(map[string]interface{}{
  236. "name": cluster2[1].MemberManager.Name(),
  237. "netaddr": cluster2[1].MemberManager.NetAddr(),
  238. })
  239. if err != nil {
  240. t.Error(err)
  241. return
  242. }
  243. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  244. if st != "200 OK" || res != "" {
  245. t.Error("Unexpected response:", st, res)
  246. return
  247. }
  248. // Check state info
  249. if err := checkStateInfo(cluster2[1].MemberManager, `
  250. {
  251. "failed": null,
  252. "members": [
  253. "TestClusterMember-1",
  254. "localhost:9021",
  255. "TestClusterMember-0",
  256. "localhost:9020"
  257. ],
  258. "replication": 1,
  259. "ts": [
  260. "TestClusterMember-1",
  261. "2"
  262. ],
  263. "tsold": [
  264. "TestClusterMember-1",
  265. "1"
  266. ]
  267. }
  268. `[1:]); err != nil {
  269. t.Error(err)
  270. return
  271. }
  272. // Set some member info and read it back
  273. cluster2[1].MemberManager.MemberInfo()["test123"] = "123"
  274. st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
  275. if st != "200 OK" || res != `
  276. {
  277. "TestClusterMember-0": {},
  278. "TestClusterMember-1": {
  279. "test123": "123"
  280. }
  281. }`[1:] {
  282. t.Error("Unexpected response:", st, res)
  283. return
  284. }
  285. // Eject member from cluster
  286. jsonString, err = json.Marshal(map[string]interface{}{
  287. "name": cluster2[0].MemberManager.Name(),
  288. })
  289. if err != nil {
  290. t.Error(err)
  291. return
  292. }
  293. manager.MemberErrors = make(map[string]error)
  294. manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
  295. st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
  296. if st != "403 Forbidden" || res != "Could not eject TestClusterMember-0 from cluster: ClusterError: Member error (testerror)" {
  297. t.Error("Unexpected response:", st, res)
  298. return
  299. }
  300. manager.MemberErrors = nil
  301. jsonString, err = json.Marshal(map[string]interface{}{
  302. "name": cluster2[0].MemberManager.Name(),
  303. })
  304. if err != nil {
  305. t.Error(err)
  306. return
  307. }
  308. st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
  309. if st != "200 OK" || res != "" {
  310. t.Error("Unexpected response:", st, res)
  311. return
  312. }
  313. if err := checkStateInfo(cluster2[1].MemberManager, `
  314. {
  315. "failed": null,
  316. "members": [
  317. "TestClusterMember-1",
  318. "localhost:9021"
  319. ],
  320. "replication": 1,
  321. "ts": [
  322. "TestClusterMember-1",
  323. "3"
  324. ],
  325. "tsold": [
  326. "TestClusterMember-1",
  327. "2"
  328. ]
  329. }
  330. `[1:]); err != nil {
  331. t.Error(err)
  332. return
  333. }
  334. // Some error cases
  335. st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString[2:])
  336. if st != "400 Bad Request" || !strings.HasPrefix(res, "Could not decode arguments") {
  337. t.Error("Unexpected response:", st, res)
  338. return
  339. }
  340. st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString)
  341. if st != "400 Bad Request" || res != "Unknown command: bla" {
  342. t.Error("Unexpected response:", st, res)
  343. return
  344. }
  345. }
  346. func TestClusterQueryBigCluster(t *testing.T) {
  347. queryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
  348. // Create a big cluster
  349. cluster3 := createCluster(3)
  350. for _, dd := range cluster3 {
  351. dd.Start()
  352. defer dd.Close()
  353. }
  354. oldGM := api.GM
  355. oldGS := api.GS
  356. api.GS = cluster3[0]
  357. api.GM = graph.NewGraphManager(cluster3[0])
  358. api.DD = cluster3[0]
  359. api.DDLog = datautil.NewRingBuffer(10)
  360. defer func() {
  361. api.GM = oldGM
  362. api.GS = oldGS
  363. api.DD = nil
  364. api.DDLog = nil
  365. }()
  366. // We should now get back a state
  367. st, _, res := sendTestRequest(queryURL, "GET", nil)
  368. if st != "200 OK" || res != `
  369. {
  370. "failed": null,
  371. "members": [
  372. "TestClusterMember-0",
  373. "localhost:9020"
  374. ],
  375. "replication": 1,
  376. "ts": [
  377. "TestClusterMember-0",
  378. "1"
  379. ],
  380. "tsold": [
  381. "",
  382. "0"
  383. ]
  384. }`[1:] {
  385. t.Error("Unexpected response:", st, res)
  386. return
  387. }
  388. // Join more cluster members
  389. api.DD = cluster3[1]
  390. api.DDLog = datautil.NewRingBuffer(10)
  391. jsonString, err := json.Marshal(map[string]interface{}{
  392. "name": cluster3[0].MemberManager.Name(),
  393. "netaddr": cluster3[0].MemberManager.NetAddr(),
  394. })
  395. if err != nil {
  396. t.Error(err)
  397. return
  398. }
  399. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  400. if st != "200 OK" || res != "" {
  401. t.Error("Unexpected response:", st, res)
  402. return
  403. }
  404. st, _, res = sendTestRequest(queryURL, "GET", nil)
  405. if st != "200 OK" || res != `
  406. {
  407. "failed": null,
  408. "members": [
  409. "TestClusterMember-1",
  410. "localhost:9021",
  411. "TestClusterMember-0",
  412. "localhost:9020"
  413. ],
  414. "replication": 1,
  415. "ts": [
  416. "TestClusterMember-0",
  417. "2"
  418. ],
  419. "tsold": [
  420. "TestClusterMember-0",
  421. "1"
  422. ]
  423. }`[1:] {
  424. t.Error("Unexpected response:", st, res)
  425. return
  426. }
  427. api.DD = cluster3[2]
  428. api.DDLog = datautil.NewRingBuffer(10)
  429. jsonString, err = json.Marshal(map[string]interface{}{
  430. "name": cluster3[0].MemberManager.Name(),
  431. "netaddr": cluster3[0].MemberManager.NetAddr(),
  432. })
  433. if err != nil {
  434. t.Error(err)
  435. return
  436. }
  437. st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
  438. if st != "200 OK" || res != "" {
  439. t.Error("Unexpected response:", st, res)
  440. return
  441. }
  442. st, _, res = sendTestRequest(queryURL, "GET", nil)
  443. if st != "200 OK" || res != `
  444. {
  445. "failed": null,
  446. "members": [
  447. "TestClusterMember-2",
  448. "localhost:9022",
  449. "TestClusterMember-0",
  450. "localhost:9020",
  451. "TestClusterMember-1",
  452. "localhost:9021"
  453. ],
  454. "replication": 1,
  455. "ts": [
  456. "TestClusterMember-0",
  457. "3"
  458. ],
  459. "tsold": [
  460. "TestClusterMember-0",
  461. "2"
  462. ]
  463. }`[1:] {
  464. t.Error("Unexpected response:", st, res)
  465. return
  466. }
  467. }
  468. /*
  469. Create a cluster with n members (all storage is in memory)
  470. */
  471. func createCluster(n int) []*cluster.DistributedStorage {
  472. // By default no log output
  473. log.SetOutput(ioutil.Discard)
  474. var mgs []*graphstorage.MemoryGraphStorage
  475. var cs []*cluster.DistributedStorage
  476. cluster.ClearMSMap()
  477. for i := 0; i < n; i++ {
  478. mgs = append(mgs, graphstorage.NewMemoryGraphStorage(fmt.Sprintf("mgs%v", i+1)).(*graphstorage.MemoryGraphStorage))
  479. }
  480. for i := 0; i < n; i++ {
  481. ds, _ := cluster.NewDistributedStorage(mgs[i], map[string]interface{}{
  482. manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9020+i),
  483. manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-%v", i),
  484. manager.ConfigClusterSecret: "test123",
  485. }, manager.NewMemStateInfo())
  486. cs = append(cs, ds)
  487. }
  488. return cs
  489. }
  490. /*
  491. joinCluster joins up a given cluster.
  492. */
  493. func joinCluster(cluster []*cluster.DistributedStorage, t *testing.T) {
  494. for i, dd := range cluster {
  495. dd.Start()
  496. defer dd.Close()
  497. if i > 0 {
  498. err := dd.MemberManager.JoinCluster(cluster[0].MemberManager.Name(),
  499. cluster[0].MemberManager.NetAddr())
  500. if err != nil {
  501. t.Error(err)
  502. return
  503. }
  504. }
  505. }
  506. }
  507. func checkStateInfo(mm *manager.MemberManager, expectedStateInfo string) error {
  508. var w bytes.Buffer
  509. ret := json.NewEncoder(&w)
  510. ret.Encode(mm.StateInfo().Map())
  511. out := bytes.Buffer{}
  512. err := json.Indent(&out, w.Bytes(), "", " ")
  513. if err != nil {
  514. return err
  515. }
  516. if out.String() != expectedStateInfo {
  517. return fmt.Errorf("Unexpected state info: %v\nexpected: %v",
  518. out.String(), expectedStateInfo)
  519. }
  520. return nil
  521. }