123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package manager
- import (
- "crypto/sha512"
- "fmt"
- "net/rpc"
- "devt.de/krotik/common/errorutil"
- )
- func init() {
- // Create singleton Server instance.
- server = &Server{make(map[string]*MemberManager)}
- // Register the cluster API as RPC server
- errorutil.AssertOk(rpc.Register(server))
- }
- /*
- RPCFunction is used to identify the called function in a RPC call
- */
- type RPCFunction string
- /*
- List of all possible RPC functions. The list includes all RPC callable functions
- in this file.
- */
- const (
- // General functions
- RPCPing RPCFunction = "Ping"
- RPCSIRequest = "StateInfoRequest"
- RPCMIRequest = "MemberInfoRequest"
- // Cluster-wide locking
- RPCAcquireLock = "AcquireLock"
- RPCReleaseLock = "ReleaseLock"
- // Cluster member management
- RPCJoinCluster = "JoinCluster"
- RPCAddMember = "AddMember"
- RPCEjectMember = "EjectMember"
- // StateInfo functions
- RPCUpdateStateInfo = "UpdateStateInfo"
- // Data request functions
- RPCDataRequest = "DataRequest"
- )
- /*
- RequestArgument is used to identify arguments in a RPC call
- */
- type RequestArgument int
- /*
- List of all possible arguments in a RPC request. There are usually no checks which
- give back an error if a required argument is missing. The RPC API is an internal
- API and might change without backwards compatibility.
- */
- const (
- // General arguments
- RequestTARGET RequestArgument = iota // Required argument which identifies the target cluster memeber
- RequestTOKEN // Client token which is used for authorization checks
- RequestLOCK // Lock name which a member requests to take
- RequestMEMBERNAME // Name for a member
- RequestMEMBERRPC // Rpc address and port for a member
- RequestSTATEINFOMAP // StateInfo object as a map
- RequestDATA // Data request object
- )
- /*
- server is the Server instance which serves rpc calls
- */
- var server *Server
- /*
- Server is the RPC exposed cluster API of a cluster member. Server
- is a singleton and will route incoming (authenticated) requests to registered
- MemberManagers. The calling member is referred to as source member and the called
- member is referred to as target member.
- */
- type Server struct {
- managers map[string]*MemberManager // Map of local cluster members
- }
- // General functions
- // =================
- /*
- Ping answers with a Pong if the given client token was verified and the local
- cluster member exists.
- */
- func (ms *Server) Ping(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, false)
- if err != nil {
- return err
- }
- // Send a simple response
- res := []string{"Pong"}
- // Check if request is from a cluster member - only reveal timestamps
- // to members
- token := request[RequestTOKEN].(*MemberToken)
- if _, ok := manager.Client.peers[token.MemberName]; ok {
- ts, _ := manager.stateInfo.Get(StateInfoTS)
- res = append(res, ts.([]string)...)
- tsold, _ := manager.stateInfo.Get(StateInfoTSOLD)
- res = append(res, tsold.([]string)...)
- }
- *response = res
- return nil
- }
- /*
- StateInfoRequest answers with the member's state info.
- */
- func (ms *Server) StateInfoRequest(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, false)
- if err != nil {
- return err
- }
- *response = mapToBytes(manager.stateInfo.Map())
- return nil
- }
- /*
- MemberInfoRequest answers with the member's static info.
- */
- func (ms *Server) MemberInfoRequest(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, false)
- if err != nil {
- return err
- }
- *response = mapToBytes(manager.memberInfo)
- return nil
- }
- // Cluster membership functions
- // ============================
- /*
- JoinCluster is used by a new member if it wants to join the cluster.
- */
- func (ms *Server) JoinCluster(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, false)
- if err != nil {
- return err
- }
- newMemberName := request[RequestMEMBERNAME].(string)
- newMemberRPC := request[RequestMEMBERRPC].(string)
- err = manager.JoinNewMember(newMemberName, newMemberRPC)
- if err == nil {
- // Return updated state info if there was no error
- *response = mapToBytes(manager.stateInfo.Map())
- }
- return err
- }
- /*
- AddMember adds a new member on the target member.
- */
- func (ms *Server) AddMember(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, true)
- if err != nil {
- return err
- }
- // Acquire lock to modify client map
- newMemberName := request[RequestMEMBERNAME].(string)
- newMemberRPC := request[RequestMEMBERRPC].(string)
- newStateInfo := bytesToMap(request[RequestSTATEINFOMAP].([]byte))
- return manager.addMember(newMemberName, newMemberRPC, newStateInfo)
- }
- /*
- EjectMember can be called by a cluster member to eject itself or another cluster member.
- */
- func (ms *Server) EjectMember(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, true)
- if err != nil {
- return err
- }
- memberToEject := request[RequestMEMBERNAME].(string)
- return manager.EjectMember(memberToEject)
- }
- // Cluster-wide locking
- // ====================
- /*
- AcquireLock tries to acquire a named lock for the source member on the
- target member. It fails if the lock is alread acquired by a different member.
- The lock can only be held for a limited amount of time.
- */
- func (ms *Server) AcquireLock(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, true)
- if err != nil {
- return err
- }
- // Acquire lock to modify lock map
- manager.Client.maplock.Lock()
- manager.Client.maplock.Unlock()
- requestedLock := request[RequestLOCK].(string)
- sourceMember := request[RequestTOKEN].(*MemberToken).MemberName
- // Get the lock owner
- lockOwner, ok := manager.Client.clusterLocks.Get(requestedLock)
- if ok && lockOwner != sourceMember {
- // If there is already an owner return an error which mentions the owner
- return &Error{ErrLockTaken, lockOwner.(string)}
- }
- // If there is no owner set the source client as the new owner
- manager.Client.clusterLocks.Put(requestedLock, sourceMember)
- *response = sourceMember
- return nil
- }
- /*
- ReleaseLock releases a lock. Only the member which holds the lock can release it.
- */
- func (ms *Server) ReleaseLock(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, true)
- if err != nil {
- return err
- }
- // Acquire lock to modify lock map
- manager.Client.maplock.Lock()
- defer manager.Client.maplock.Unlock()
- requestedLock := request[RequestLOCK].(string)
- sourceMember := request[RequestTOKEN].(*MemberToken).MemberName
- // Get the lock owner
- lockOwner, ok := manager.Client.clusterLocks.Get(requestedLock)
- if ok {
- if lockOwner == sourceMember {
- // Release lock
- manager.Client.clusterLocks.Remove(requestedLock)
- } else {
- // Lock is owned by someone else
- return &Error{ErrLockNotOwned, fmt.Sprintf("Owned by %v not by %v",
- lockOwner, sourceMember)}
- }
- }
- // Operation on a non-existing lock is a NOP
- return nil
- }
- // StateInfo functions
- // ===================
- /*
- UpdateStateInfo updates the state info of the target member.
- */
- func (ms *Server) UpdateStateInfo(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, true)
- if err != nil {
- return err
- }
- newStateInfo := bytesToMap(request[RequestSTATEINFOMAP].([]byte))
- return manager.applyStateInfo(newStateInfo)
- }
- // Data request functions
- // ======================
- /*
- DataRequest handles a data request.
- */
- func (ms *Server) DataRequest(request map[RequestArgument]interface{},
- response *interface{}) error {
- // Verify the given token and retrieve the target member
- manager, err := ms.checkToken(request, true)
- if err != nil {
- return err
- }
- // Handle the data request
- reqdata := request[RequestDATA]
- return manager.handleDataRequest(reqdata, response)
- }
- // Helper functions
- // ================
- /*
- checkToken checks the member token in a given request.
- */
- func (ms *Server) checkToken(request map[RequestArgument]interface{},
- checkClusterMembership bool) (*MemberManager, error) {
- // Get the target member
- target := request[RequestTARGET].(string)
- token := request[RequestTOKEN].(*MemberToken)
- if manager, ok := ms.managers[target]; ok {
- // Generate expected auth from given requesting member name in token and secret of target
- expectedAuth := fmt.Sprintf("%X", sha512.Sum512_224([]byte(token.MemberName+manager.secret)))
- if token.MemberAuth == expectedAuth {
- if checkClusterMembership {
- // Check if the requesting client is actually a member of the cluster
- manager.Client.maplock.Lock()
- _, ok := manager.Client.peers[token.MemberName]
- manager.Client.maplock.Unlock()
- if !ok {
- return nil, ErrNotMember
- }
- }
- return manager, nil
- }
- return nil, ErrInvalidToken
- }
- return nil, ErrUnknownTarget
- }
|