memberaddresstable.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package cluster
  11. import (
  12. "encoding/gob"
  13. "errors"
  14. "fmt"
  15. "sync"
  16. "time"
  17. "devt.de/krotik/common/timeutil"
  18. "devt.de/krotik/eliasdb/cluster/manager"
  19. "devt.de/krotik/eliasdb/hash"
  20. "devt.de/krotik/eliasdb/storage"
  21. )
  22. func init() {
  23. // Make sure we can use the relevant types in a gob operation
  24. gob.Register(&translationRec{})
  25. gob.Register(&transferRec{})
  26. }
  27. /*
  28. rootIDTranslationTree is the root id for the translation map
  29. */
  30. const rootIDTranslationTree = 2
  31. /*
  32. rootIDTransferTree is the root id for the transfer map
  33. */
  34. const rootIDTransferTree = 3
  35. /*
  36. transPrefix is the prefix for translation entries (cluster location -> local location)
  37. */
  38. const transPrefix = "t"
  39. /*
  40. rangePrefix is the prefix for range counters
  41. */
  42. const newlocPrefix = "n"
  43. /*
  44. translationRec is a translation record which stores a local storage location with a
  45. version number.
  46. */
  47. type translationRec struct {
  48. Loc uint64 // Local storage location
  49. Ver uint64 // Version of the local stored data
  50. }
  51. /*
  52. transferRec is a transfer record which stores a data transfer request.
  53. */
  54. type transferRec struct {
  55. Members []string // Target members
  56. Request *DataRequest // Data request
  57. }
  58. /*
  59. memberAddressTable is used by a memberStorage to manage cluster locations and their link to
  60. local locations.
  61. */
  62. type memberAddressTable struct {
  63. ds *DistributedStorage // Related distribution storage
  64. sm storage.Manager // Storage manager which stores this translation table
  65. translation *hash.HTree // Tree which stores the translation table (cluster location -> physical location)
  66. transfer *hash.HTree // Tree which stores the transfer table
  67. newlocCounters map[string]uint64 // Cached counter values to create new cluster locations
  68. newlocCounterLock *sync.Mutex // Lock for cached counter values
  69. }
  70. /*
  71. newMemberAddressTable creates a new member address table.
  72. */
  73. func newMemberAddressTable(ds *DistributedStorage, sm storage.Manager) (*memberAddressTable, error) {
  74. var err error
  75. var translation, transfer *hash.HTree
  76. var ret *memberAddressTable
  77. translation, err = getHtree(rootIDTranslationTree, sm)
  78. if err == nil {
  79. transfer, err = getHtree(rootIDTransferTree, sm)
  80. if err == nil {
  81. err = sm.Flush()
  82. if err == nil {
  83. ret = &memberAddressTable{ds, sm, translation, transfer, make(map[string]uint64), &sync.Mutex{}}
  84. }
  85. }
  86. }
  87. return ret, err
  88. }
  89. /*
  90. NewClusterLoc returns a new cluster location for a given storage manager.
  91. */
  92. func (mat *memberAddressTable) NewClusterLoc(dsname string) (uint64, error) {
  93. var ret uint64
  94. var err error
  95. var dsm *DistributedStorageManager
  96. // Check member is operational
  97. distTable, distTableErr := mat.checkState()
  98. if distTableErr != nil {
  99. return 0, distTableErr
  100. }
  101. // Get the location range which is allowed
  102. rangeStart, rangeStop := distTable.MemberRange(mat.ds.MemberManager.Name())
  103. // Get counter
  104. newLocCounter, _, _ := mat.newlocCounter(dsname)
  105. // Check that rangeCounter is sensible
  106. if newLocCounter < rangeStart {
  107. newLocCounter = rangeStart
  108. }
  109. // Get a StorageManager instance if required
  110. if newLocCounter == rangeStart {
  111. dsm = mat.ds.StorageManager(dsname, true).(*DistributedStorageManager)
  112. }
  113. locExists := func(dsname string, candidate uint64) (bool, error) {
  114. // We might be a new member - check with other members if we are at the start
  115. // of our range
  116. if newLocCounter == rangeStart {
  117. ok, err := dsm.Exists(candidate)
  118. if err != nil || ok {
  119. return err == nil && ok, err
  120. }
  121. }
  122. return mat.translation.Exists(transKey(dsname, candidate))
  123. }
  124. candidate := newLocCounter
  125. ok, err := locExists(dsname, candidate)
  126. if err == nil {
  127. if ok {
  128. // Candidate exists - search for a better one
  129. var i uint64
  130. for i = rangeStart; i <= rangeStop; i++ {
  131. ok, err = locExists(dsname, i)
  132. if err == nil && !ok && i != 0 {
  133. ret = i
  134. goto SearchResult
  135. } else if err != nil {
  136. goto SearchResult
  137. }
  138. }
  139. err = errors.New("Could not find any free storage location on this member")
  140. SearchResult:
  141. } else {
  142. // Candidate does not exist - it is a new location
  143. ret = candidate
  144. }
  145. }
  146. // At this point we either have an error or a valid location in ret
  147. if err == nil {
  148. newLocCounter = ret + 1
  149. if newLocCounter > rangeStop {
  150. // Reset range counter - next time we test which if there is anything
  151. // left in this range
  152. newLocCounter = 1
  153. }
  154. mat.setNewlocCounter(dsname, newLocCounter)
  155. mat.sm.Flush()
  156. }
  157. return ret, err
  158. }
  159. /*
  160. AddTransferRequest adds a data transfer request which can be picked up by the transferWorker.
  161. */
  162. func (mat *memberAddressTable) AddTransferRequest(targetMembers []string, request *DataRequest) {
  163. // Get a unique key for the transfer request
  164. key := timeutil.MakeTimestamp()
  165. ex, err := mat.transfer.Exists([]byte(key))
  166. for ex && err == nil {
  167. key = timeutil.MakeTimestamp()
  168. time.Sleep(time.Millisecond)
  169. ex, err = mat.transfer.Exists([]byte(key))
  170. }
  171. // Store the transfer request
  172. if err == nil {
  173. _, err := mat.transfer.Put([]byte(key), &transferRec{targetMembers, request})
  174. if err == nil {
  175. mat.sm.Flush()
  176. }
  177. }
  178. if request != nil {
  179. ts, _ := timeutil.TimestampString(string(key), "UTC")
  180. manager.LogDebug(mat.ds.Name(), "(Store): ",
  181. fmt.Sprintf("Added transfer request %v (Error: %v) to %v from %v",
  182. request.RequestType, err, targetMembers, ts))
  183. }
  184. }
  185. /*
  186. TransClusterLoc translates a cluster location to a local location. Returns the translated
  187. location, a flag if the location was found and lookup errors.
  188. */
  189. func (mat *memberAddressTable) TransClusterLoc(dsname string, clusterLoc uint64) (*translationRec, bool, error) {
  190. v, err := mat.translation.Get(transKey(dsname, clusterLoc))
  191. if v == nil {
  192. return nil, false, err
  193. }
  194. return v.(*translationRec), true, err
  195. }
  196. /*
  197. SetTransClusterLoc adds a translation from a cluster location to a local location. Returns the
  198. previously stored translated location, a flag if the location was found and errors.
  199. */
  200. func (mat *memberAddressTable) SetTransClusterLoc(dsname string, clusterLoc uint64,
  201. localLoc uint64, localVer uint64) (*translationRec, bool, error) {
  202. v, err := mat.translation.Put(transKey(dsname, clusterLoc), &translationRec{localLoc, localVer})
  203. if err == nil {
  204. mat.sm.Flush()
  205. }
  206. if v == nil {
  207. return nil, false, err
  208. }
  209. return v.(*translationRec), true, err
  210. }
  211. /*
  212. RemoveTransClusterLoc removes a translation of a cluster location. Returns the
  213. previously stored translated location, a flag if the location was found and errors.
  214. */
  215. func (mat *memberAddressTable) RemoveTransClusterLoc(dsname string, clusterLoc uint64) (*translationRec, bool, error) {
  216. v, err := mat.translation.Remove(transKey(dsname, clusterLoc))
  217. if err == nil {
  218. mat.sm.Flush()
  219. }
  220. if v == nil {
  221. return nil, false, err
  222. }
  223. return v.(*translationRec), true, err
  224. }
  225. /*
  226. Check the state of cluster member. Return an error if the member is not
  227. operational.
  228. */
  229. func (mat *memberAddressTable) checkState() (*DistributionTable, error) {
  230. distTable, distTableErr := mat.ds.DistributionTable()
  231. if distTableErr != nil {
  232. return nil, fmt.Errorf("Storage is currently disabled on member: %v (%v)",
  233. mat.ds.MemberManager.Name(), distTableErr)
  234. }
  235. return distTable, nil
  236. }
  237. // Helper functions
  238. // ================
  239. /*
  240. newlocCounter returns the location counter for a given storage manager. Returns the translated
  241. location, a flag if the location was found and lookup errors.
  242. */
  243. func (mat *memberAddressTable) newlocCounter(dsname string) (uint64, bool, error) {
  244. // Try to get the counter from the cache
  245. mat.newlocCounterLock.Lock()
  246. cv, ok := mat.newlocCounters[dsname]
  247. mat.newlocCounterLock.Unlock()
  248. if ok {
  249. return cv, true, nil
  250. }
  251. // Lookup the counter
  252. v, err := mat.translation.Get(newlocCounterKey(dsname))
  253. if v == nil {
  254. return 1, false, err
  255. }
  256. ret := toUInt64(v)
  257. // Store counter in the cache
  258. mat.newlocCounterLock.Lock()
  259. mat.newlocCounters[dsname] = ret
  260. mat.newlocCounterLock.Unlock()
  261. return ret, true, err
  262. }
  263. /*
  264. setNewlocCounter sets a location counter for a given storage manager.
  265. */
  266. func (mat *memberAddressTable) setNewlocCounter(dsname string, counter uint64) error {
  267. // Store counter in the cache and HTree
  268. mat.newlocCounterLock.Lock()
  269. mat.newlocCounters[dsname] = counter
  270. mat.newlocCounterLock.Unlock()
  271. _, err := mat.translation.Put(newlocCounterKey(dsname), counter)
  272. return err
  273. }
  274. /*
  275. newlocCounterKey returns the counter key for a given storage manager.
  276. */
  277. func newlocCounterKey(dsname string) []byte {
  278. return []byte(fmt.Sprint(newlocPrefix, dsname))
  279. }
  280. /*
  281. transKey returns the translation map lookup key for a given cluster location and storage manager.
  282. */
  283. func transKey(dsname string, loc uint64) []byte {
  284. return []byte(fmt.Sprint(transPrefix, dsname, "#", loc))
  285. }
  286. /*
  287. getHtree returns a HTree from a given storage.Manager with a given root ID.
  288. */
  289. func getHtree(rootID int, sm storage.Manager) (*hash.HTree, error) {
  290. var htree *hash.HTree
  291. var err error
  292. loc := sm.Root(rootID)
  293. if loc == 0 {
  294. // Create a new HTree and store its location
  295. htree, err = hash.NewHTree(sm)
  296. if err == nil {
  297. // Make sure the new root id is persisted
  298. sm.SetRoot(rootID, htree.Location())
  299. }
  300. } else {
  301. // Load existing HTree
  302. htree, err = hash.LoadHTree(sm, loc)
  303. }
  304. return htree, err
  305. }