server.go 9.8 KB


  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package manager
  11. import (
  12. "crypto/sha512"
  13. "fmt"
  14. "net/rpc"
  15. "devt.de/krotik/common/errorutil"
  16. )
  17. func init() {
  18. // Create singleton Server instance.
  19. server = &Server{make(map[string]*MemberManager)}
  20. // Register the cluster API as RPC server
  21. errorutil.AssertOk(rpc.Register(server))
  22. }
  23. /*
  24. RPCFunction is used to identify the called function in a RPC call
  25. */
  26. type RPCFunction string
  27. /*
  28. List of all possible RPC functions. The list includes all RPC callable functions
  29. in this file.
  30. */
  31. const (
  32. // General functions
  33. RPCPing RPCFunction = "Ping"
  34. RPCSIRequest = "StateInfoRequest"
  35. RPCMIRequest = "MemberInfoRequest"
  36. // Cluster-wide locking
  37. RPCAcquireLock = "AcquireLock"
  38. RPCReleaseLock = "ReleaseLock"
  39. // Cluster member management
  40. RPCJoinCluster = "JoinCluster"
  41. RPCAddMember = "AddMember"
  42. RPCEjectMember = "EjectMember"
  43. // StateInfo functions
  44. RPCUpdateStateInfo = "UpdateStateInfo"
  45. // Data request functions
  46. RPCDataRequest = "DataRequest"
  47. )
  48. /*
  49. RequestArgument is used to identify arguments in a RPC call
  50. */
  51. type RequestArgument int
  52. /*
  53. List of all possible arguments in a RPC request. There are usually no checks which
  54. give back an error if a required argument is missing. The RPC API is an internal
  55. API and might change without backwards compatibility.
  56. */
  57. const (
  58. // General arguments
  59. RequestTARGET RequestArgument = iota // Required argument which identifies the target cluster memeber
  60. RequestTOKEN // Client token which is used for authorization checks
  61. RequestLOCK // Lock name which a member requests to take
  62. RequestMEMBERNAME // Name for a member
  63. RequestMEMBERRPC // Rpc address and port for a member
  64. RequestSTATEINFOMAP // StateInfo object as a map
  65. RequestDATA // Data request object
  66. )
  67. /*
  68. server is the Server instance which serves rpc calls
  69. */
  70. var server *Server
  71. /*
  72. Server is the RPC exposed cluster API of a cluster member. Server
  73. is a singleton and will route incoming (authenticated) requests to registered
  74. MemberManagers. The calling member is referred to as source member and the called
  75. member is referred to as target member.
  76. */
  77. type Server struct {
  78. managers map[string]*MemberManager // Map of local cluster members
  79. }
  80. // General functions
  81. // =================
  82. /*
  83. Ping answers with a Pong if the given client token was verified and the local
  84. cluster member exists.
  85. */
  86. func (ms *Server) Ping(request map[RequestArgument]interface{},
  87. response *interface{}) error {
  88. // Verify the given token and retrieve the target member
  89. manager, err := ms.checkToken(request, false)
  90. if err != nil {
  91. return err
  92. }
  93. // Send a simple response
  94. res := []string{"Pong"}
  95. // Check if request is from a cluster member - only reveal timestamps
  96. // to members
  97. token := request[RequestTOKEN].(*MemberToken)
  98. if _, ok := manager.Client.peers[token.MemberName]; ok {
  99. ts, _ := manager.stateInfo.Get(StateInfoTS)
  100. res = append(res, ts.([]string)...)
  101. tsold, _ := manager.stateInfo.Get(StateInfoTSOLD)
  102. res = append(res, tsold.([]string)...)
  103. }
  104. *response = res
  105. return nil
  106. }
  107. /*
  108. StateInfoRequest answers with the member's state info.
  109. */
  110. func (ms *Server) StateInfoRequest(request map[RequestArgument]interface{},
  111. response *interface{}) error {
  112. // Verify the given token and retrieve the target member
  113. manager, err := ms.checkToken(request, false)
  114. if err != nil {
  115. return err
  116. }
  117. *response = mapToBytes(manager.stateInfo.Map())
  118. return nil
  119. }
  120. /*
  121. MemberInfoRequest answers with the member's static info.
  122. */
  123. func (ms *Server) MemberInfoRequest(request map[RequestArgument]interface{},
  124. response *interface{}) error {
  125. // Verify the given token and retrieve the target member
  126. manager, err := ms.checkToken(request, false)
  127. if err != nil {
  128. return err
  129. }
  130. *response = mapToBytes(manager.memberInfo)
  131. return nil
  132. }
  133. // Cluster membership functions
  134. // ============================
  135. /*
  136. JoinCluster is used by a new member if it wants to join the cluster.
  137. */
  138. func (ms *Server) JoinCluster(request map[RequestArgument]interface{},
  139. response *interface{}) error {
  140. // Verify the given token and retrieve the target member
  141. manager, err := ms.checkToken(request, false)
  142. if err != nil {
  143. return err
  144. }
  145. newMemberName := request[RequestMEMBERNAME].(string)
  146. newMemberRPC := request[RequestMEMBERRPC].(string)
  147. err = manager.JoinNewMember(newMemberName, newMemberRPC)
  148. if err == nil {
  149. // Return updated state info if there was no error
  150. *response = mapToBytes(manager.stateInfo.Map())
  151. }
  152. return err
  153. }
  154. /*
  155. AddMember adds a new member on the target member.
  156. */
  157. func (ms *Server) AddMember(request map[RequestArgument]interface{},
  158. response *interface{}) error {
  159. // Verify the given token and retrieve the target member
  160. manager, err := ms.checkToken(request, true)
  161. if err != nil {
  162. return err
  163. }
  164. // Acquire lock to modify client map
  165. newMemberName := request[RequestMEMBERNAME].(string)
  166. newMemberRPC := request[RequestMEMBERRPC].(string)
  167. newStateInfo := bytesToMap(request[RequestSTATEINFOMAP].([]byte))
  168. return manager.addMember(newMemberName, newMemberRPC, newStateInfo)
  169. }
  170. /*
  171. EjectMember can be called by a cluster member to eject itself or another cluster member.
  172. */
  173. func (ms *Server) EjectMember(request map[RequestArgument]interface{},
  174. response *interface{}) error {
  175. // Verify the given token and retrieve the target member
  176. manager, err := ms.checkToken(request, true)
  177. if err != nil {
  178. return err
  179. }
  180. memberToEject := request[RequestMEMBERNAME].(string)
  181. return manager.EjectMember(memberToEject)
  182. }
  183. // Cluster-wide locking
  184. // ====================
  185. /*
  186. AcquireLock tries to acquire a named lock for the source member on the
  187. target member. It fails if the lock is alread acquired by a different member.
  188. The lock can only be held for a limited amount of time.
  189. */
  190. func (ms *Server) AcquireLock(request map[RequestArgument]interface{},
  191. response *interface{}) error {
  192. // Verify the given token and retrieve the target member
  193. manager, err := ms.checkToken(request, true)
  194. if err != nil {
  195. return err
  196. }
  197. // Acquire lock to modify lock map
  198. manager.Client.maplock.Lock()
  199. manager.Client.maplock.Unlock()
  200. requestedLock := request[RequestLOCK].(string)
  201. sourceMember := request[RequestTOKEN].(*MemberToken).MemberName
  202. // Get the lock owner
  203. lockOwner, ok := manager.Client.clusterLocks.Get(requestedLock)
  204. if ok && lockOwner != sourceMember {
  205. // If there is already an owner return an error which mentions the owner
  206. return &Error{ErrLockTaken, lockOwner.(string)}
  207. }
  208. // If there is no owner set the source client as the new owner
  209. manager.Client.clusterLocks.Put(requestedLock, sourceMember)
  210. *response = sourceMember
  211. return nil
  212. }
  213. /*
  214. ReleaseLock releases a lock. Only the member which holds the lock can release it.
  215. */
  216. func (ms *Server) ReleaseLock(request map[RequestArgument]interface{},
  217. response *interface{}) error {
  218. // Verify the given token and retrieve the target member
  219. manager, err := ms.checkToken(request, true)
  220. if err != nil {
  221. return err
  222. }
  223. // Acquire lock to modify lock map
  224. manager.Client.maplock.Lock()
  225. defer manager.Client.maplock.Unlock()
  226. requestedLock := request[RequestLOCK].(string)
  227. sourceMember := request[RequestTOKEN].(*MemberToken).MemberName
  228. // Get the lock owner
  229. lockOwner, ok := manager.Client.clusterLocks.Get(requestedLock)
  230. if ok {
  231. if lockOwner == sourceMember {
  232. // Release lock
  233. manager.Client.clusterLocks.Remove(requestedLock)
  234. } else {
  235. // Lock is owned by someone else
  236. return &Error{ErrLockNotOwned, fmt.Sprintf("Owned by %v not by %v",
  237. lockOwner, sourceMember)}
  238. }
  239. }
  240. // Operation on a non-existing lock is a NOP
  241. return nil
  242. }
  243. // StateInfo functions
  244. // ===================
  245. /*
  246. UpdateStateInfo updates the state info of the target member.
  247. */
  248. func (ms *Server) UpdateStateInfo(request map[RequestArgument]interface{},
  249. response *interface{}) error {
  250. // Verify the given token and retrieve the target member
  251. manager, err := ms.checkToken(request, true)
  252. if err != nil {
  253. return err
  254. }
  255. newStateInfo := bytesToMap(request[RequestSTATEINFOMAP].([]byte))
  256. return manager.applyStateInfo(newStateInfo)
  257. }
  258. // Data request functions
  259. // ======================
  260. /*
  261. DataRequest handles a data request.
  262. */
  263. func (ms *Server) DataRequest(request map[RequestArgument]interface{},
  264. response *interface{}) error {
  265. // Verify the given token and retrieve the target member
  266. manager, err := ms.checkToken(request, true)
  267. if err != nil {
  268. return err
  269. }
  270. // Handle the data request
  271. reqdata := request[RequestDATA]
  272. return manager.handleDataRequest(reqdata, response)
  273. }
  274. // Helper functions
  275. // ================
  276. /*
  277. checkToken checks the member token in a given request.
  278. */
  279. func (ms *Server) checkToken(request map[RequestArgument]interface{},
  280. checkClusterMembership bool) (*MemberManager, error) {
  281. // Get the target member
  282. target := request[RequestTARGET].(string)
  283. token := request[RequestTOKEN].(*MemberToken)
  284. if manager, ok := ms.managers[target]; ok {
  285. // Generate expected auth from given requesting member name in token and secret of target
  286. expectedAuth := fmt.Sprintf("%X", sha512.Sum512_224([]byte(token.MemberName+manager.secret)))
  287. if token.MemberAuth == expectedAuth {
  288. if checkClusterMembership {
  289. // Check if the requesting client is actually a member of the cluster
  290. manager.Client.maplock.Lock()
  291. _, ok := manager.Client.peers[token.MemberName]
  292. manager.Client.maplock.Unlock()
  293. if !ok {
  294. return nil, ErrNotMember
  295. }
  296. }
  297. return manager, nil
  298. }
  299. return nil, ErrInvalidToken
  300. }
  301. return nil, ErrUnknownTarget
  302. }