123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package cluster
- import (
- "fmt"
- "strconv"
- "strings"
- "devt.de/krotik/eliasdb/cluster/manager"
- "devt.de/krotik/eliasdb/hash"
- )
- /*
- MaxSizeRebalanceLists is the maximum size for rebalancing lists within one rebalance request.
- */
- const MaxSizeRebalanceLists = 100
- /*
- runRebalanceWorker flag to switch off automatic rebalancing
- */
- var runRebalanceWorker = true
- /*
- logRebalanceWorker flag to write a log message every time automatic rebalancing is running
- */
- var logRebalanceWorker = false
- /*
- rebalanceHousekeepingInterval defines how often housekeeping needs to run before
- a rebalance task is run.
- */
- var rebalanceHousekeepingInterval = 180
- /*
- rebalanceWorker is the background thread which handles automatic rebalancing
- when the configuration of the cluster changes or to autocorrect certain errors.
- */
- func (ms *memberStorage) rebalanceWorker(forceRun bool) {
- // Make sure only one transfer task is running at a time and that
- // subsequent requests are not queued up
- ms.rebalanceLock.Lock()
- if !runRebalanceWorker || ms.rebalanceRunning {
- ms.rebalanceLock.Unlock()
- return
- }
- // Make sure rebalancing only runs every rebalanceHousekeepingInterval
- if !forceRun && ms.rebalanceCounter > 0 {
- ms.rebalanceCounter--
- ms.rebalanceLock.Unlock()
- return
- }
- ms.rebalanceCounter = rebalanceHousekeepingInterval
- ms.rebalanceRunning = true
- ms.rebalanceLock.Unlock()
- defer func() {
- ms.rebalanceLock.Lock()
- ms.rebalanceRunning = false
- ms.rebalanceLock.Unlock()
- }()
- if logRebalanceWorker {
- manager.LogDebug(ms.ds.Name(), "(RB): Running rebalance worker task")
- }
- distTable, err := ms.ds.DistributionTable()
- if err != nil {
- manager.LogDebug(ms.ds.Name(), "(RB): Cannot rebalance not operational cluster: ",
- err.Error())
- return
- }
- // Go through all maintained stuff and collect storage name, location and version
- it := hash.NewHTreeIterator(ms.at.translation)
- for it.HasNext() {
- chunks := MaxSizeRebalanceLists
- maintLocs := make([]uint64, 0, MaxSizeRebalanceLists)
- maintVers := make([]uint64, 0, MaxSizeRebalanceLists)
- maintMgmts := make([]string, 0, MaxSizeRebalanceLists)
- for it.HasNext() || chunks <= 0 {
- key, val := it.Next()
- if tr, ok := val.(*translationRec); ok {
- smname := strings.Split(string(key[len(transPrefix):]), "#")[0]
- cloc, _ := strconv.ParseUint(string(key[len(fmt.Sprint(transPrefix, smname, "#")):]), 10, 64)
- maintMgmts = append(maintMgmts, smname)
- maintLocs = append(maintLocs, cloc)
- maintVers = append(maintVers, tr.Ver)
- }
- }
- // Send info about maintained stuff to all relevant members
- receiverMap := make(map[string]string)
- for _, cloc := range maintLocs {
- primary, replicas := distTable.LocationHome(cloc)
- members := make([]string, 0, len(replicas)+1)
- members = append(members, primary)
- members = append(members, replicas...)
- for _, member := range members {
- _, ok := receiverMap[member]
- if member == ms.ds.MemberManager.Name() || ok {
- continue
- }
- receiverMap[member] = ""
- request := &DataRequest{RTRebalance, map[DataRequestArg]interface{}{
- RPStoreName: maintMgmts,
- RPLoc: maintLocs,
- RPVer: maintVers,
- RPSrc: ms.ds.MemberManager.Name(),
- }, nil, false}
- _, err := ms.ds.sendDataRequest(member, request)
- if err != nil {
- manager.LogDebug(ms.ds.Name(), "(RB): ",
- fmt.Sprintf("Member %v Error: %v", member, err))
- }
- }
- }
- }
- }
|