client.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package manager
  11. import (
  12. "encoding/gob"
  13. "fmt"
  14. "net"
  15. "net/rpc"
  16. "sort"
  17. "strings"
  18. "sync"
  19. "time"
  20. "devt.de/krotik/common/datautil"
  21. )
  22. func init() {
  23. // Make sure we can use the relevant types in a gob operation
  24. gob.Register(&MemberToken{})
  25. }
  26. /*
  27. Known cluster locks
  28. */
  29. const (
  30. ClusterLockUpdateStateInfo = "ClusterLockUpdateStateInfo"
  31. )
  32. /*
  33. DialTimeout is the dial timeout for RPC connections
  34. */
  35. var DialTimeout = 10 * time.Second
  36. /*
  37. MemberToken is used to authenticate a member in the cluster
  38. */
  39. type MemberToken struct {
  40. MemberName string
  41. MemberAuth string
  42. }
  43. /*
  44. Client is the client for the RPC cluster API of a cluster member.
  45. */
  46. type Client struct {
  47. token *MemberToken // Token to be send to other members for authentication
  48. rpc string // This client's rpc network interface (may be empty in case of pure clients)
  49. peers map[string]string // Map of member names to their rpc network interface
  50. conns map[string]*rpc.Client // Map of member names to network connections
  51. failed map[string]string // Map of (temporary) failed members
  52. maplock *sync.RWMutex // Lock for maps
  53. clusterLocks *datautil.MapCache // Cluster locks and which member holds them
  54. }
  55. /*
  56. MemberErrors map for simulated member errors (only used for testing)
  57. */
  58. var MemberErrors map[string]error
  59. /*
  60. MemberErrorExceptions map to exclude members from simulated member errors (only used for testing)
  61. */
  62. var MemberErrorExceptions map[string][]string
  63. // General cluster client API
  64. // ==========================
  65. /*
  66. IsFailed checks if the given member is in the failed state.
  67. */
  68. func (mc *Client) IsFailed(name string) bool {
  69. mc.maplock.Lock()
  70. defer mc.maplock.Unlock()
  71. _, ok := mc.failed[name]
  72. return ok
  73. }
  74. /*
  75. FailedTotal returns the total number of failed members.
  76. */
  77. func (mc *Client) FailedTotal() int {
  78. mc.maplock.Lock()
  79. defer mc.maplock.Unlock()
  80. return len(mc.failed)
  81. }
  82. /*
  83. FailedPeers returns a list of failed members.
  84. */
  85. func (mc *Client) FailedPeers() []string {
  86. var ret []string
  87. mc.maplock.Lock()
  88. defer mc.maplock.Unlock()
  89. for p := range mc.failed {
  90. ret = append(ret, p)
  91. }
  92. sort.Strings(ret)
  93. return ret
  94. }
  95. /*
  96. FailedPeerErrors returns the same list as FailedPeers but with error messages.
  97. */
  98. func (mc *Client) FailedPeerErrors() []string {
  99. var ret []string
  100. for _, p := range mc.FailedPeers() {
  101. e := mc.failed[p]
  102. ret = append(ret, fmt.Sprintf("%v (%v)", p, e))
  103. }
  104. return ret
  105. }
  106. /*
  107. OperationalPeers returns all operational peers and an error if too many cluster members
  108. have failed.
  109. */
  110. func (mc *Client) OperationalPeers() ([]string, error) {
  111. var err error
  112. var peers []string
  113. mc.maplock.Lock()
  114. defer mc.maplock.Unlock()
  115. for peer := range mc.peers {
  116. if _, ok := mc.failed[peer]; !ok {
  117. peers = append(peers, peer)
  118. }
  119. }
  120. if len(mc.peers) > 0 && len(peers) == 0 {
  121. err = &Error{ErrClusterState, fmt.Sprintf("No peer cluster member is reachable")}
  122. } else {
  123. sort.Strings(peers)
  124. }
  125. return peers, err
  126. }
  127. /*
  128. SendRequest sends a request to another cluster member. Not reachable members
  129. get an entry in the failed map and the error return is ErrMemberComm. All
  130. other error returns should be considered serious errors.
  131. */
  132. func (mc *Client) SendRequest(member string, remoteCall RPCFunction,
  133. args map[RequestArgument]interface{}) (interface{}, error) {
  134. var err error
  135. // Function to categorize errors
  136. handleError := func(err error) error {
  137. if _, ok := err.(net.Error); ok {
  138. // We got a network error and the communication with a member
  139. // is interrupted - add the member to the failing members list
  140. mc.maplock.Lock()
  141. // Set failure state
  142. mc.failed[member] = err.Error()
  143. // Remove the connection
  144. delete(mc.conns, member)
  145. mc.maplock.Unlock()
  146. return &Error{ErrMemberComm, err.Error()}
  147. }
  148. // Do not wrap a cluster network error in another cluster network error
  149. if strings.HasPrefix(err.Error(), "ClusterError: "+ErrMemberError.Error()) {
  150. return err
  151. }
  152. return &Error{ErrMemberError, err.Error()}
  153. }
  154. mc.maplock.Lock()
  155. laddr, ok := mc.peers[member]
  156. mc.maplock.Unlock()
  157. if ok {
  158. // Get network connection to the member
  159. mc.maplock.Lock()
  160. conn, ok := mc.conns[member]
  161. mc.maplock.Unlock()
  162. if !ok {
  163. c, err := net.DialTimeout("tcp", laddr, DialTimeout)
  164. if err != nil {
  165. LogDebug(mc.token.MemberName, ": ",
  166. fmt.Sprintf("- %v.%v (laddr=%v err=%v)", member, remoteCall, laddr, err))
  167. return nil, handleError(err)
  168. }
  169. conn = rpc.NewClient(c)
  170. mc.maplock.Lock()
  171. mc.conns[member] = conn
  172. mc.maplock.Unlock()
  173. }
  174. // Assemble the request
  175. request := map[RequestArgument]interface{}{
  176. RequestTARGET: member,
  177. RequestTOKEN: mc.token,
  178. }
  179. if args != nil {
  180. for k, v := range args {
  181. request[k] = v
  182. }
  183. }
  184. var response interface{}
  185. LogDebug(mc.token.MemberName, ": ",
  186. fmt.Sprintf("> %v.%v (laddr=%v)", member, remoteCall, laddr))
  187. if err, _ = MemberErrors[member]; err == nil || isErrorExcepted(mc.token.MemberName, member) {
  188. err = conn.Call("Server."+string(remoteCall), request, &response)
  189. }
  190. LogDebug(mc.token.MemberName, ": ",
  191. fmt.Sprintf("< %v.%v (err=%v)", member, remoteCall, err))
  192. if err != nil {
  193. return nil, handleError(err)
  194. }
  195. return response, nil
  196. }
  197. return nil, &Error{ErrUnknownPeer, member}
  198. }
  199. /*
  200. SendPing sends a ping to a member and returns the result. Second argument is
  201. optional if the target member is not a known peer. Should be an empty string
  202. in all other cases.
  203. */
  204. func (mc *Client) SendPing(member string, rpc string) ([]string, error) {
  205. if _, ok := mc.peers[member]; rpc != "" && !ok {
  206. // Add member temporary
  207. mc.peers[member] = rpc
  208. defer func() {
  209. mc.maplock.Lock()
  210. delete(mc.peers, member)
  211. delete(mc.conns, member)
  212. delete(mc.failed, member)
  213. mc.maplock.Unlock()
  214. }()
  215. }
  216. res, err := mc.SendRequest(member, RPCPing, nil)
  217. if res != nil {
  218. return res.([]string), err
  219. }
  220. return nil, err
  221. }
  222. // Cluster membership functions
  223. // ============================
  224. /*
  225. SendJoinCluster sends a request to a cluster member to join the caller to the cluster.
  226. Pure clients cannot use this function as this call requires the Client.rpc field to be set.
  227. */
  228. func (mc *Client) SendJoinCluster(targetMember string, targetMemberRPC string) (map[string]interface{}, error) {
  229. // Check we are on a cluster member - pure clients will fail here
  230. if mc.rpc == "" {
  231. return nil, &Error{ErrClusterConfig, "Cannot add member without RPC interface"}
  232. }
  233. // Ensure the new member is in the peers map
  234. mc.maplock.Lock()
  235. mc.peers[targetMember] = targetMemberRPC
  236. mc.maplock.Unlock()
  237. // Join the cluster
  238. res, err := mc.SendRequest(targetMember, RPCJoinCluster, map[RequestArgument]interface{}{
  239. RequestMEMBERNAME: mc.token.MemberName,
  240. RequestMEMBERRPC: mc.rpc,
  241. })
  242. if res != nil && err == nil {
  243. return bytesToMap(res.([]byte)), err
  244. }
  245. mc.maplock.Lock()
  246. delete(mc.peers, targetMember)
  247. delete(mc.conns, targetMember)
  248. delete(mc.failed, targetMember)
  249. mc.maplock.Unlock()
  250. return nil, err
  251. }
  252. /*
  253. SendEjectMember sends a request to eject a member from the cluster.
  254. */
  255. func (mc *Client) SendEjectMember(member string, memberToEject string) error {
  256. _, err := mc.SendRequest(member, RPCEjectMember, map[RequestArgument]interface{}{
  257. RequestMEMBERNAME: memberToEject,
  258. })
  259. return err
  260. }
  261. // Cluster-wide locking
  262. // ====================
  263. /*
  264. SendAcquireClusterLock tries to acquire a named lock on all members of the cluster.
  265. It fails if the lock is alread acquired or if not enough cluster members can be
  266. reached.
  267. */
  268. func (mc *Client) SendAcquireClusterLock(lockName string) error {
  269. // Get operational peers (operational cluster is NOT required - up to the calling
  270. // function to decide if the cluster should be operational)
  271. peers, _ := mc.OperationalPeers()
  272. // Try to acquire the lock on all members
  273. var takenLocks []string
  274. for _, peer := range peers {
  275. _, err := mc.SendRequest(peer,
  276. RPCAcquireLock, map[RequestArgument]interface{}{
  277. RequestLOCK: lockName,
  278. })
  279. if err != nil && err.(*Error).Type == ErrMemberComm {
  280. // If we can't communicate with a member just continue and
  281. // don't take the lock - the member is now in the failed list
  282. // and subsequent calls to operational peers should determine
  283. // if the cluster is functional or not
  284. continue
  285. } else if err != nil {
  286. // If there was a serious error try to release all taken locks
  287. for _, lockPeer := range takenLocks {
  288. mc.SendRequest(lockPeer,
  289. RPCReleaseLock, map[RequestArgument]interface{}{
  290. RequestLOCK: lockName,
  291. })
  292. }
  293. return err
  294. } else {
  295. takenLocks = append(takenLocks, peer)
  296. }
  297. }
  298. // Now take the lock on this member
  299. mc.maplock.Lock()
  300. mc.clusterLocks.Put(lockName, mc.token.MemberName)
  301. mc.maplock.Unlock()
  302. return nil
  303. }
  304. /*
  305. SendReleaseClusterLock tries to release a named lock on all members of the cluster.
  306. It is not an error if a lock is not takfen (or has expired) on this member or any other
  307. target member.
  308. */
  309. func (mc *Client) SendReleaseClusterLock(lockName string) error {
  310. // Get operational peers (operational cluster is NOT required - up to the calling
  311. // function to decide if the cluster should be operational)
  312. peers, _ := mc.OperationalPeers()
  313. // Try to acquire the lock on all members
  314. for _, peer := range peers {
  315. _, err := mc.SendRequest(peer,
  316. RPCReleaseLock, map[RequestArgument]interface{}{
  317. RequestLOCK: lockName,
  318. })
  319. if err != nil && err.(*Error).Type != ErrMemberComm {
  320. return err
  321. }
  322. }
  323. // Now release the lock on this member
  324. mc.maplock.Lock()
  325. mc.clusterLocks.Remove(lockName)
  326. mc.maplock.Unlock()
  327. return nil
  328. }
  329. // StateInfo functions
  330. // ===================
  331. /*
  332. SendStateInfoRequest requests the state info of a member and returns it.
  333. */
  334. func (mc *Client) SendStateInfoRequest(member string) (map[string]interface{}, error) {
  335. res, err := mc.SendRequest(member, RPCSIRequest, nil)
  336. if res != nil {
  337. return bytesToMap(res.([]byte)), err
  338. }
  339. return nil, err
  340. }
  341. // Data request functions
  342. // ======================
  343. /*
  344. SendDataRequest sends a data request to a member and returns its response.
  345. */
  346. func (mc *Client) SendDataRequest(member string, reqdata interface{}) (interface{}, error) {
  347. return mc.SendRequest(member, RPCDataRequest, map[RequestArgument]interface{}{
  348. RequestDATA: reqdata,
  349. })
  350. }
  351. // Static member info functions
  352. // ============================
  353. /*
  354. SendMemberInfoRequest requests the static member info of a member and returns it.
  355. */
  356. func (mc *Client) SendMemberInfoRequest(member string) (map[string]interface{}, error) {
  357. res, err := mc.SendRequest(member, RPCMIRequest, nil)
  358. if res != nil {
  359. return bytesToMap(res.([]byte)), err
  360. }
  361. return nil, err
  362. }
  363. // Helper functions
  364. // ================
  365. /*
  366. Check if a given route should be excepted from errors (only used for testing)
  367. */
  368. func isErrorExcepted(source string, target string) bool {
  369. if exceptions, ok := MemberErrorExceptions[source]; ok {
  370. for _, exception := range exceptions {
  371. if exception == target {
  372. return true
  373. }
  374. }
  375. }
  376. return false
  377. }