123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package cluster
- import (
- "bytes"
- "encoding/gob"
- "fmt"
- "devt.de/krotik/common/errorutil"
- "devt.de/krotik/eliasdb/storage"
- )
- /*
- DistributedStorageManager is a storage.Manager which sends requests to the
- distributed storage.
- */
- type DistributedStorageManager struct {
- name string // Name of the storage manager
- rrc int // Round robin counter
- ds *DistributedStorage // Distributed storage which created the instance
- rootError error // Last error when root values were handled
- }
- /*
- Name returns the name of the StorageManager instance.
- */
- func (dsm *DistributedStorageManager) Name() string {
- return fmt.Sprint("DistributedStorageManager: ", dsm.name)
- }
- /*
- Root returns a root value.
- */
- func (dsm *DistributedStorageManager) Root(root int) uint64 {
- var ret uint64
- // Do not do anything is the cluster is not operational
- distTable, distTableErr := dsm.ds.DistributionTable()
- if distTableErr != nil {
- return 0
- }
- // Root ids always go to member 1
- member := distTable.Members()[0]
- request := &DataRequest{RTGetRoot, map[DataRequestArg]interface{}{
- RPStoreName: dsm.name,
- RPRoot: root,
- }, nil, false}
- res, err := dsm.ds.sendDataRequest(member, request)
- if err != nil {
- // Cycle through all replicating members if there was an error.
- // (as long as the cluster is considered operational there must be a
- // replicating member available to accept the request)
- for _, rmember := range distTable.Replicas(member) {
- res, err = dsm.ds.sendDataRequest(rmember, request)
- if err == nil {
- break
- }
- }
- }
- dsm.rootError = err
- if res != nil {
- ret = res.(uint64)
- }
- return ret
- }
- /*
- SetRoot writes a root value.
- */
- func (dsm *DistributedStorageManager) SetRoot(root int, val uint64) {
- // Do not do anything is the cluster is not operational
- distTable, distTableErr := dsm.ds.DistributionTable()
- if distTableErr != nil {
- return
- }
- // Root ids always go to member 1
- member := distTable.Members()[0]
- request := &DataRequest{RTSetRoot, map[DataRequestArg]interface{}{
- RPStoreName: dsm.name,
- RPRoot: root,
- }, val, false}
- _, err := dsm.ds.sendDataRequest(member, request)
- if err != nil {
- // Cycle through all replicating members if there was an error.
- // (as long as the cluster is considered operational there must be a
- // replicating member available to accept the request)
- for _, rmember := range distTable.Replicas(member) {
- _, err = dsm.ds.sendDataRequest(rmember, request)
- if err == nil {
- break
- }
- }
- }
- dsm.rootError = err
- }
- /*
- Insert inserts an object and return its storage location.
- */
- func (dsm *DistributedStorageManager) Insert(o interface{}) (uint64, error) {
- return dsm.insertOrUpdate(true, 0, o)
- }
- /*
- Update updates a storage location.
- */
- func (dsm *DistributedStorageManager) Update(loc uint64, o interface{}) error {
- _, err := dsm.insertOrUpdate(false, loc, o)
- return err
- }
- /*
- insertOrUpdate stores an object and returns its storage location and any error.
- */
- func (dsm *DistributedStorageManager) insertOrUpdate(insert bool, loc uint64, o interface{}) (uint64, error) {
- var member string
- var replicatingMembers []string
- var rtype RequestType
- var ret uint64
- // Do not do anything is the cluster is not operational
- distTable, distTableErr := dsm.ds.DistributionTable()
- if distTableErr != nil {
- return 0, distTableErr
- }
- // Choose the instance this request should be routed to
- if insert {
- members := distTable.Members()
- member = members[(dsm.rrc)%len(members)]
- rtype = RTInsert
- } else {
- member, replicatingMembers = distTable.LocationHome(loc)
- rtype = RTUpdate
- }
- // Serialize the object into a gob bytes stream
- bb := storage.BufferPool.Get().(*bytes.Buffer)
- defer func() {
- bb.Reset()
- storage.BufferPool.Put(bb)
- }()
- errorutil.AssertOk(gob.NewEncoder(bb).Encode(o))
- request := &DataRequest{rtype, map[DataRequestArg]interface{}{
- RPStoreName: dsm.name,
- RPLoc: loc,
- }, bb.Bytes(), false}
- cloc, err := dsm.ds.sendDataRequest(member, request)
- if err == nil {
- return cloc.(uint64), err
- }
- // An error has occured we need to use another member
- if rtype == RTInsert {
- // Cycle through all members and see which one accepts first
- members := distTable.Members()
- lenMembers := len(members)
- for i := 1; i < lenMembers; i++ {
- member = members[(dsm.rrc+i)%lenMembers]
- cloc, nerr := dsm.ds.sendDataRequest(member, request)
- if nerr == nil {
- ret = cloc.(uint64)
- err = nil
- break
- }
- }
- } else {
- // Cycle through all replicating members and see which one accepts first
- // (as long as the cluster is considered operational there must be a
- // replicating member available to accept the request)
- for _, member := range replicatingMembers {
- cloc, nerr := dsm.ds.sendDataRequest(member, request)
- if nerr == nil {
- ret = cloc.(uint64)
- err = nil
- break
- }
- }
- }
- return ret, err
- }
- /*
- Free frees a storage location.
- */
- func (dsm *DistributedStorageManager) Free(loc uint64) error {
- // Do not do anything is the cluster is not operational
- distTable, distTableErr := dsm.ds.DistributionTable()
- if distTableErr != nil {
- return distTableErr
- }
- // Choose the instance this request should be routed to
- member, replicatingMembers := distTable.LocationHome(loc)
- request := &DataRequest{RTFree, map[DataRequestArg]interface{}{
- RPStoreName: dsm.name,
- RPLoc: loc,
- }, nil, false}
- _, err := dsm.ds.sendDataRequest(member, request)
- if err != nil {
- // Cycle through all replicating members and see which one accepts first
- // (as long as the cluster is considered operational there must be a
- // replicating member available to accept the request)
- for _, member := range replicatingMembers {
- _, nerr := dsm.ds.sendDataRequest(member, request)
- if nerr == nil {
- err = nil
- break
- }
- }
- }
- return err
- }
- /*
- Exists checks if an object exists in a given storage location.
- */
- func (dsm *DistributedStorageManager) Exists(loc uint64) (bool, error) {
- var ret bool
- err := dsm.lookupData(loc, &ret, false)
- return ret, err
- }
- /*
- Fetch fetches an object from a given storage location and writes it to
- a given data container.
- */
- func (dsm *DistributedStorageManager) Fetch(loc uint64, o interface{}) error {
- return dsm.lookupData(loc, o, true)
- }
- /*
- lookupData fetches or checks for an object in a given storage location.
- */
- func (dsm *DistributedStorageManager) lookupData(loc uint64, o interface{}, fetch bool) error {
- var rt RequestType
- // Do not do anything if the cluster is not operational
- distTable, distTableErr := dsm.ds.DistributionTable()
- if distTableErr != nil {
- return distTableErr
- }
- // Choose the instance this request should be routed to
- primaryMember, secondaryMembers := distTable.LocationHome(loc)
- if fetch {
- rt = RTFetch
- } else {
- rt = RTExists
- }
- request := &DataRequest{rt, map[DataRequestArg]interface{}{
- RPStoreName: dsm.name,
- RPLoc: loc,
- }, nil, false}
- res, err := dsm.ds.sendDataRequest(primaryMember, request)
- if err != nil || (!fetch && !res.(bool)) {
- // Try secondary members if the primary member failed or the data didn't exist there
- var serr error
- for _, member := range secondaryMembers {
- res, serr = dsm.ds.sendDataRequest(member, request)
- if serr == nil {
- err = nil
- break
- }
- }
- }
- if err == nil {
- if !fetch {
- *o.(*bool) = res.(bool)
- } else {
- gob.NewDecoder(bytes.NewReader(res.([]byte))).Decode(o)
- }
- }
- return err
- }
- /*
- FetchCached is not implemented for a DistributedStorageManager. Only defined to satisfy
- the StorageManager interface.
- */
- func (dsm *DistributedStorageManager) FetchCached(loc uint64) (interface{}, error) {
- return nil, storage.ErrNotInCache
- }
- /*
- Flush is not implemented for a DistributedStorageManager. All changes are immediately
- written to disk in a cluster.
- */
- func (dsm *DistributedStorageManager) Flush() error {
- _, distTableErr := dsm.ds.DistributionTable()
- // Do not do anything if the cluster is not operational
- if distTableErr != nil {
- return distTableErr
- }
- // Increase round robin counter - things which belond together should be
- // stored on the same members
- dsm.rrc++
- return nil
- }
- /*
- Rollback is not implemented for a DistributedStorageManager. All changes are immediately
- written to disk in a cluster.
- */
- func (dsm *DistributedStorageManager) Rollback() error {
- return nil
- }
- /*
- Close is not implemented for a DistributedStorageManager. Only the local storage must
- be closed which is done when the DistributedStore is shut down.
- */
- func (dsm *DistributedStorageManager) Close() error {
- if _, distTableErr := dsm.ds.DistributionTable(); distTableErr != nil {
- return distTableErr
- }
- return dsm.rootError
- }
|