| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 | /* * EliasDB * * Copyright 2016 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */package managerimport (	"crypto/sha512"	"fmt"	"net/rpc"	"devt.de/krotik/common/errorutil")func init() {	// Create singleton Server instance.	server = &Server{make(map[string]*MemberManager)}	// Register the cluster API as RPC server	errorutil.AssertOk(rpc.Register(server))}/*RPCFunction is used to identify the called function in a RPC call*/type RPCFunction string/*List of all possible RPC functions. The list includes all RPC callable functionsin this file.*/const (	// General functions	RPCPing      RPCFunction = "Ping"	RPCSIRequest             = "StateInfoRequest"	RPCMIRequest             = "MemberInfoRequest"	// Cluster-wide locking	RPCAcquireLock = "AcquireLock"	RPCReleaseLock = "ReleaseLock"	// Cluster member management	RPCJoinCluster = "JoinCluster"	RPCAddMember   = "AddMember"	RPCEjectMember = "EjectMember"	// StateInfo functions	RPCUpdateStateInfo = "UpdateStateInfo"	// Data request functions	RPCDataRequest = "DataRequest")/*RequestArgument is used to identify arguments in a RPC call*/type RequestArgument int/*List of all possible arguments in a RPC request. There are usually no checks whichgive back an error if a required argument is missing. The RPC API is an internalAPI and might change without backwards compatibility.*/const (	// General arguments	RequestTARGET       RequestArgument = iota // Required argument which identifies the target cluster member	RequestTOKEN                               // Client token which is used for authorization checks	RequestLOCK                                // Lock name which a member requests to take	RequestMEMBERNAME                          // Name for a member	RequestMEMBERRPC                           // Rpc address and port for a member	RequestSTATEINFOMAP                        // StateInfo object as a map	RequestDATA                                // Data request object)/*server is the Server instance which serves rpc calls*/var server *Server/*Server is the RPC exposed cluster API of a cluster member. Serveris a singleton and will route incoming (authenticated) requests to registeredMemberManagers. The calling member is referred to as source member and the calledmember is referred to as target member.*/type Server struct {	managers map[string]*MemberManager // Map of local cluster members}// General functions// =================/*Ping answers with a Pong if the given client token was verified and the localcluster member exists.*/func (ms *Server) Ping(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, false)	if err != nil {		return err	}	// Send a simple response	res := []string{"Pong"}	// Check if request is from a cluster member - only reveal timestamps	// to members	token := request[RequestTOKEN].(*MemberToken)	if _, ok := manager.Client.peers[token.MemberName]; ok {		ts, _ := manager.stateInfo.Get(StateInfoTS)		res = append(res, ts.([]string)...)		tsold, _ := manager.stateInfo.Get(StateInfoTSOLD)		res = append(res, tsold.([]string)...)	}	*response = res	return nil}/*StateInfoRequest answers with the member's state info.*/func (ms *Server) StateInfoRequest(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, false)	if err != nil {		return err	}	*response = mapToBytes(manager.stateInfo.Map())	return nil}/*MemberInfoRequest answers with the member's static info.*/func (ms *Server) MemberInfoRequest(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, false)	if err != nil {		return err	}	*response = mapToBytes(manager.memberInfo)	return nil}// Cluster membership functions// ============================/*JoinCluster is used by a new member if it wants to join the cluster.*/func (ms *Server) JoinCluster(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, false)	if err != nil {		return err	}	newMemberName := request[RequestMEMBERNAME].(string)	newMemberRPC := request[RequestMEMBERRPC].(string)	err = manager.JoinNewMember(newMemberName, newMemberRPC)	if err == nil {		// Return updated state info if there was no error		*response = mapToBytes(manager.stateInfo.Map())	}	return err}/*AddMember adds a new member on the target member.*/func (ms *Server) AddMember(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, true)	if err != nil {		return err	}	// Acquire lock to modify client map	newMemberName := request[RequestMEMBERNAME].(string)	newMemberRPC := request[RequestMEMBERRPC].(string)	newStateInfo := bytesToMap(request[RequestSTATEINFOMAP].([]byte))	return manager.addMember(newMemberName, newMemberRPC, newStateInfo)}/*EjectMember can be called by a cluster member to eject itself or another cluster member.*/func (ms *Server) EjectMember(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, true)	if err != nil {		return err	}	memberToEject := request[RequestMEMBERNAME].(string)	return manager.EjectMember(memberToEject)}// Cluster-wide locking// ====================/*AcquireLock tries to acquire a named lock for the source member on thetarget member. It fails if the lock is alread acquired by a different member.The lock can only be held for a limited amount of time.*/func (ms *Server) AcquireLock(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, true)	if err != nil {		return err	}	// Acquire lock to modify lock map	manager.Client.maplock.Lock()	manager.Client.maplock.Unlock()	requestedLock := request[RequestLOCK].(string)	sourceMember := request[RequestTOKEN].(*MemberToken).MemberName	// Get the lock owner	lockOwner, ok := manager.Client.clusterLocks.Get(requestedLock)	if ok && lockOwner != sourceMember {		// If there is already an owner return an error which mentions the owner		return &Error{ErrLockTaken, lockOwner.(string)}	}	// If there is no owner set the source client as the new owner	manager.Client.clusterLocks.Put(requestedLock, sourceMember)	*response = sourceMember	return nil}/*ReleaseLock releases a lock. Only the member which holds the lock can release it.*/func (ms *Server) ReleaseLock(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, true)	if err != nil {		return err	}	// Acquire lock to modify lock map	manager.Client.maplock.Lock()	defer manager.Client.maplock.Unlock()	requestedLock := request[RequestLOCK].(string)	sourceMember := request[RequestTOKEN].(*MemberToken).MemberName	// Get the lock owner	lockOwner, ok := manager.Client.clusterLocks.Get(requestedLock)	if ok {		if lockOwner == sourceMember {			// Release lock			manager.Client.clusterLocks.Remove(requestedLock)		} else {			// Lock is owned by someone else			return &Error{ErrLockNotOwned, fmt.Sprintf("Owned by %v not by %v",				lockOwner, sourceMember)}		}	}	// Operation on a non-existing lock is a NOP	return nil}// StateInfo functions// ===================/*UpdateStateInfo updates the state info of the target member.*/func (ms *Server) UpdateStateInfo(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, true)	if err != nil {		return err	}	newStateInfo := bytesToMap(request[RequestSTATEINFOMAP].([]byte))	return manager.applyStateInfo(newStateInfo)}// Data request functions// ======================/*DataRequest handles a data request.*/func (ms *Server) DataRequest(request map[RequestArgument]interface{},	response *interface{}) error {	// Verify the given token and retrieve the target member	manager, err := ms.checkToken(request, true)	if err != nil {		return err	}	// Handle the data request	reqdata := request[RequestDATA]	return manager.handleDataRequest(reqdata, response)}// Helper functions// ================/*checkToken checks the member token in a given request.*/func (ms *Server) checkToken(request map[RequestArgument]interface{},	checkClusterMembership bool) (*MemberManager, error) {	// Get the target member	target := request[RequestTARGET].(string)	token := request[RequestTOKEN].(*MemberToken)	if manager, ok := ms.managers[target]; ok {		// Generate expected auth from given requesting member name in token and secret of target		expectedAuth := fmt.Sprintf("%X", sha512.Sum512_224([]byte(token.MemberName+manager.secret)))		if token.MemberAuth == expectedAuth {			if checkClusterMembership {				// Check if the requesting client is actually a member of the cluster				manager.Client.maplock.Lock()				_, ok := manager.Client.peers[token.MemberName]				manager.Client.maplock.Unlock()				if !ok {					return nil, ErrNotMember				}			}			return manager, nil		}		return nil, ErrInvalidToken	}	return nil, ErrUnknownTarget}
 |