| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 | /* * EliasDB * * Copyright 2016 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */package clusterimport (	"fmt"	"devt.de/krotik/common/timeutil"	"devt.de/krotik/eliasdb/cluster/manager"	"devt.de/krotik/eliasdb/hash")/*runTransferWorker flag to switch off transfer record processing*/var runTransferWorker = true/*logTransferWorker flag to write a log message every time the transfer worker task is running*/var logTransferWorker = false/*transferWorker is the background thread which handles various tasks to provide"eventual" consistency for the cluster storage.*/func (ms *memberStorage) transferWorker() {	// Make sure only one transfer task is running at a time and that	// subsequent requests are not queued up	ms.transferLock.Lock()	if !runTransferWorker || ms.transferRunning {		ms.transferLock.Unlock()		return	}	ms.transferRunning = true	ms.transferLock.Unlock()	defer func() {		ms.transferLock.Lock()		ms.transferRunning = false		ms.transferLock.Unlock()	}()	if logTransferWorker {		manager.LogDebug(ms.ds.Name(), "(TR): Running transfer worker task")	}	// Go through the transfer table and try to process the tasks	var processed [][]byte	it := hash.NewHTreeIterator(ms.at.transfer)	for it.HasNext() {		key, val := it.Next()		if val != nil {			tr := val.(*transferRec)			ts, _ := timeutil.TimestampString(string(key), "UTC")			manager.LogDebug(ms.ds.Name(), "(TR): ",				fmt.Sprintf("Processing transfer request %v for %v from %v",					tr.Request.RequestType, tr.Members, ts))			// Send the request to all members			var failedMembers []string			for _, member := range tr.Members {				if _, err := ms.ds.sendDataRequest(member, tr.Request); err != nil {					manager.LogDebug(ms.ds.Name(), "(TR): ",						fmt.Sprintf("Member %v Error: %v", member, err))					failedMembers = append(failedMembers, member)				}			}			// Update or remove the translation record			if len(failedMembers) == 0 {				processed = append(processed, key)			} else if len(failedMembers) < len(tr.Members) {				tr.Members = failedMembers				ms.at.transfer.Put(key, tr)			}		}	}	// Remove all processed transfer requests	for _, key := range processed {		ms.at.transfer.Remove(key)	}	// Flush the local storage	ms.gs.FlushAll()	// Trigger the rebalancing task - the task will only execute if it is time	go ms.rebalanceWorker(false)}
 |