transfer.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package cluster
  11. import (
  12. "fmt"
  13. "devt.de/krotik/common/timeutil"
  14. "devt.de/krotik/eliasdb/cluster/manager"
  15. "devt.de/krotik/eliasdb/hash"
  16. )
  17. /*
  18. runTransferWorker flag to switch off transfer record processing
  19. */
  20. var runTransferWorker = true
  21. /*
  22. logTransferWorker flag to write a log message every time the transfer worker task is running
  23. */
  24. var logTransferWorker = false
  25. /*
  26. transferWorker is the background thread which handles various tasks to provide
  27. "eventual" consistency for the cluster storage.
  28. */
  29. func (ms *memberStorage) transferWorker() {
  30. // Make sure only one transfer task is running at a time and that
  31. // subsequent requests are not queued up
  32. ms.transferLock.Lock()
  33. if !runTransferWorker || ms.transferRunning {
  34. ms.transferLock.Unlock()
  35. return
  36. }
  37. ms.transferRunning = true
  38. ms.transferLock.Unlock()
  39. defer func() {
  40. ms.transferLock.Lock()
  41. ms.transferRunning = false
  42. ms.transferLock.Unlock()
  43. }()
  44. if logTransferWorker {
  45. manager.LogDebug(ms.ds.Name(), "(TR): Running transfer worker task")
  46. }
  47. // Go through the transfer table and try to process the tasks
  48. var processed [][]byte
  49. it := hash.NewHTreeIterator(ms.at.transfer)
  50. for it.HasNext() {
  51. key, val := it.Next()
  52. if val != nil {
  53. tr := val.(*transferRec)
  54. ts, _ := timeutil.TimestampString(string(key), "UTC")
  55. manager.LogDebug(ms.ds.Name(), "(TR): ",
  56. fmt.Sprintf("Processing transfer request %v for %v from %v",
  57. tr.request.RequestType, tr.members, ts))
  58. // Send the request to all members
  59. var failedMembers []string
  60. for _, member := range tr.members {
  61. if _, err := ms.ds.sendDataRequest(member, tr.request); err != nil {
  62. manager.LogDebug(ms.ds.Name(), "(TR): ",
  63. fmt.Sprintf("Member %v Error: %v", member, err))
  64. failedMembers = append(failedMembers, member)
  65. }
  66. }
  67. // Update or remove the translation record
  68. if len(failedMembers) == 0 {
  69. processed = append(processed, key)
  70. } else if len(failedMembers) < len(tr.members) {
  71. tr.members = failedMembers
  72. ms.at.transfer.Put(key, tr)
  73. }
  74. }
  75. }
  76. // Remove all processed transfer requests
  77. for _, key := range processed {
  78. ms.at.transfer.Remove(key)
  79. }
  80. // Flush the local storage
  81. ms.gs.FlushAll()
  82. // Trigger the rebalancing task - the task will only execute if it is time
  83. go ms.rebalanceWorker(false)
  84. }