123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package manager
- import (
- "encoding/gob"
- "fmt"
- "net"
- "net/rpc"
- "sort"
- "strings"
- "sync"
- "time"
- "devt.de/krotik/common/datautil"
- )
- func init() {
- // Make sure we can use the relevant types in a gob operation
- gob.Register(&MemberToken{})
- }
- /*
- Known cluster locks
- */
- const (
- ClusterLockUpdateStateInfo = "ClusterLockUpdateStateInfo"
- )
- /*
- DialTimeout is the dial timeout for RPC connections
- */
- var DialTimeout = 10 * time.Second
- /*
- MemberToken is used to authenticate a member in the cluster
- */
- type MemberToken struct {
- MemberName string
- MemberAuth string
- }
- /*
- Client is the client for the RPC cluster API of a cluster member.
- */
- type Client struct {
- token *MemberToken // Token to be send to other members for authentication
- rpc string // This client's rpc network interface (may be empty in case of pure clients)
- peers map[string]string // Map of member names to their rpc network interface
- conns map[string]*rpc.Client // Map of member names to network connections
- failed map[string]string // Map of (temporary) failed members
- maplock *sync.RWMutex // Lock for maps
- clusterLocks *datautil.MapCache // Cluster locks and which member holds them
- }
- /*
- MemberErrors map for simulated member errors (only used for testing)
- */
- var MemberErrors map[string]error
- /*
- MemberErrorExceptions map to exclude members from simulated member errors (only used for testing)
- */
- var MemberErrorExceptions map[string][]string
- // General cluster client API
- // ==========================
- /*
- IsFailed checks if the given member is in the failed state.
- */
- func (mc *Client) IsFailed(name string) bool {
- mc.maplock.Lock()
- defer mc.maplock.Unlock()
- _, ok := mc.failed[name]
- return ok
- }
- /*
- FailedTotal returns the total number of failed members.
- */
- func (mc *Client) FailedTotal() int {
- mc.maplock.Lock()
- defer mc.maplock.Unlock()
- return len(mc.failed)
- }
- /*
- FailedPeers returns a list of failed members.
- */
- func (mc *Client) FailedPeers() []string {
- var ret []string
- mc.maplock.Lock()
- defer mc.maplock.Unlock()
- for p := range mc.failed {
- ret = append(ret, p)
- }
- sort.Strings(ret)
- return ret
- }
- /*
- FailedPeerErrors returns the same list as FailedPeers but with error messages.
- */
- func (mc *Client) FailedPeerErrors() []string {
- var ret []string
- for _, p := range mc.FailedPeers() {
- e := mc.failed[p]
- ret = append(ret, fmt.Sprintf("%v (%v)", p, e))
- }
- return ret
- }
- /*
- OperationalPeers returns all operational peers and an error if too many cluster members
- have failed.
- */
- func (mc *Client) OperationalPeers() ([]string, error) {
- var err error
- var peers []string
- mc.maplock.Lock()
- defer mc.maplock.Unlock()
- for peer := range mc.peers {
- if _, ok := mc.failed[peer]; !ok {
- peers = append(peers, peer)
- }
- }
- if len(mc.peers) > 0 && len(peers) == 0 {
- err = &Error{ErrClusterState, fmt.Sprintf("No peer cluster member is reachable")}
- } else {
- sort.Strings(peers)
- }
- return peers, err
- }
- /*
- SendRequest sends a request to another cluster member. Not reachable members
- get an entry in the failed map and the error return is ErrMemberComm. All
- other error returns should be considered serious errors.
- */
- func (mc *Client) SendRequest(member string, remoteCall RPCFunction,
- args map[RequestArgument]interface{}) (interface{}, error) {
- var err error
- // Function to categorize errors
- handleError := func(err error) error {
- if _, ok := err.(net.Error); ok {
- // We got a network error and the communication with a member
- // is interrupted - add the member to the failing members list
- mc.maplock.Lock()
- // Set failure state
- mc.failed[member] = err.Error()
- // Remove the connection
- delete(mc.conns, member)
- mc.maplock.Unlock()
- return &Error{ErrMemberComm, err.Error()}
- }
- // Do not wrap a cluster network error in another cluster network error
- if strings.HasPrefix(err.Error(), "ClusterError: "+ErrMemberError.Error()) {
- return err
- }
- return &Error{ErrMemberError, err.Error()}
- }
- mc.maplock.Lock()
- laddr, ok := mc.peers[member]
- mc.maplock.Unlock()
- if ok {
- // Get network connection to the member
- mc.maplock.Lock()
- conn, ok := mc.conns[member]
- mc.maplock.Unlock()
- if !ok {
- c, err := net.DialTimeout("tcp", laddr, DialTimeout)
- if err != nil {
- LogDebug(mc.token.MemberName, ": ",
- fmt.Sprintf("- %v.%v (laddr=%v err=%v)", member, remoteCall, laddr, err))
- return nil, handleError(err)
- }
- conn = rpc.NewClient(c)
- mc.maplock.Lock()
- mc.conns[member] = conn
- mc.maplock.Unlock()
- }
- // Assemble the request
- request := map[RequestArgument]interface{}{
- RequestTARGET: member,
- RequestTOKEN: mc.token,
- }
- if args != nil {
- for k, v := range args {
- request[k] = v
- }
- }
- var response interface{}
- LogDebug(mc.token.MemberName, ": ",
- fmt.Sprintf("> %v.%v (laddr=%v)", member, remoteCall, laddr))
- if err, _ = MemberErrors[member]; err == nil || isErrorExcepted(mc.token.MemberName, member) {
- err = conn.Call("Server."+string(remoteCall), request, &response)
- }
- LogDebug(mc.token.MemberName, ": ",
- fmt.Sprintf("< %v.%v (err=%v)", member, remoteCall, err))
- if err != nil {
- return nil, handleError(err)
- }
- return response, nil
- }
- return nil, &Error{ErrUnknownPeer, member}
- }
- /*
- SendPing sends a ping to a member and returns the result. Second argument is
- optional if the target member is not a known peer. Should be an empty string
- in all other cases.
- */
- func (mc *Client) SendPing(member string, rpc string) ([]string, error) {
- if _, ok := mc.peers[member]; rpc != "" && !ok {
- // Add member temporary
- mc.peers[member] = rpc
- defer func() {
- mc.maplock.Lock()
- delete(mc.peers, member)
- delete(mc.conns, member)
- delete(mc.failed, member)
- mc.maplock.Unlock()
- }()
- }
- res, err := mc.SendRequest(member, RPCPing, nil)
- if res != nil {
- return res.([]string), err
- }
- return nil, err
- }
- // Cluster membership functions
- // ============================
- /*
- SendJoinCluster sends a request to a cluster member to join the caller to the cluster.
- Pure clients cannot use this function as this call requires the Client.rpc field to be set.
- */
- func (mc *Client) SendJoinCluster(targetMember string, targetMemberRPC string) (map[string]interface{}, error) {
- // Check we are on a cluster member - pure clients will fail here
- if mc.rpc == "" {
- return nil, &Error{ErrClusterConfig, "Cannot add member without RPC interface"}
- }
- // Ensure the new member is in the peers map
- mc.maplock.Lock()
- mc.peers[targetMember] = targetMemberRPC
- mc.maplock.Unlock()
- // Join the cluster
- res, err := mc.SendRequest(targetMember, RPCJoinCluster, map[RequestArgument]interface{}{
- RequestMEMBERNAME: mc.token.MemberName,
- RequestMEMBERRPC: mc.rpc,
- })
- if res != nil && err == nil {
- return bytesToMap(res.([]byte)), err
- }
- mc.maplock.Lock()
- delete(mc.peers, targetMember)
- delete(mc.conns, targetMember)
- delete(mc.failed, targetMember)
- mc.maplock.Unlock()
- return nil, err
- }
- /*
- SendEjectMember sends a request to eject a member from the cluster.
- */
- func (mc *Client) SendEjectMember(member string, memberToEject string) error {
- _, err := mc.SendRequest(member, RPCEjectMember, map[RequestArgument]interface{}{
- RequestMEMBERNAME: memberToEject,
- })
- return err
- }
- // Cluster-wide locking
- // ====================
- /*
- SendAcquireClusterLock tries to acquire a named lock on all members of the cluster.
- It fails if the lock is alread acquired or if not enough cluster members can be
- reached.
- */
- func (mc *Client) SendAcquireClusterLock(lockName string) error {
- // Get operational peers (operational cluster is NOT required - up to the calling
- // function to decide if the cluster should be operational)
- peers, _ := mc.OperationalPeers()
- // Try to acquire the lock on all members
- var takenLocks []string
- for _, peer := range peers {
- _, err := mc.SendRequest(peer,
- RPCAcquireLock, map[RequestArgument]interface{}{
- RequestLOCK: lockName,
- })
- if err != nil && err.(*Error).Type == ErrMemberComm {
- // If we can't communicate with a member just continue and
- // don't take the lock - the member is now in the failed list
- // and subsequent calls to operational peers should determine
- // if the cluster is functional or not
- continue
- } else if err != nil {
- // If there was a serious error try to release all taken locks
- for _, lockPeer := range takenLocks {
- mc.SendRequest(lockPeer,
- RPCReleaseLock, map[RequestArgument]interface{}{
- RequestLOCK: lockName,
- })
- }
- return err
- } else {
- takenLocks = append(takenLocks, peer)
- }
- }
- // Now take the lock on this member
- mc.maplock.Lock()
- mc.clusterLocks.Put(lockName, mc.token.MemberName)
- mc.maplock.Unlock()
- return nil
- }
- /*
- SendReleaseClusterLock tries to release a named lock on all members of the cluster.
- It is not an error if a lock is not takfen (or has expired) on this member or any other
- target memeber.
- */
- func (mc *Client) SendReleaseClusterLock(lockName string) error {
- // Get operational peers (operational cluster is NOT required - up to the calling
- // function to decide if the cluster should be operational)
- peers, _ := mc.OperationalPeers()
- // Try to acquire the lock on all members
- for _, peer := range peers {
- _, err := mc.SendRequest(peer,
- RPCReleaseLock, map[RequestArgument]interface{}{
- RequestLOCK: lockName,
- })
- if err != nil && err.(*Error).Type != ErrMemberComm {
- return err
- }
- }
- // Now release the lock on this member
- mc.maplock.Lock()
- mc.clusterLocks.Remove(lockName)
- mc.maplock.Unlock()
- return nil
- }
- // StateInfo functions
- // ===================
- /*
- SendStateInfoRequest requests the state info of a member and returns it.
- */
- func (mc *Client) SendStateInfoRequest(member string) (map[string]interface{}, error) {
- res, err := mc.SendRequest(member, RPCSIRequest, nil)
- if res != nil {
- return bytesToMap(res.([]byte)), err
- }
- return nil, err
- }
- // Data request functions
- // ======================
- /*
- SendDataRequest sends a data request to a member and returns its response.
- */
- func (mc *Client) SendDataRequest(member string, reqdata interface{}) (interface{}, error) {
- return mc.SendRequest(member, RPCDataRequest, map[RequestArgument]interface{}{
- RequestDATA: reqdata,
- })
- }
- // Static member info functions
- // ============================
- /*
- SendMemberInfoRequest requests the static member info of a member and returns it.
- */
- func (mc *Client) SendMemberInfoRequest(member string) (map[string]interface{}, error) {
- res, err := mc.SendRequest(member, RPCMIRequest, nil)
- if res != nil {
- return bytesToMap(res.([]byte)), err
- }
- return nil, err
- }
- // Helper functions
- // ================
- /*
- Check if a given route should be excepted from errors (only used for testing)
- */
- func isErrorExcepted(source string, target string) bool {
- if exceptions, ok := MemberErrorExceptions[source]; ok {
- for _, exception := range exceptions {
- if exception == target {
- return true
- }
- }
- }
- return false
- }
|