| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 | /* * EliasDB * * Copyright 2016 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. *//*Package cluster contains EliasDB's clustering code.The clustering code provides an abstraction layer to EliasDB's graphstorage.Storage.This means the actual storage of a cluster can be entirely memory based or useany other backend as long as it satisfies the graphstorage.Storage interface.DistributedStorage wraps a graphstorage.Storage and has a manager.MemberManagerobject.Members are identified by a unique name. Calling Start() on manager.MemberManagerregisters and starts the RPC server for the member. Cluster internal RPC requestsare served by manager.Server. It is a singleton object which routes RPC callsto registered MemberManagers - this architecture makes it easy to unit testthe cluster code. The manager.MemberManager has a manager.Client object whichcan be used to send messages to the cluster.The integrity of the cluster is protected by a shared secret (string) amongall members of the cluster. A new member can only join and communicate withthe cluster if it has the secret string. The secret string is never transferreddirectly over the network - it is only used for generating a member specifictoken which can be verified by all other members.The clustering code was inspired by Amazon DynamoDBhttp://www.allthingsdistributed.com/2012/01/amazon-dynamodb.html*/package clusterimport (	"fmt"	"math"	"sync"	"devt.de/krotik/common/datautil"	"devt.de/krotik/eliasdb/cluster/manager"	"devt.de/krotik/eliasdb/graph/graphstorage"	"devt.de/krotik/eliasdb/storage")/*DistributedStorageError is an error related to the distribution storage. Thiserror is returned when the data distribution fails for example when too manycluster members have failed.*/type DistributedStorageError struct {	err error // Wrapped error}/*newError creates a new DistributedStorageError.*/func newError(err error) error {	return &DistributedStorageError{err}}/*Error returns a string representation of a DistributedStorageError.*/func (dse *DistributedStorageError) Error() string {	return fmt.Sprint("Storage disabled: ", dse.err.Error())}/*DistributedStorage data structure*/type DistributedStorage struct {	MemberManager *manager.MemberManager // Manager object	distributionTableLock  *sync.Mutex        // Mutex to access the distribution table	distributionTable      *DistributionTable // Distribution table for the cluster - may be nil	distributionTableError error              // Error detail if the storage is disabled	localName         string                                // Name of the local graph storage	localDRHandler    func(interface{}, *interface{}) error // Local data request handler	localFlushHandler func() error                          // Handler to flush the local storage	localCloseHandler func() error                          // Handler to close the local storage	mainDB      map[string]string // Local main copy (only set when requested)	mainDBError error             // Last error when main db was requested}/*NewDistributedStorage creates a new cluster graph storage. The distributed storagewraps around a local graphstorage.Storage. The configuration of the distributedstorage consists of two parts: A normal config map which defines static informationlike rpc port, secret string, etc and a StateInfo object which is used for dynamicinformation like cluster members, member status, etc. An empty StateInfo meansthat the cluster has only one member.*/func NewDistributedStorage(gs graphstorage.Storage, config map[string]interface{},	stateInfo manager.StateInfo) (*DistributedStorage, error) {	ds, ms, err := newDistributedAndMemberStorage(gs, config, stateInfo)	if _, ok := gs.(*graphstorage.MemoryGraphStorage); ok {		msmap[ds] = ms // Keep track of memory storages for debugging	}	return ds, err}/*DSRetNew is the return value on successful creating a distributed storage(used for testing)*/var DSRetNew error/*newDistributedAndMemberStorage creates a new cluster graph storage but also returns areference to the internal memberStorage object.*/func newDistributedAndMemberStorage(gs graphstorage.Storage, config map[string]interface{},	stateInfo manager.StateInfo) (*DistributedStorage, *memberStorage, error) {	var repFac int	// Merge given configuration with default configuration	clusterconfig := datautil.MergeMaps(manager.DefaultConfig, config)	// Make 100% sure there is a secret string	if clusterconfig[manager.ConfigClusterSecret] == "" {		clusterconfig[manager.ConfigClusterSecret] = manager.DefaultConfig[manager.ConfigClusterSecret]	}	// Set replication factor	if f, ok := stateInfo.Get(manager.StateInfoREPFAC); !ok {		repFac = int(math.Max(clusterconfig[manager.ConfigReplicationFactor].(float64), 1))		stateInfo.Put(manager.StateInfoREPFAC, repFac)		stateInfo.Flush()	} else {		repFac = f.(int)	}	// Create member objects - these calls will initialise this member's state info	mm := manager.NewMemberManager(clusterconfig[manager.ConfigRPC].(string),		clusterconfig[manager.ConfigMemberName].(string),		clusterconfig[manager.ConfigClusterSecret].(string), stateInfo)	dt, err := NewDistributionTable(mm.Members(), repFac)	if err != nil {		mm.LogInfo("Storage disabled:", err)	}	ds := &DistributedStorage{mm, &sync.Mutex{}, dt, err, gs.Name(), nil, nil, nil, nil, nil}	// Create MemberStorage instance which is not exposed - the object will	// only be used by the RPC server and called during start and stop. It is	// the only instance which has access to the wrapped storage.GraphStorage.	memberStorage, err := newMemberStorage(ds, gs)	if err != nil {		return nil, nil, err	}	// Register handler function for RPC calls and for closing the local storage	mm.SetHandleDataRequest(memberStorage.handleDataRequest)	ds.localDRHandler = memberStorage.handleDataRequest	ds.localFlushHandler = memberStorage.gs.FlushAll	ds.localCloseHandler = memberStorage.gs.Close	// Set update handler	ds.MemberManager.SetEventHandler(func() {		// Handler for state info updates (this handler is called once the state		// info object has been updated from the current state)		si := mm.StateInfo()		rfo, ok := si.Get(manager.StateInfoREPFAC)		rf := rfo.(int)		members, ok2 := si.Get(manager.StateInfoMEMBERS)		if ok && ok2 {			distTable, distTableErr := ds.DistributionTable()			numMembers := len(members.([]string)) / 2			numFailedPeers := len(mm.Client.FailedPeers())			// Check if the cluster is operational			if distTableErr == nil && rf > 0 && numFailedPeers > rf-1 {				// Cluster is not operational				if distTable != nil {					err := fmt.Errorf("Too many members failed (total: %v, failed: %v, replication: %v)",						numMembers, numFailedPeers, rf)					mm.LogInfo("Storage disabled:", err.Error())					ds.SetDistributionTableError(err)				}				return			}			// Check if the replication factor has changed or the amount of members			if distTable == nil ||				numMembers != len(distTable.Members()) ||				rf != distTable.repFac {				// Try to renew the distribution table				if dt, err := NewDistributionTable(mm.Members(), rf); err == nil {					ds.SetDistributionTable(dt)				}			}		}	}, memberStorage.transferWorker)	return ds, memberStorage, DSRetNew}/*Start starts the distributed storage.*/func (ds *DistributedStorage) Start() error {	return ds.MemberManager.Start()}/*Close closes the distributed storage.*/func (ds *DistributedStorage) Close() error {	ds.MemberManager.Shutdown()	return ds.localCloseHandler()}/*IsOperational returns if this distribution storage is operational*/func (ds *DistributedStorage) IsOperational() bool {	ds.distributionTableLock.Lock()	defer ds.distributionTableLock.Unlock()	return ds.distributionTableError == nil && ds.distributionTable != nil}/*DistributionTable returns the current distribution table or an error if thestorage is not available.*/func (ds *DistributedStorage) DistributionTable() (*DistributionTable, error) {	ds.distributionTableLock.Lock()	defer ds.distributionTableLock.Unlock()	return ds.distributionTable, ds.distributionTableError}/*SetDistributionTable sets the distribution table and clears any error.*/func (ds *DistributedStorage) SetDistributionTable(dt *DistributionTable) {	ds.distributionTableLock.Lock()	defer ds.distributionTableLock.Unlock()	ds.distributionTable = dt	ds.distributionTableError = nil}/*SetDistributionTableError records an distribution table related error. Thisclears the current distribution table.*/func (ds *DistributedStorage) SetDistributionTableError(err error) {	ds.distributionTableLock.Lock()	defer ds.distributionTableLock.Unlock()	ds.distributionTable = nil	ds.distributionTableError = newError(err)}/*sendDataRequest is used to send data requests into the cluster.*/func (ds *DistributedStorage) sendDataRequest(member string, request *DataRequest) (interface{}, error) {	// Check if the request should be handled locally	if member == ds.MemberManager.Name() {		// Make sure to copy the request value for local insert or update requests.		// This is necessary since the serialization buffers are pooled and never		// dismissed. Locally the values are just passed around.		if request.RequestType == RTInsert || request.RequestType == RTUpdate {			var val []byte			datautil.CopyObject(request.Value, &val)			request.Value = val		}		var response interface{}		err := ds.localDRHandler(request, &response)		return response, err	}	return ds.MemberManager.Client.SendDataRequest(member, request)}/*Name returns the name of the cluster DistributedStorage instance.*/func (ds *DistributedStorage) Name() string {	return ds.MemberManager.Name()}/*LocalName returns the local name of the wrapped DistributedStorage instance.*/func (ds *DistributedStorage) LocalName() string {	return ds.localName}/*ReplicationFactor returns the replication factor of this cluster member. Avalue of 0 means the cluster is not operational in the moment.*/func (ds *DistributedStorage) ReplicationFactor() int {	// Do not do anything is the cluster is not operational	distTable, distTableErr := ds.DistributionTable()	if distTableErr != nil {		return 0	}	return distTable.repFac}/*MainDB returns the main database. The main database is a quicklookup map for meta data which is always kept in memory.*/func (ds *DistributedStorage) MainDB() map[string]string {	ret := make(map[string]string)	// Clear the current mainDB cache	ds.mainDB = nil	// Do not do anything is the cluster is not operational	distTable, distTableErr := ds.DistributionTable()	if distTableErr != nil {		ds.mainDBError = distTableErr		return ret	}	// Main db requests always go to member 1	member := distTable.Members()[0]	request := &DataRequest{RTGetMain, nil, nil, false}	mainDB, err := ds.sendDataRequest(member, request)	if err != nil {		// Cycle through all replicating members if there was an error.		// (as long as the cluster is considered operational there must be a		// replicating member available to accept the request)		for _, rmember := range distTable.Replicas(member) {			mainDB, err = ds.sendDataRequest(rmember, request)			if err == nil {				break			}		}	}	ds.mainDBError = err	if mainDB != nil {		ds.mainDB = mainDB.(map[string]string)		ret = ds.mainDB	}	// We failed to get the main db - any flush will fail.	return ret}/*RollbackMain rollback the main database.*/func (ds *DistributedStorage) RollbackMain() error {	// Nothing to do here - the main db will be updated next time it is requested	ds.mainDB = nil	ds.mainDBError = nil	return nil}/*FlushMain writes the main database to the storage.*/func (ds *DistributedStorage) FlushMain() error {	// Make sure there is no error	distTable, distTableErr := ds.DistributionTable()	if ds.mainDBError != nil {		return ds.mainDBError	} else if distTableErr != nil {		return distTableErr	}	// Main db requests always go to member 1	member := distTable.Members()[0]	request := &DataRequest{RTSetMain, nil, ds.mainDB, false}	_, err := ds.sendDataRequest(member, request)	if err != nil {		// Cycle through all replicating members if there was an error.		// (as long as the cluster is considered operational there must be a		// replicating member available to accept the request)		for _, rmember := range distTable.Replicas(member) {			_, err = ds.sendDataRequest(rmember, request)			if err == nil {				break			}		}	}	return err}/*FlushAll writes all pending local changes to the storage.*/func (ds *DistributedStorage) FlushAll() error {	return ds.localFlushHandler()}/*StorageManager gets a storage manager with a certain name. A non-exisClusterting StorageManageris not created automatically if the create flag is set to false.*/func (ds *DistributedStorage) StorageManager(smname string, create bool) storage.Manager {	// Make sure there is no error	distTable, distTableErr := ds.DistributionTable()	if ds.mainDBError != nil {		return nil	} else if distTableErr != nil {		return nil	}	if !create {		// Make sure the storage manage exists if it should not be created.		// Try to get its 1st root value. If nil is returned then the storage		// manager does not exist.		// Root ids always go to member 1 as well as the first insert request for data.		member := distTable.Members()[0]		request := &DataRequest{RTGetRoot, map[DataRequestArg]interface{}{			RPStoreName: smname,			RPRoot:      1,		}, nil, false}		res, err := ds.sendDataRequest(member, request)		if res == nil && err == nil {			return nil		}	}	return &DistributedStorageManager{smname, 0, ds, nil}}
 |