memberaddresstable.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package cluster
  11. import (
  12. "errors"
  13. "fmt"
  14. "sync"
  15. "time"
  16. "devt.de/krotik/common/timeutil"
  17. "devt.de/krotik/eliasdb/cluster/manager"
  18. "devt.de/krotik/eliasdb/hash"
  19. "devt.de/krotik/eliasdb/storage"
  20. )
  21. /*
  22. rootIDTranslationTree is the root id for the translation map
  23. */
  24. const rootIDTranslationTree = 2
  25. /*
  26. rootIDTransferTree is the root id for the transfer map
  27. */
  28. const rootIDTransferTree = 3
  29. /*
  30. transPrefix is the prefix for translation entries (cluster location -> local location)
  31. */
  32. const transPrefix = "t"
  33. /*
  34. rangePrefix is the prefix for range counters
  35. */
  36. const newlocPrefix = "n"
/*
translationRec is a translation record. It is the value stored in the
translation tree for a cluster location and holds the local storage location
together with the version of the locally stored data.
*/
type translationRec struct {
	loc uint64 // Local storage location
	ver uint64 // Version of the local stored data
}
/*
transferRec is a transfer record. It is the value stored in the transfer tree
and holds a data request together with the members it should be sent to.
*/
type transferRec struct {
	members []string     // Target members
	request *DataRequest // Data request
}
/*
memberAddressTable is used by a memberStorage to manage cluster locations and their link to
local locations. It keeps two HTrees in a single storage manager: one for the
translation table (which also holds the new location counters) and one for
pending transfer requests.
*/
type memberAddressTable struct {
	ds                *DistributedStorage // Related distribution storage
	sm                storage.Manager     // Storage manager which stores this translation table
	translation       *hash.HTree         // Tree which stores the translation table (cluster location -> physical location)
	transfer          *hash.HTree         // Tree which stores the transfer table
	newlocCounters    map[string]uint64   // Cached counter values to create new cluster locations (keyed by storage manager name)
	newlocCounterLock *sync.Mutex         // Lock for cached counter values
}
  64. /*
  65. newMemberAddressTable creates a new member address table.
  66. */
  67. func newMemberAddressTable(ds *DistributedStorage, sm storage.Manager) (*memberAddressTable, error) {
  68. var err error
  69. var translation, transfer *hash.HTree
  70. var ret *memberAddressTable
  71. translation, err = getHtree(rootIDTranslationTree, sm)
  72. if err == nil {
  73. transfer, err = getHtree(rootIDTransferTree, sm)
  74. if err == nil {
  75. err = sm.Flush()
  76. if err == nil {
  77. ret = &memberAddressTable{ds, sm, translation, transfer, make(map[string]uint64), &sync.Mutex{}}
  78. }
  79. }
  80. }
  81. return ret, err
  82. }
  83. /*
  84. NewClusterLoc returns a new cluster location for a given storage manager.
  85. */
  86. func (mat *memberAddressTable) NewClusterLoc(dsname string) (uint64, error) {
  87. var ret uint64
  88. var err error
  89. var dsm *DistributedStorageManager
  90. // Check member is operational
  91. distTable, distTableErr := mat.checkState()
  92. if distTableErr != nil {
  93. return 0, distTableErr
  94. }
  95. // Get the location range which is allowed
  96. rangeStart, rangeStop := distTable.MemberRange(mat.ds.MemberManager.Name())
  97. // Get counter
  98. newLocCounter, _, _ := mat.newlocCounter(dsname)
  99. // Check that rangeCounter is sensible
  100. if newLocCounter < rangeStart {
  101. newLocCounter = rangeStart
  102. }
  103. // Get a StorageManager instance if required
  104. if newLocCounter == rangeStart {
  105. dsm = mat.ds.StorageManager(dsname, true).(*DistributedStorageManager)
  106. }
  107. locExists := func(dsname string, candidate uint64) (bool, error) {
  108. // We might be a new member - check with other members if we are at the start
  109. // of our range
  110. if newLocCounter == rangeStart {
  111. ok, err := dsm.Exists(candidate)
  112. if err != nil || ok {
  113. return err == nil && ok, err
  114. }
  115. }
  116. return mat.translation.Exists(transKey(dsname, candidate))
  117. }
  118. candidate := newLocCounter
  119. ok, err := locExists(dsname, candidate)
  120. if err == nil {
  121. if ok {
  122. // Candidate exists - search for a better one
  123. var i uint64
  124. for i = rangeStart; i <= rangeStop; i++ {
  125. ok, err = locExists(dsname, i)
  126. if err == nil && !ok {
  127. ret = i
  128. goto SearchResult
  129. } else if err != nil {
  130. goto SearchResult
  131. }
  132. }
  133. err = errors.New("Could not find any free storage location on this member")
  134. SearchResult:
  135. } else {
  136. // Candidate does not exist - it is a new location
  137. ret = candidate
  138. }
  139. }
  140. // At this point we either have an error or a valid location in ret
  141. if err == nil {
  142. newLocCounter = ret + 1
  143. if newLocCounter > rangeStop {
  144. // Reset range counter - next time we test which if there is anything
  145. // left in this range
  146. newLocCounter = 0
  147. }
  148. mat.setNewlocCounter(dsname, newLocCounter)
  149. mat.sm.Flush()
  150. }
  151. return ret, err
  152. }
  153. /*
  154. AddTransferRequest adds a data transfer request which can be picked up by the transferWorker.
  155. */
  156. func (mat *memberAddressTable) AddTransferRequest(targetMembers []string, request *DataRequest) {
  157. // Get a unique key for the transfer request
  158. key := timeutil.MakeTimestamp()
  159. ex, err := mat.transfer.Exists([]byte(key))
  160. for ex && err == nil {
  161. key = timeutil.MakeTimestamp()
  162. time.Sleep(time.Millisecond)
  163. ex, err = mat.transfer.Exists([]byte(key))
  164. }
  165. // Store the transfer request
  166. if err == nil {
  167. _, err := mat.transfer.Put([]byte(key), &transferRec{targetMembers, request})
  168. if err == nil {
  169. mat.sm.Flush()
  170. }
  171. }
  172. if request != nil {
  173. ts, _ := timeutil.TimestampString(string(key), "UTC")
  174. manager.LogDebug(mat.ds.Name(), "(Store): ",
  175. fmt.Sprintf("Added transfer request %v (Error: %v) to %v from %v",
  176. request.RequestType, err, targetMembers, ts))
  177. }
  178. }
  179. /*
  180. TransClusterLoc translates a cluster location to a local location. Returns the translated
  181. location, a flag if the location was found and lookup errors.
  182. */
  183. func (mat *memberAddressTable) TransClusterLoc(dsname string, clusterLoc uint64) (*translationRec, bool, error) {
  184. v, err := mat.translation.Get(transKey(dsname, clusterLoc))
  185. if v == nil {
  186. return nil, false, err
  187. }
  188. return v.(*translationRec), true, err
  189. }
  190. /*
  191. SetTransClusterLoc adds a translation from a cluster location to a local location. Returns the
  192. previously stored translated location, a flag if the location was found and errors.
  193. */
  194. func (mat *memberAddressTable) SetTransClusterLoc(dsname string, clusterLoc uint64,
  195. localLoc uint64, localVer uint64) (*translationRec, bool, error) {
  196. v, err := mat.translation.Put(transKey(dsname, clusterLoc), &translationRec{localLoc, localVer})
  197. if err == nil {
  198. mat.sm.Flush()
  199. }
  200. if v == nil {
  201. return nil, false, err
  202. }
  203. return v.(*translationRec), true, err
  204. }
  205. /*
  206. RemoveTransClusterLoc removes a translation of a cluster location. Returns the
  207. previously stored translated location, a flag if the location was found and errors.
  208. */
  209. func (mat *memberAddressTable) RemoveTransClusterLoc(dsname string, clusterLoc uint64) (*translationRec, bool, error) {
  210. v, err := mat.translation.Remove(transKey(dsname, clusterLoc))
  211. if err == nil {
  212. mat.sm.Flush()
  213. }
  214. if v == nil {
  215. return nil, false, err
  216. }
  217. return v.(*translationRec), true, err
  218. }
  219. /*
  220. Check the state of cluster member. Return an error if the member is not
  221. operational.
  222. */
  223. func (mat *memberAddressTable) checkState() (*DistributionTable, error) {
  224. distTable, distTableErr := mat.ds.DistributionTable()
  225. if distTableErr != nil {
  226. return nil, fmt.Errorf("Storage is currently disabled on member: %v (%v)",
  227. mat.ds.MemberManager.Name(), distTableErr)
  228. }
  229. return distTable, nil
  230. }
  231. // Helper functions
  232. // ================
  233. /*
  234. newlocCounter returns the location counter for a given storage manager. Returns the translated
  235. location, a flag if the location was found and lookup errors.
  236. */
  237. func (mat *memberAddressTable) newlocCounter(dsname string) (uint64, bool, error) {
  238. // Try to get the counter from the cache
  239. mat.newlocCounterLock.Lock()
  240. cv, ok := mat.newlocCounters[dsname]
  241. mat.newlocCounterLock.Unlock()
  242. if ok {
  243. return cv, true, nil
  244. }
  245. // Lookup the counter
  246. v, err := mat.translation.Get(newlocCounterKey(dsname))
  247. if v == nil {
  248. return 0, false, err
  249. }
  250. ret := v.(uint64)
  251. // Store counter in the cache
  252. mat.newlocCounterLock.Lock()
  253. mat.newlocCounters[dsname] = ret
  254. mat.newlocCounterLock.Unlock()
  255. return ret, true, err
  256. }
  257. /*
  258. setNewlocCounter sets a location counter for a given storage manager.
  259. */
  260. func (mat *memberAddressTable) setNewlocCounter(dsname string, counter uint64) error {
  261. // Store counter in the cache and HTree
  262. mat.newlocCounterLock.Lock()
  263. mat.newlocCounters[dsname] = counter
  264. mat.newlocCounterLock.Unlock()
  265. _, err := mat.translation.Put(newlocCounterKey(dsname), counter)
  266. return err
  267. }
  268. /*
  269. newlocCounterKey returns the counter key for a given storage manager.
  270. */
  271. func newlocCounterKey(dsname string) []byte {
  272. return []byte(fmt.Sprint(newlocPrefix, dsname))
  273. }
  274. /*
  275. transKey returns the translation map lookup key for a given cluster location and storage manager.
  276. */
  277. func transKey(dsname string, loc uint64) []byte {
  278. return []byte(fmt.Sprint(transPrefix, dsname, "#", loc))
  279. }
  280. /*
  281. getHtree returns a HTree from a given storage.Manager with a given root ID.
  282. */
  283. func getHtree(rootID int, sm storage.Manager) (*hash.HTree, error) {
  284. var htree *hash.HTree
  285. var err error
  286. loc := sm.Root(rootID)
  287. if loc == 0 {
  288. // Create a new HTree and store its location
  289. htree, err = hash.NewHTree(sm)
  290. if err == nil {
  291. // Make sure the new root id is persisted
  292. sm.SetRoot(rootID, htree.Location())
  293. }
  294. } else {
  295. // Load existing HTree
  296. htree, err = hash.LoadHTree(sm, loc)
  297. }
  298. return htree, err
  299. }