123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- /*
- Package manager contains the management code for EliasDB's clustering feature.
- The management code deals with cluster building, general communication between cluster
- members, verification of communicating peers and monitoring of members.
- The cluster structure is pure peer-to-peer design with no single point of failure. All
- members of the cluster share a versioned cluster state which is persisted. Members have
- to manually be added or removed from the cluster. Each member also has a member info object
- which can be used by the application which uses the cluster to store additional member
- related information.
- Temporary failures are detected automatically. Every member of the cluster monitors the
- state of all its peers by sending ping requests to them on a regular schedule.
- */
- package manager
- import (
- "bytes"
- "encoding/gob"
- "fmt"
- "sync"
- "devt.de/krotik/common/datautil"
- "devt.de/krotik/common/errorutil"
- "devt.de/krotik/common/fileutil"
- "devt.de/krotik/eliasdb/storage"
- )
- // Cluster config
- // ==============
- /*
- ConfigRPC is the PRC network interface for the local cluster manager
- */
- const ConfigRPC = "ClusterMemberRPC"
- /*
- ConfigMemberName is the name of the cluster member
- */
- const ConfigMemberName = "ClusterMemberName"
- /*
- ConfigClusterSecret is the secret which authorizes a cluster member
- (the secret must never be send directly over the network)
- */
- const ConfigClusterSecret = "ClusterSecret"
- /*
- ConfigReplicationFactor is the number of times a given datum must be stored
- redundently. The cluster can suffer n-1 member losses before it becomes
- inoperational. The value is set once in the configuration and becomes afterwards
- part of the global cluster state info (once this is there the config value is ignored).
- */
- const ConfigReplicationFactor = "ReplicationFactor"
- /*
- DefaultConfig is the defaut configuration
- */
- var DefaultConfig = map[string]interface{}{
- ConfigRPC: "localhost:9030",
- ConfigMemberName: "member1",
- ConfigClusterSecret: "secret123",
- ConfigReplicationFactor: 1.0,
- }
- // Cluster state info
- // ==================
- /*
- Known StateInfo entries
- */
- const (
- StateInfoTS = "ts" // Timestamp of state info
- StateInfoTSOLD = "tsold" // Previous timestamp of state info
- StateInfoMEMBERS = "members" // List of known cluster members
- StateInfoFAILED = "failed" // List of failed peers
- StateInfoREPFAC = "replication" // Replication factor of the cluster
- )
- /*
- Known MemberInfo entries
- */
- const (
- MemberInfoError = "error" // Error message if a member was not reachable
- MemberInfoTermURL = "termurl" // URL to the cluster terminal of the member
- )
- /*
- StateInfo models a state object which stores cluster related data. This
- information is exchanged between cluster members. It is not expected that
- the info changes frequently.
- */
- type StateInfo interface {
- /*
- Put stores some data in the state info.
- */
- Put(key string, value interface{})
- /*
- Get retrievtes some data from the state info.
- */
- Get(key string) (interface{}, bool)
- /*
- Map returns the state info as a map.
- */
- Map() map[string]interface{}
- /*
- Flush persists the state info.
- */
- Flush() error
- }
- /*
- DefaultStateInfo is the default state info which uses a file to persist its data.
- */
- type DefaultStateInfo struct {
- *datautil.PersistentMap
- datalock *sync.RWMutex
- }
- /*
- NewDefaultStateInfo creates a new DefaultStateInfo.
- */
- func NewDefaultStateInfo(filename string) (StateInfo, error) {
- var pm *datautil.PersistentMap
- var err error
- if res, _ := fileutil.PathExists(filename); !res {
- pm, err = datautil.NewPersistentMap(filename)
- if err != nil {
- return nil, &Error{ErrClusterConfig,
- fmt.Sprintf("Cannot create state info file %v: %v",
- filename, err.Error())}
- }
- } else {
- pm, err = datautil.LoadPersistentMap(filename)
- if err != nil {
- return nil, &Error{ErrClusterConfig,
- fmt.Sprintf("Cannot load state info file %v: %v",
- filename, err.Error())}
- }
- }
- return &DefaultStateInfo{pm, &sync.RWMutex{}}, nil
- }
- /*
- Map returns the state info as a map.
- */
- func (dsi *DefaultStateInfo) Map() map[string]interface{} {
- var ret map[string]interface{}
- datautil.CopyObject(dsi.Data, &ret)
- return ret
- }
- /*
- Get retrieves some data from the state info.
- */
- func (dsi *DefaultStateInfo) Get(key string) (interface{}, bool) {
- dsi.datalock.RLock()
- defer dsi.datalock.RUnlock()
- v, ok := dsi.Data[key]
- return v, ok
- }
- /*
- Put stores some data in the state info.
- */
- func (dsi *DefaultStateInfo) Put(key string, value interface{}) {
- dsi.datalock.Lock()
- defer dsi.datalock.Unlock()
- dsi.Data[key] = value
- }
- /*
- Flush persists the state info.
- */
- func (dsi *DefaultStateInfo) Flush() error {
- if err := dsi.PersistentMap.Flush(); err != nil {
- return &Error{ErrClusterConfig,
- fmt.Sprintf("Cannot persist state info: %v",
- err.Error())}
- }
- return nil
- }
- /*
- MsiRetFlush nil or the error which should be returned by a Flush call
- */
- var MsiRetFlush error
- /*
- MemStateInfo is a state info object which does not persist its data.
- */
- type MemStateInfo struct {
- data map[string]interface{}
- datalock *sync.RWMutex
- }
- /*
- NewMemStateInfo creates a new MemStateInfo.
- */
- func NewMemStateInfo() StateInfo {
- return &MemStateInfo{make(map[string]interface{}), &sync.RWMutex{}}
- }
- /*
- Map returns the state info as a map.
- */
- func (msi *MemStateInfo) Map() map[string]interface{} {
- var ret map[string]interface{}
- datautil.CopyObject(msi.data, &ret)
- return ret
- }
- /*
- Get retrieves some data from the state info.
- */
- func (msi *MemStateInfo) Get(key string) (interface{}, bool) {
- msi.datalock.RLock()
- defer msi.datalock.RUnlock()
- v, ok := msi.data[key]
- return v, ok
- }
- /*
- Put stores some data in the state info.
- */
- func (msi *MemStateInfo) Put(key string, value interface{}) {
- msi.datalock.Lock()
- defer msi.datalock.Unlock()
- msi.data[key] = value
- }
- /*
- Flush does not do anything :-)
- */
- func (msi *MemStateInfo) Flush() error {
- return MsiRetFlush
- }
- // Helper functions to properly serialize maps
- // ===========================================
- /*
- mapToBytes converts a given map to bytes. This method panics on errors.
- */
- func mapToBytes(m map[string]interface{}) []byte {
- bb := storage.BufferPool.Get().(*bytes.Buffer)
- defer func() {
- bb.Reset()
- storage.BufferPool.Put(bb)
- }()
- errorutil.AssertOk(gob.NewEncoder(bb).Encode(m))
- return bb.Bytes()
- }
- /*
- bytesToMap tries to convert a given byte array into a map. This method panics on errors.
- */
- func bytesToMap(b []byte) map[string]interface{} {
- var ret map[string]interface{}
- errorutil.AssertOk(gob.NewDecoder(bytes.NewReader(b)).Decode(&ret))
- return ret
- }
|