cluster_test.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package v1
  11. import (
  12. "bytes"
  13. "encoding/json"
  14. "errors"
  15. "fmt"
  16. "io/ioutil"
  17. "log"
  18. "os"
  19. "strings"
  20. "testing"
  21. "devt.de/krotik/common/datautil"
  22. "devt.de/krotik/eliasdb/api"
  23. "devt.de/krotik/eliasdb/cluster"
  24. "devt.de/krotik/eliasdb/cluster/manager"
  25. "devt.de/krotik/eliasdb/graph/graphstorage"
  26. )
/*
TestClusterQuery walks the cluster endpoint (EndpointClusterQuery) through one
sequential scenario using HTTP requests sent via sendTestRequest:

 1. With no cluster installed, GET and DELETE must answer 503.
 2. A two member in-memory cluster is created and its first member is
    installed as api.DD; state info, member infos and the cluster log
    (read and clear) are checked.
 3. Both member managers are started and the ping, join and eject commands
    are exercised, including failures injected through manager.MemberErrors.
 4. Malformed and unknown PUT commands must answer 400.

The steps depend on each other (shared api.DD / api.DDLog / MemberErrors
state), so the test returns at the first unexpected response.
*/
func TestClusterQuery(t *testing.T) {
	queryURL := "http://localhost" + TESTPORT + EndpointClusterQuery

	st, _, res := sendTestRequest(queryURL, "GET", nil)

	// We should get a failure back if clustering is not available

	if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
		t.Error("Unexpected response:", st, res)
		return
	}

	st, _, res = sendTestRequest(queryURL, "DELETE", nil)

	// We should get a failure back if clustering is not available

	if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
		t.Error("Unexpected response:", st, res)
		return
	}

	// Create now a small cluster - the first member becomes the storage
	// used by the API and the API gets a fresh cluster log buffer
	// (presumably with a capacity of 10 entries - confirm against
	// datautil.NewRingBuffer)

	cluster2 := createCluster(2)

	api.DD = cluster2[0]
	api.DDLog = datautil.NewRingBuffer(10)

	// We should now get back a state
	//
	// The expected raw strings start with a line break for readability;
	// the [1:] slice drops that first byte before the comparison.

	st, _, res = sendTestRequest(queryURL, "GET", nil)

	if st != "200 OK" || res != `
{
"failed": null,
"members": [
"TestClusterMember-0",
"localhost:9020"
],
"replication": 1,
"ts": [
"TestClusterMember-0",
"1"
],
"tsold": [
"",
"0"
]
}`[1:] {
		t.Error("Unexpected response:", st, res)
		return
	}

	// Before any join only the first member is expected in the member infos

	st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)

	if st != "200 OK" || res != `
{
"TestClusterMember-0": {}
}`[1:] {
		t.Error("Unexpected response:", st, res)
		return
	}

	// Messages added to the cluster log must be readable via the log endpoint

	api.DDLog.Add("test cluster message1")
	api.DDLog.Add("test cluster message2")

	st, _, res = sendTestRequest(queryURL+"log", "GET", nil)

	if st != "200 OK" || res != `
[
"test cluster message1",
"test cluster message2"
]`[1:] {
		t.Error("Unexpected response:", st, res)
		return
	}

	// A DELETE on the log endpoint must succeed ...

	st, _, _ = sendTestRequest(queryURL+"log", "DELETE", nil)

	if st != "200 OK" {
		t.Error("Unexpected response:", st)
		return
	}

	// ... and leave an empty log behind

	st, _, res = sendTestRequest(queryURL+"log", "GET", nil)

	if st != "200 OK" || res != `
[]`[1:] {
		t.Error("Unexpected response:", st, res)
		return
	}

	// A DELETE on an unknown sub path is a bad request

	st, _, _ = sendTestRequest(queryURL+"bla", "DELETE", nil)

	if st != "400 Bad Request" {
		t.Error("Unexpected response:", st)
		return
	}

	// Silence the standard logger while the member managers are running.
	//
	// NOTE(review): the deferred cleanup points the logger at os.Stdout
	// instead of restoring whichever writer was active before - confirm
	// this is intended.

	log.SetOutput(ioutil.Discard)

	cluster2[0].MemberManager.Start()
	cluster2[1].MemberManager.Start()
	defer func() {
		cluster2[0].MemberManager.Shutdown()
		cluster2[1].MemberManager.Shutdown()
		log.SetOutput(os.Stdout)
	}()

	// Request body which identifies the second member by name and address

	jsonString, err := json.Marshal(map[string]interface{}{
		"name":    cluster2[1].MemberManager.Name(),
		"netaddr": cluster2[1].MemberManager.NetAddr(),
	})
	if err != nil {
		t.Error(err)
		return
	}

	// Ping the second member (this expected string starts directly with
	// its content so no [1:] slice is needed)

	st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)

	if st != "200 OK" || res != `[
"Pong"
]` {
		t.Error("Unexpected response:", st, res)
		return
	}

	// Register a simulated error for the second member and check that a
	// ping now reports it. The eject request sent in between is fired
	// without inspecting its result.

	manager.MemberErrors = make(map[string]error)
	manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")

	sendTestRequest(queryURL+"eject", "PUT", jsonString)

	st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)

	if st != "403 Forbidden" || res != "Ping returned an error: ClusterError: Member error (testerror)" {
		t.Error("Unexpected response:", st, res)
		return
	}

	// Remove the simulated errors again

	manager.MemberErrors = nil

	// A PUT request without a command is rejected

	st, _, res = sendTestRequest(queryURL, "PUT", nil)

	if st != "400 Bad Request" || res != "Need a command either: join or eject" {
		t.Error("Unexpected response:", st, res)
		return
	}

	// A join request without a netaddr argument is rejected

	jsonString, err = json.Marshal(map[string]interface{}{
		"name": "bla",
	})
	if err != nil {
		t.Error(err)
		return
	}

	st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)

	if st != "400 Bad Request" || res != "Required argument netaddr missing in body arguments" {
		t.Error("Unexpected response:", st, res)
		return
	}

	// A join request with a valid address but a wrong member name is refused

	jsonString, err = json.Marshal(map[string]interface{}{
		"name":    "bla",
		"netaddr": cluster2[1].MemberManager.NetAddr(),
	})
	if err != nil {
		t.Error(err)
		return
	}

	st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)

	if st != "403 Forbidden" || res != "Could not join the cluster: ClusterError: Member error (Unknown target member)" {
		t.Error("Unexpected response:", st, res)
		return
	}

	// A join request with the correct name and address of the second
	// member succeeds with an empty response body

	jsonString, err = json.Marshal(map[string]interface{}{
		"name":    cluster2[1].MemberManager.Name(),
		"netaddr": cluster2[1].MemberManager.NetAddr(),
	})
	if err != nil {
		t.Error(err)
		return
	}

	st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)

	if st != "200 OK" || res != "" {
		t.Error("Unexpected response:", st, res)
		return
	}

	// Check state info - the second member should now list both members

	if err := checkStateInfo(cluster2[1].MemberManager, `
{
"failed": null,
"members": [
"TestClusterMember-1",
"localhost:9021",
"TestClusterMember-0",
"localhost:9020"
],
"replication": 1,
"ts": [
"TestClusterMember-1",
"2"
],
"tsold": [
"TestClusterMember-1",
"1"
]
}
`[1:]); err != nil {
		t.Error(err)
		return
	}

	// Set some member info and read it back

	cluster2[1].MemberManager.MemberInfo()["test123"] = "123"

	st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)

	if st != "200 OK" || res != `
{
"TestClusterMember-0": {},
"TestClusterMember-1": {
"test123": "123"
}
}`[1:] {
		t.Error("Unexpected response:", st, res)
		return
	}

	// Eject member from cluster - the first attempt runs with a simulated
	// error registered for the second member and must be refused

	jsonString, err = json.Marshal(map[string]interface{}{
		"name": cluster2[0].MemberManager.Name(),
	})
	if err != nil {
		t.Error(err)
		return
	}

	manager.MemberErrors = make(map[string]error)
	manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")

	st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)

	if st != "403 Forbidden" || res != "Could not eject TestClusterMember-0 from cluster: ClusterError: Member error (testerror)" {
		t.Error("Unexpected response:", st, res)
		return
	}

	manager.MemberErrors = nil

	// Second attempt without simulated errors (the request body is built
	// again with the same content as before)

	jsonString, err = json.Marshal(map[string]interface{}{
		"name": cluster2[0].MemberManager.Name(),
	})
	if err != nil {
		t.Error(err)
		return
	}

	st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)

	if st != "200 OK" || res != "" {
		t.Error("Unexpected response:", st, res)
		return
	}

	// After the eject the second member should only list itself

	if err := checkStateInfo(cluster2[1].MemberManager, `
{
"failed": null,
"members": [
"TestClusterMember-1",
"localhost:9021"
],
"replication": 1,
"ts": [
"TestClusterMember-1",
"3"
],
"tsold": [
"TestClusterMember-1",
"2"
]
}
`[1:]); err != nil {
		t.Error(err)
		return
	}

	// Some error cases

	// Cutting off the first two bytes leaves a body which is not valid JSON

	st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString[2:])

	if st != "400 Bad Request" || !strings.HasPrefix(res, "Could not decode arguments") {
		t.Error("Unexpected response:", st, res)
		return
	}

	// A valid body with an unknown command

	st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString)

	if st != "400 Bad Request" || res != "Unknown command: bla" {
		t.Error("Unexpected response:", st, res)
		return
	}
}
  275. /*
  276. Create a cluster with n members (all storage is in memory)
  277. */
  278. func createCluster(n int) []*cluster.DistributedStorage {
  279. var mgs []*graphstorage.MemoryGraphStorage
  280. var cs []*cluster.DistributedStorage
  281. for i := 0; i < n; i++ {
  282. mgs = append(mgs, graphstorage.NewMemoryGraphStorage(fmt.Sprintf("mgs%v", i+1)).(*graphstorage.MemoryGraphStorage))
  283. }
  284. for i := 0; i < n; i++ {
  285. ds, _ := cluster.NewDistributedStorage(mgs[i], map[string]interface{}{
  286. manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9020+i),
  287. manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-%v", i),
  288. manager.ConfigClusterSecret: "test123",
  289. }, manager.NewMemStateInfo())
  290. cs = append(cs, ds)
  291. }
  292. return cs
  293. }
  294. func checkStateInfo(mm *manager.MemberManager, expectedStateInfo string) error {
  295. var w bytes.Buffer
  296. ret := json.NewEncoder(&w)
  297. ret.Encode(mm.StateInfo().Map())
  298. out := bytes.Buffer{}
  299. err := json.Indent(&out, w.Bytes(), "", " ")
  300. if err != nil {
  301. return err
  302. }
  303. if out.String() != expectedStateInfo {
  304. return fmt.Errorf("Unexpected state info: %v\nexpected: %v",
  305. out.String(), expectedStateInfo)
  306. }
  307. return nil
  308. }