- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package manager
- import (
- "fmt"
- "strconv"
- "strings"
- )
var (
	// runHousekeeping switches the automatic start of housekeeping on or off.
	runHousekeeping = true

	// FreqHousekeeping is the frequency of running housekeeping tasks (ms).
	FreqHousekeeping float64 = 1000

	// logHousekeeping, when set, writes a log message every time the
	// housekeeping task is running.
	logHousekeeping = false
)
- /*
- HousekeepingWorker is the background thread which handles various tasks to provide
- "eventual" consistency for the cluster.
- */
/*
HousekeepingWorker is the background thread which handles various tasks to provide
"eventual" consistency for the cluster.

One run pings every known peer, compares the peer's cluster state timestamps
with the local ones, pulls newer state info from peers which are exactly one
version ahead, merges member lists on diverging versions (flagging a conflict),
and finally pushes an updated failed-member list to the cluster if it changed
or a conflict was flagged.
*/
func (mm *MemberManager) HousekeepingWorker() {

	// Serialize housekeeping runs - only one may be active at a time.
	mm.housekeepingLock.Lock()
	defer mm.housekeepingLock.Unlock()

	// Bail out immediately if housekeeping has been switched off.
	if mm.StopHousekeeping {
		return
	} else if logHousekeeping {
		LogDebug(mm.name, "(HK): Running housekeeping task")
	}

	// Special function which ensures that the given member is removed from the
	// failed list.
	removeFromFailedState := func(peer string) {

		// maplock guards the shared failed map.
		mm.Client.maplock.Lock()
		defer mm.Client.maplock.Unlock()

		if _, ok := mm.Client.failed[peer]; ok {

			// Remove a member from the failed state list and send an update

			LogDebug(mm.name, "(HK): ",
				fmt.Sprintf("Removing %v from list of failed members", peer))

			delete(mm.Client.failed, peer)
		}
	}

	// Housekeeping will try to talk to all peers

	resolveConflict := false // Flag to resolve a state conflict at the end of a cycle.

	// NOTE(review): mm.Client.peers is iterated without holding maplock
	// (unlike the failed map above) - confirm the peer map cannot be
	// modified concurrently while housekeeping runs.
	for peer := range mm.Client.peers {

		LogDebug(mm.name, "(HK): ",
			fmt.Sprintf("Housekeeping talking to: %v", peer))

		// Send a ping to the member
		res, err := mm.Client.SendPing(peer, "")

		if err != nil {

			// Unreachable peers are skipped; they are expected to catch up
			// through their own housekeeping later.
			LogDebug(mm.name, "(HK): ",
				fmt.Sprintf("Error pinging %v - %v", peer, err))

			continue

		} else if len(res) == 1 {

			// A single-entry response means the peer does not regard this
			// instance as a cluster member - record that as a failure state.
			LogDebug(mm.name, "(HK): ",
				fmt.Sprintf("Member %v says this instance is not part of the cluster", peer))

			mm.Client.maplock.Lock()
			mm.Client.failed[peer] = ErrNotMember.Error()
			mm.Client.maplock.Unlock()

			continue
		}

		// Check timestamp on the result and see where this member is:
		// res[1]/res[2] carry the peer's current state version (member name
		// which wrote it + timestamp), res[3]/res[4] the previous one.
		// NOTE(review): this assumes any non-error, non-single-entry ping
		// response has at least 5 entries - confirm against SendPing.
		// Parse errors are deliberately ignored; a malformed timestamp
		// parses as 0 - NOTE(review): confirm this fallback is intended.

		peerTsMember := res[1]
		peerTsTS, _ := strconv.ParseInt(res[2], 10, 64)
		peerTsOldMember := res[3]
		peerTsOldTS, _ := strconv.ParseInt(res[4], 10, 64)

		// Load the local current and previous state versions in the same
		// [member, timestamp] layout for comparison.
		simmTS, _ := mm.stateInfo.Get(StateInfoTS)
		mmTS := simmTS.([]string)
		simmOldTS, _ := mm.stateInfo.Get(StateInfoTSOLD)
		mmOldTS := simmOldTS.([]string)

		mmTsMember := mmTS[0]
		mmTsTS, _ := strconv.ParseInt(mmTS[1], 10, 64)
		mmTsOldMember := mmOldTS[0]
		mmTsOldTS, _ := strconv.ParseInt(mmOldTS[1], 10, 64)

		LogDebug(mm.name, "(HK): ",
			fmt.Sprintf("TS Me  : Curr:%v:%v - Old:%v:%v", mmTsMember, mmTsTS, mmTsOldMember, mmTsOldTS))
		LogDebug(mm.name, "(HK): ",
			fmt.Sprintf("TS Peer: Curr:%v:%v - Old:%v:%v", peerTsMember, peerTsTS, peerTsOldMember, peerTsOldTS))

		if peerTsTS > mmTsTS || peerTsMember != mmTsMember {

			// Peer has a newer version

			if peerTsMember == mmTsMember && peerTsOldMember == mmTsMember && peerTsOldTS == mmTsTS {

				// Peer's previous version equals our current one, i.e. the
				// peer is exactly one step ahead - safe to fast-forward the
				// local state info to the peer's version.
				sf, err := mm.Client.SendStateInfoRequest(peer)

				if err == nil {
					LogDebug(mm.name, ": Updating state info of member")

					mm.applyStateInfo(sf)
				}

			} else {

				// Peer has a different version - potential conflict send a
				// state update at the end of the cycle

				if sf, err := mm.Client.SendStateInfoRequest(peer); err == nil {
					LogDebug(mm.name, ": Merging members in state infos")

					// Add any newly known cluster members (false: merge
					// members only, do not adopt the peer's full state).
					mm.applyStateInfoPeers(sf, false)

					resolveConflict = true
				}
			}

			// Remove the member from the failed state list if it is on there
			removeFromFailedState(peer)

		} else if peerTsTS == mmTsTS && peerTsMember == mmTsMember {

			// Peer is up-to-date - check if it is in a failed state list
			removeFromFailedState(peer)
		}

		// We do nothing with members using an outdated cluster state
		// they should update eventually through their own housekeeping
	}

	// Check if there is a new failed members list. The stored failed list
	// holds 2 entries per member (hence the division by 2) - a length
	// mismatch with the live failed map means it changed during this cycle.
	sfFailed, _ := mm.stateInfo.Get(StateInfoFAILED)

	if len(sfFailed.([]string))/2 != len(mm.Client.failed) || resolveConflict {

		LogDebug(mm.name, "(HK): ",
			fmt.Sprintf("Updating other members with current failed members list: %v",
				strings.Join(mm.Client.FailedPeerErrors(), ", ")))

		if err := mm.UpdateClusterStateInfo(); err != nil {

			// Just update local state info if we could not update the peers
			LogDebug(mm.name, "(HK): ",
				fmt.Sprintf("Could not update cluster state: %v", err.Error()))

			mm.updateStateInfo(true)
		}
	}

	// Notify others that housekeeping has finished
	mm.notifyHouseKeeping()
}