12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package cluster
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "sort"
- "strings"
- "sync"
- "devt.de/krotik/common/sortutil"
- "devt.de/krotik/eliasdb/cluster/manager"
- "devt.de/krotik/eliasdb/graph/graphstorage"
- "devt.de/krotik/eliasdb/hash"
- "devt.de/krotik/eliasdb/storage"
- )
/*
Name prefixes for the storage managers used by a cluster member.
*/
const (
	// ClusterStoragePrefix is prepended to the names of cluster related
	// storage managers.
	ClusterStoragePrefix = "cs_"

	// LocalStoragePrefix is prepended to the names of local storage managers.
	LocalStoragePrefix = "ls_"
)
- /*
- memberStorage models the local storage of a cluster member. This data structure
- is the only thing which has access to the wrapped graphstorage.Storage.
- */
- type memberStorage struct {
- ds *DistributedStorage // Distributed storage which created this member storage
- gs graphstorage.Storage // Wrapped graphstorage.Storage
- at *memberAddressTable // Address table (cluster location -> local location)
- transferLock *sync.Mutex // Lock for the transfer task
- transferRunning bool // Flag to indicate that the transfer task is running
- rebalanceLock *sync.Mutex // Lock for the rebalance task
- rebalanceRunning bool // Flag to indicate that the rebalance task is running
- rebalanceCounter int
- }
- /*
- newMemberStorage creates a new memberStorage instance.
- */
- func newMemberStorage(ds *DistributedStorage, gs graphstorage.Storage) (*memberStorage, error) {
- sm := gs.StorageManager("cluster_translation", true)
- at, err := newMemberAddressTable(ds, sm)
- if err != nil {
- return nil, err
- }
- return &memberStorage{ds, gs, at, &sync.Mutex{}, false, &sync.Mutex{}, false, 0}, nil
- }
- /*
- handleDataRequest deals with RPC requests. It is the only function which is
- called by the RPC server of the member manager.
- */
- func (ms *memberStorage) handleDataRequest(request interface{}, response *interface{}) error {
- var err error
- // Make sure a request can be served
- distTable, distTableErr := ms.at.checkState()
- if distTableErr != nil {
- return distTableErr
- }
- dr := request.(*DataRequest)
- switch dr.RequestType {
- case RTGetMain:
- *response = ms.gs.MainDB()
- case RTSetMain:
- err = ms.handleSetMainRequest(distTable, dr, response)
- case RTSetRoot:
- err = ms.handleSetRootRequest(distTable, dr, response)
- case RTGetRoot:
- err = ms.handleGetRootRequest(distTable, dr, response)
- case RTInsert:
- err = ms.handleInsertRequest(distTable, dr, response)
- case RTUpdate:
- err = ms.handleUpdateRequest(distTable, dr, response)
- case RTFree:
- err = ms.handleFreeRequest(distTable, dr, response)
- case RTExists:
- err = ms.handleFetchRequest(distTable, dr, response, false)
- case RTFetch:
- err = ms.handleFetchRequest(distTable, dr, response, true)
- case RTRebalance:
- err = ms.handleRebalanceRequest(distTable, dr, response)
- default:
- err = fmt.Errorf("Unknown request type")
- }
- manager.LogDebug(ms.ds.MemberManager.Name(), fmt.Sprintf("(Store): Handled: %v %s (Transfer: %v, Error: %v)",
- dr.RequestType, dr.Args, dr.Transfer, err))
- return err
- }
- /*
- handleSetMainRequest sets the mainDB on the local storage manager.
- */
- func (ms *memberStorage) handleSetMainRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
- mainDB := ms.gs.MainDB()
- newMainDB := request.Value.(map[string]string)
- // Update keys and values
- for k, v := range newMainDB {
- mainDB[k] = v
- }
- // Check if things should be deleted
- var toRemove []string
- for k := range mainDB {
- if _, ok := newMainDB[k]; !ok {
- toRemove = append(toRemove, k)
- }
- }
- for _, k := range toRemove {
- delete(mainDB, k)
- }
- err := ms.gs.FlushMain()
- if !request.Transfer {
- ms.at.AddTransferRequest(distTable.OtherReplicationMembers(0, ms.ds.MemberManager.Name()),
- &DataRequest{RTSetMain, nil, request.Value, true})
- }
- return err
- }
- /*
- handleGetRootRequest retrieves a root value from a local storage manager.
- */
- func (ms *memberStorage) handleGetRootRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
- dsname := request.Args[RPStoreName].(string)
- root := request.Args[RPRoot].(int)
- sm := ms.dataStorage(dsname, false)
- if sm != nil {
- *response = sm.Root(root)
- }
- return nil
- }
- /*
- handleSetRootRequest sets a new root value in a local storage manager.
- */
- func (ms *memberStorage) handleSetRootRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
- dsname := request.Args[RPStoreName].(string)
- root := request.Args[RPRoot].(int)
- sm := ms.dataStorage(dsname, true)
- sm.SetRoot(root, request.Value.(uint64))
- if !request.Transfer {
- ms.at.AddTransferRequest(distTable.OtherReplicationMembers(0, ms.ds.MemberManager.Name()),
- &DataRequest{RTSetRoot, request.Args, request.Value, true})
- }
- return sm.Flush()
- }
- /*
- handleInsertRequest inserts an object and return its cluster storage location.
- Distribution procedure:
- Client -> Cluster Member Request Receiver
- Cluster Member Request Receiver -> Cluster Member Primary Storage (chosen round-robin / available)
- Cluster Member Primary Storage writes into its Transfer Table
- Cluster Member Primary Storage (Transfer worker) -> Replicating Cluster Members
- */
- func (ms *memberStorage) handleInsertRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
- var err error
- var cloc uint64
- dsname := request.Args[RPStoreName].(string)
- *response = 0
- sm := ms.dataStorage(dsname, true)
- if !request.Transfer {
- // First get a new cluster location (on this member)
- cloc, err = ms.at.NewClusterLoc(dsname)
- } else {
- // If this is a transfer request we know already the cluster location
- cloc = request.Args[RPLoc].(uint64)
- }
- if err == nil {
- // Insert into the local storage
- loc, err := sm.Insert(request.Value)
- if err == nil {
- // Add a translation
- _, _, err = ms.at.SetTransClusterLoc(dsname, cloc, loc, 1)
- if err == nil {
- if !request.Transfer {
- // Add transfer request for replication
- // At this point the operation has succeeded. We still need to
- // replicate the change to all the replicating members but
- // any errors happening during this shall not fail this operation.
- // The next rebalancing will then synchronize all members again.
- ms.at.AddTransferRequest(distTable.Replicas(ms.ds.MemberManager.Name()),
- &DataRequest{RTInsert, map[DataRequestArg]interface{}{
- RPStoreName: dsname,
- RPLoc: cloc,
- }, request.Value, true})
- }
- *response = cloc
- }
- }
- }
- return err
- }
- /*
- handleUpdateRequest updates an object and return its cluster storage location.
- There is indeed a chance to produce inconsistencies if members fail in the right
- sequence. It is assumed that these will be delt with in the next rebalance.
- Distribution procedure:
- Client -> Cluster Member Request Receiver
- Cluster Member Request Receiver -> Cluster Member Primary Storage or Replicating Cluster Member
- Storing Cluster Member does the update and writes into its transfer table
- Storing Cluster Member (Transfer worker) -> Replicating / Primary Cluster Members
- */
- func (ms *memberStorage) handleUpdateRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
- var err error
- var newVersion uint64
- dsname := request.Args[RPStoreName].(string)
- cloc := request.Args[RPLoc].(uint64)
- *response = 0
- // Get the translation
- transRec, ok, err := ms.at.TransClusterLoc(dsname, cloc)
- if ok {
- sm := ms.dataStorage(dsname, false)
- if sm != nil {
- // Update the local storage
- if !request.Transfer {
- err = sm.Update(transRec.loc, request.Value)
- newVersion = transRec.ver + 1
- } else {
- newVersion = request.Args[RPVer].(uint64)
- if newVersion >= transRec.ver {
- err = sm.Update(transRec.loc, request.Value)
- } else {
- // Outdated update requests are simply ignored
- err = fmt.Errorf("Received outdated update request (%v - Location: %v)",
- ms.ds.MemberManager.Name(), cloc)
- manager.LogDebug(ms.ds.MemberManager.Name(), err.Error())
- // Need to return no error so the transfer worker on the
- // other side removes its entry
- err = nil
- }
- }
- if err == nil {
- // Increase the version of the translation record
- _, _, err = ms.at.SetTransClusterLoc(dsname, cloc, transRec.loc, newVersion)
- if err == nil {
- if !request.Transfer {
- // Add transfer request for replication
- // At this point the operation has succeeded. We still need to
- // replicate the change to all the replicating members but
- // any errors happening during this shall not fail this operation.
- // The next rebalancing will then synchronize all members again.
- ms.at.AddTransferRequest(distTable.OtherReplicationMembers(cloc, ms.ds.MemberManager.Name()),
- &DataRequest{RTUpdate, map[DataRequestArg]interface{}{
- RPStoreName: dsname,
- RPLoc: cloc,
- RPVer: newVersion,
- }, request.Value, true})
- }
- *response = cloc
- return nil
- }
- }
- }
- }
- if err == nil {
- err = fmt.Errorf("Cluster slot not found (%v - Location: %v)",
- ms.ds.MemberManager.Name(), cloc)
- }
- return err
- }
- /*
- handleFreeRequest removes an object.
- Distribution procedure:
- Client -> Cluster Member Request Receiver
- Cluster Member Request Receiver -> Cluster Member Primary Storage or Replicating Cluster Member
- Storing Cluster Member does the free and writes into its transfer table
- Storing Cluster Member (Transfer worker) -> Replicating / Primary Cluster Members
- */
- func (ms *memberStorage) handleFreeRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
- var err error
- dsname := request.Args[RPStoreName].(string)
- cloc := request.Args[RPLoc].(uint64)
- // Get the translation
- transRec, ok, err := ms.at.TransClusterLoc(dsname, cloc)
- if ok {
- sm := ms.dataStorage(dsname, false)
- if sm != nil {
- // Remove the translation
- _, _, err = ms.at.RemoveTransClusterLoc(dsname, cloc)
- if err == nil {
- // Remove from the local storage
- err = sm.Free(transRec.loc)
- if !request.Transfer {
- // Add transfer request for replication
- // At this point the operation has succeeded. We still need to
- // replicate the change to all the replicating members but
- // any errors happening during this shall not fail this operation.
- // The next rebalancing will then synchronize all members again.
- ms.at.AddTransferRequest(distTable.OtherReplicationMembers(cloc, ms.ds.MemberManager.Name()),
- &DataRequest{RTFree, map[DataRequestArg]interface{}{
- RPStoreName: dsname,
- RPLoc: cloc,
- }, nil, true})
- }
- return err
- }
- }
- }
- if err == nil {
- err = fmt.Errorf("Cluster slot not found (%v - Location: %v)", ms.ds.MemberManager.Name(), cloc)
- }
- return err
- }
- /*
- handleFetchRequest inserts an object and return its cluster storage location.
- */
- func (ms *memberStorage) handleFetchRequest(distTable *DistributionTable,
- request *DataRequest, response *interface{}, fetch bool) error {
- var err error
- dsname := request.Args[RPStoreName].(string)
- cloc := request.Args[RPLoc].(uint64)
- // Get the translation
- transRec, ok, err := ms.at.TransClusterLoc(dsname, cloc)
- if ok {
- // Check if the data should be retrieved
- if !fetch {
- *response = true
- return nil
- }
- sm := ms.dataStorage(dsname, false)
- if sm != nil {
- var res []byte
- err = sm.Fetch(transRec.loc, &res)
- if err == nil {
- *response = res
- return nil
- }
- }
- } else if !fetch {
- *response = false
- return err
- }
- if err == nil {
- err = fmt.Errorf("Cluster slot not found (%v - Location: %v)", ms.ds.MemberManager.Name(), cloc)
- }
- return err
- }
- /*
- handleRebalanceRequest processes rebalance requests.
- */
- func (ms *memberStorage) handleRebalanceRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
- var err error
- var tr *translationRec
- var found bool
- var res interface{}
- var lloc uint64
- handleError := func(err error) {
- if err != nil {
- manager.LogDebug(ms.ds.MemberManager.Name(), fmt.Sprintf("(Store): Error during rebalancing request handling: %v", err))
- }
- }
- // Get the location ranges for this member and locations which are replicated on this member.
- storeRangeStart, storeRangeStop := distTable.MemberRange(ms.ds.MemberManager.Name())
- repRangeStart, repRangeStop := distTable.ReplicationRange(ms.ds.MemberManager.Name())
- // Get the request data
- rsource := request.Args[RPSrc].(string)
- smnames := request.Args[RPStoreName]
- locs := request.Args[RPLoc]
- vers := request.Args[RPVer]
- for i, cloc := range locs.([]uint64) {
- // Check if there was an error from the previous iteration
- handleError(err)
- smname := smnames.([]string)[i]
- ver := vers.([]uint64)[i]
- // Do not proceed if there is an error or if the location is out of
- // range of responsibility
- notInStoreRange := cloc < storeRangeStart || cloc > storeRangeStop
- notInRepRange := cloc < repRangeStart || cloc > repRangeStop
- // Check if the location exists in the local storage
- tr, found, err = ms.at.TransClusterLoc(smname, cloc)
- if err != nil || (notInStoreRange && notInRepRange) {
- // Skip the location if there was an error or if this member
- // is not relevant for the location in question (either as primary
- // storage member or as replica)
- continue
- }
- if found {
- // Check if the version is newer and update the local record if it is
- if tr.ver < ver {
- // Local record exists and needs to be updated
- sm := ms.dataStorage(smname, false)
- // Fetch the data from the remote machine
- res, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFetch, map[DataRequestArg]interface{}{
- RPStoreName: smname,
- RPLoc: cloc,
- }, nil, false})
- if err == nil {
- // Update the local storage
- if err = sm.Update(tr.loc, res); err == nil {
- // Update the translation
- _, _, err = ms.at.SetTransClusterLoc(smname, cloc, tr.loc, ver)
- manager.LogDebug(ms.ds.MemberManager.Name(),
- fmt.Sprintf("(Store): Rebalance updated %v location: %v", smname, cloc))
- }
- }
- }
- } else {
- // The data on the remote system should be inserted into the local
- // datastore.
- sm := ms.dataStorage(smname, true)
- // Fetch the data from the remote machine
- res, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFetch, map[DataRequestArg]interface{}{
- RPStoreName: smname,
- RPLoc: cloc,
- }, nil, false})
- if err == nil {
- // Insert into the local storage
- lloc, err = sm.Insert(res)
- if err == nil {
- // Add a translation
- _, _, err = ms.at.SetTransClusterLoc(smname, cloc, lloc, ver)
- manager.LogDebug(ms.ds.MemberManager.Name(),
- fmt.Sprintf("(Store): Rebalance inserted %v location: %v", smname, cloc))
- }
- }
- }
- if err == nil {
- // Should the sender have the data
- sourceSRangeStart, sourceSRangeStop := distTable.MemberRange(rsource)
- sourceRRangeStart, sourceRRangeStop := distTable.ReplicationRange(rsource)
- notInSourceSRange := cloc < sourceSRangeStart || cloc > sourceSRangeStop
- notInSourceRRange := cloc < sourceRRangeStart || cloc > sourceRRangeStop
- if notInSourceSRange && notInSourceRRange {
- manager.LogDebug(ms.ds.MemberManager.Name(),
- fmt.Sprintf("(Store): Rebalance removes %v location: %v from member %v",
- smname, tr.loc, rsource))
- _, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFree, map[DataRequestArg]interface{}{
- RPStoreName: smname,
- RPLoc: cloc,
- }, nil, true})
- }
- }
- }
- handleError(err)
- return nil
- }
- /*
- dataStorage returns a storage.StorageManager which will only store byte slices.
- */
- func (ms *memberStorage) dataStorage(dsname string, create bool) storage.Manager {
- return ms.gs.StorageManager(LocalStoragePrefix+dsname, create)
- }
- /*
- dump dumps the contents of a particular member storage manager as escaped strings.
- (Works only for MemoryStorageManagers.)
- */
- func (ms *memberStorage) dump(smname string) string {
- var res string
- printTransferTable := func(buf *bytes.Buffer) {
- // Go through the transfer table and see if there is anything
- it := hash.NewHTreeIterator(ms.at.transfer)
- for it.HasNext() {
- _, val := it.Next()
- if val != nil {
- tr := val.(*transferRec)
- args, _ := json.Marshal(tr.request.Args)
- vals, ok := tr.request.Value.([]byte)
- if !ok {
- vals, _ = json.Marshal(tr.request.Value)
- }
- buf.WriteString(fmt.Sprintf("transfer: %v - %v %v %q\n",
- tr.members, tr.request.RequestType, string(args), vals))
- }
- }
- }
- if smname == "" {
- // Dump the contents of the MainDB if no name is given
- buf := new(bytes.Buffer)
- buf.WriteString(fmt.Sprintf("%v MemberStorageManager MainDB\n",
- ms.ds.MemberManager.Name()))
- var keys []string
- for k := range ms.gs.MainDB() {
- keys = append(keys, k)
- }
- sort.Strings(keys)
- // Output local storage content with mapped cluster locations
- for _, k := range keys {
- v := ms.gs.MainDB()[k]
- buf.WriteString(fmt.Sprintf("%v - %q\n", k, v))
- }
- printTransferTable(buf)
- return buf.String()
- }
- sm := ms.dataStorage(smname, false)
- if sm != nil {
- // Make sure the storage manager is a MemoryStorageManager
- if msm, ok := sm.(*storage.MemoryStorageManager); ok {
- // Get all stored cluster locations
- locmap := make(map[uint64]string)
- it := hash.NewHTreeIterator(ms.at.translation)
- for it.HasNext() {
- k, v := it.Next()
- key := string(k)
- if strings.HasPrefix(key, transPrefix) {
- key = string(key[len(fmt.Sprint(transPrefix, smname, "#")):])
- locmap[v.(*translationRec).loc] = fmt.Sprintf("%v (v:%v)",
- key, v.(*translationRec).ver)
- }
- }
- buf := new(bytes.Buffer)
- buf.WriteString(fmt.Sprintf("%v MemberStorageManager %v\n",
- ms.ds.MemberManager.Name(), msm.Name()))
- buf.WriteString("Roots: ")
- // Go through root values
- for i := 0; i < 10; i++ {
- rootVal := msm.Root(i)
- buf.WriteString(fmt.Sprintf("%v=%v ", i, rootVal))
- }
- buf.WriteString("\n")
- var keys []uint64
- for k := range msm.Data {
- keys = append(keys, k)
- }
- sortutil.UInt64s(keys)
- // Output local storage content with mapped cluster locations
- for _, k := range keys {
- v := msm.Data[k]
- caddr := locmap[k]
- buf.WriteString(fmt.Sprintf("cloc: %v - lloc: %v - %q\n",
- caddr, k, v))
- }
- printTransferTable(buf)
- res = buf.String()
- }
- }
- return res
- }
|