memberstorage.go 18 KB


  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package cluster
  11. import (
  12. "bytes"
  13. "encoding/json"
  14. "fmt"
  15. "sort"
  16. "strings"
  17. "sync"
  18. "devt.de/krotik/common/sortutil"
  19. "devt.de/krotik/eliasdb/cluster/manager"
  20. "devt.de/krotik/eliasdb/graph/graphstorage"
  21. "devt.de/krotik/eliasdb/hash"
  22. "devt.de/krotik/eliasdb/storage"
  23. )
  24. /*
  25. ClusterStoragePrefix is the prefix for cluster related storage managers
  26. */
  27. const ClusterStoragePrefix = "cs_"
  28. /*
  29. LocalStoragePrefix is the prefix for local storage managers
  30. */
  31. const LocalStoragePrefix = "ls_"
  32. /*
  33. memberStorage models the local storage of a cluster member. This data structure
  34. is the only thing which has access to the wrapped graphstorage.Storage.
  35. */
  36. type memberStorage struct {
  37. ds *DistributedStorage // Distributed storage which created this member storage
  38. gs graphstorage.Storage // Wrapped graphstorage.Storage
  39. at *memberAddressTable // Address table (cluster location -> local location)
  40. transferLock *sync.Mutex // Lock for the transfer task
  41. transferRunning bool // Flag to indicate that the transfer task is running
  42. rebalanceLock *sync.Mutex // Lock for the rebalance task
  43. rebalanceRunning bool // Flag to indicate that the rebalance task is running
  44. rebalanceCounter int
  45. }
  46. /*
  47. newMemberStorage creates a new memberStorage instance.
  48. */
  49. func newMemberStorage(ds *DistributedStorage, gs graphstorage.Storage) (*memberStorage, error) {
  50. sm := gs.StorageManager("cluster_translation", true)
  51. at, err := newMemberAddressTable(ds, sm)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return &memberStorage{ds, gs, at, &sync.Mutex{}, false, &sync.Mutex{}, false, 0}, nil
  56. }
  57. /*
  58. handleDataRequest deals with RPC requests. It is the only function which is
  59. called by the RPC server of the member manager.
  60. */
  61. func (ms *memberStorage) handleDataRequest(request interface{}, response *interface{}) error {
  62. var err error
  63. // Make sure a request can be served
  64. distTable, distTableErr := ms.at.checkState()
  65. if distTableErr != nil {
  66. return distTableErr
  67. }
  68. dr := request.(*DataRequest)
  69. switch dr.RequestType {
  70. case RTGetMain:
  71. *response = ms.gs.MainDB()
  72. case RTSetMain:
  73. err = ms.handleSetMainRequest(distTable, dr, response)
  74. case RTSetRoot:
  75. err = ms.handleSetRootRequest(distTable, dr, response)
  76. case RTGetRoot:
  77. err = ms.handleGetRootRequest(distTable, dr, response)
  78. case RTInsert:
  79. err = ms.handleInsertRequest(distTable, dr, response)
  80. case RTUpdate:
  81. err = ms.handleUpdateRequest(distTable, dr, response)
  82. case RTFree:
  83. err = ms.handleFreeRequest(distTable, dr, response)
  84. case RTExists:
  85. err = ms.handleFetchRequest(distTable, dr, response, false)
  86. case RTFetch:
  87. err = ms.handleFetchRequest(distTable, dr, response, true)
  88. case RTRebalance:
  89. err = ms.handleRebalanceRequest(distTable, dr, response)
  90. default:
  91. err = fmt.Errorf("Unknown request type")
  92. }
  93. manager.LogDebug(ms.ds.MemberManager.Name(), fmt.Sprintf("(Store): Handled: %v %s (Transfer: %v, Error: %v)",
  94. dr.RequestType, dr.Args, dr.Transfer, err))
  95. return err
  96. }
  97. /*
  98. handleSetMainRequest sets the mainDB on the local storage manager.
  99. */
  100. func (ms *memberStorage) handleSetMainRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  101. mainDB := ms.gs.MainDB()
  102. newMainDB := request.Value.(map[string]string)
  103. // Update keys and values
  104. for k, v := range newMainDB {
  105. mainDB[k] = v
  106. }
  107. // Check if things should be deleted
  108. var toRemove []string
  109. for k := range mainDB {
  110. if _, ok := newMainDB[k]; !ok {
  111. toRemove = append(toRemove, k)
  112. }
  113. }
  114. for _, k := range toRemove {
  115. delete(mainDB, k)
  116. }
  117. err := ms.gs.FlushMain()
  118. if !request.Transfer {
  119. ms.at.AddTransferRequest(distTable.OtherReplicationMembers(0, ms.ds.MemberManager.Name()),
  120. &DataRequest{RTSetMain, nil, request.Value, true})
  121. }
  122. return err
  123. }
  124. /*
  125. handleGetRootRequest retrieves a root value from a local storage manager.
  126. */
  127. func (ms *memberStorage) handleGetRootRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  128. dsname := request.Args[RPStoreName].(string)
  129. root := request.Args[RPRoot].(int)
  130. sm := ms.dataStorage(dsname, false)
  131. if sm != nil {
  132. *response = sm.Root(root)
  133. }
  134. return nil
  135. }
  136. /*
  137. handleSetRootRequest sets a new root value in a local storage manager.
  138. */
  139. func (ms *memberStorage) handleSetRootRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  140. dsname := request.Args[RPStoreName].(string)
  141. root := request.Args[RPRoot].(int)
  142. sm := ms.dataStorage(dsname, true)
  143. sm.SetRoot(root, toUInt64(request.Value))
  144. if !request.Transfer {
  145. ms.at.AddTransferRequest(distTable.OtherReplicationMembers(0, ms.ds.MemberManager.Name()),
  146. &DataRequest{RTSetRoot, request.Args, request.Value, true})
  147. }
  148. return sm.Flush()
  149. }
  150. /*
  151. handleInsertRequest inserts an object and return its cluster storage location.
  152. Distribution procedure:
  153. Client -> Cluster Member Request Receiver
  154. Cluster Member Request Receiver -> Cluster Member Primary Storage (chosen round-robin / available)
  155. Cluster Member Primary Storage writes into its Transfer Table
  156. Cluster Member Primary Storage (Transfer worker) -> Replicating Cluster Members
  157. */
  158. func (ms *memberStorage) handleInsertRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  159. var err error
  160. var cloc uint64
  161. dsname := request.Args[RPStoreName].(string)
  162. *response = 0
  163. sm := ms.dataStorage(dsname, true)
  164. if !request.Transfer {
  165. // First get a new cluster location (on this member)
  166. cloc, err = ms.at.NewClusterLoc(dsname)
  167. } else {
  168. // If this is a transfer request we know already the cluster location
  169. cloc = toUInt64(request.Args[RPLoc])
  170. }
  171. if err == nil {
  172. var loc uint64
  173. // Insert into the local storage
  174. loc, err = sm.Insert(request.Value)
  175. if err == nil {
  176. // Add a translation
  177. _, _, err = ms.at.SetTransClusterLoc(dsname, cloc, loc, 1)
  178. if err == nil {
  179. if !request.Transfer {
  180. // Add transfer request for replication
  181. // At this point the operation has succeeded. We still need to
  182. // replicate the change to all the replicating members but
  183. // any errors happening during this shall not fail this operation.
  184. // The next rebalancing will then synchronize all members again.
  185. ms.at.AddTransferRequest(distTable.Replicas(ms.ds.MemberManager.Name()),
  186. &DataRequest{RTInsert, map[DataRequestArg]interface{}{
  187. RPStoreName: dsname,
  188. RPLoc: cloc,
  189. }, request.Value, true})
  190. }
  191. *response = cloc
  192. }
  193. }
  194. }
  195. return err
  196. }
  197. /*
  198. handleUpdateRequest updates an object and return its cluster storage location.
  199. There is indeed a chance to produce inconsistencies if members fail in the right
  200. sequence. It is assumed that these will be delt with in the next rebalance.
  201. Distribution procedure:
  202. Client -> Cluster Member Request Receiver
  203. Cluster Member Request Receiver -> Cluster Member Primary Storage or Replicating Cluster Member
  204. Storing Cluster Member does the update and writes into its transfer table
  205. Storing Cluster Member (Transfer worker) -> Replicating / Primary Cluster Members
  206. */
  207. func (ms *memberStorage) handleUpdateRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  208. var err error
  209. var newVersion uint64
  210. dsname := request.Args[RPStoreName].(string)
  211. cloc := toUInt64(request.Args[RPLoc])
  212. *response = 0
  213. // Get the translation
  214. transRec, ok, err := ms.at.TransClusterLoc(dsname, cloc)
  215. if ok {
  216. sm := ms.dataStorage(dsname, false)
  217. if sm != nil {
  218. // Update the local storage
  219. if !request.Transfer {
  220. err = sm.Update(transRec.Loc, request.Value)
  221. newVersion = transRec.Ver + 1
  222. } else {
  223. newVersion = toUInt64(request.Args[RPVer])
  224. if newVersion >= transRec.Ver {
  225. err = sm.Update(transRec.Loc, request.Value)
  226. } else {
  227. // Outdated update requests are simply ignored
  228. err = fmt.Errorf("Received outdated update request (%v - Location: %v)",
  229. ms.ds.MemberManager.Name(), cloc)
  230. manager.LogDebug(ms.ds.MemberManager.Name(), err.Error())
  231. // Need to return no error so the transfer worker on the
  232. // other side removes its entry
  233. err = nil
  234. }
  235. }
  236. if err == nil {
  237. // Increase the version of the translation record
  238. _, _, err = ms.at.SetTransClusterLoc(dsname, cloc, transRec.Loc, newVersion)
  239. if err == nil {
  240. if !request.Transfer {
  241. // Add transfer request for replication
  242. // At this point the operation has succeeded. We still need to
  243. // replicate the change to all the replicating members but
  244. // any errors happening during this shall not fail this operation.
  245. // The next rebalancing will then synchronize all members again.
  246. ms.at.AddTransferRequest(distTable.OtherReplicationMembers(cloc, ms.ds.MemberManager.Name()),
  247. &DataRequest{RTUpdate, map[DataRequestArg]interface{}{
  248. RPStoreName: dsname,
  249. RPLoc: cloc,
  250. RPVer: newVersion,
  251. }, request.Value, true})
  252. }
  253. *response = cloc
  254. return nil
  255. }
  256. }
  257. }
  258. }
  259. if err == nil {
  260. err = fmt.Errorf("Cluster slot not found (%v - Location: %v)",
  261. ms.ds.MemberManager.Name(), cloc)
  262. }
  263. return err
  264. }
  265. /*
  266. handleFreeRequest removes an object.
  267. Distribution procedure:
  268. Client -> Cluster Member Request Receiver
  269. Cluster Member Request Receiver -> Cluster Member Primary Storage or Replicating Cluster Member
  270. Storing Cluster Member does the free and writes into its transfer table
  271. Storing Cluster Member (Transfer worker) -> Replicating / Primary Cluster Members
  272. */
  273. func (ms *memberStorage) handleFreeRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  274. var err error
  275. dsname := request.Args[RPStoreName].(string)
  276. cloc := toUInt64(request.Args[RPLoc])
  277. // Get the translation
  278. transRec, ok, err := ms.at.TransClusterLoc(dsname, cloc)
  279. if ok {
  280. sm := ms.dataStorage(dsname, false)
  281. if sm != nil {
  282. // Remove the translation
  283. _, _, err = ms.at.RemoveTransClusterLoc(dsname, cloc)
  284. if err == nil {
  285. // Remove from the local storage
  286. err = sm.Free(transRec.Loc)
  287. if !request.Transfer {
  288. // Add transfer request for replication
  289. // At this point the operation has succeeded. We still need to
  290. // replicate the change to all the replicating members but
  291. // any errors happening during this shall not fail this operation.
  292. // The next rebalancing will then synchronize all members again.
  293. ms.at.AddTransferRequest(distTable.OtherReplicationMembers(cloc, ms.ds.MemberManager.Name()),
  294. &DataRequest{RTFree, map[DataRequestArg]interface{}{
  295. RPStoreName: dsname,
  296. RPLoc: cloc,
  297. }, nil, true})
  298. }
  299. return err
  300. }
  301. }
  302. }
  303. if err == nil {
  304. err = fmt.Errorf("Cluster slot not found (%v - Location: %v)", ms.ds.MemberManager.Name(), cloc)
  305. }
  306. return err
  307. }
  308. /*
  309. handleFetchRequest inserts an object and return its cluster storage location.
  310. */
  311. func (ms *memberStorage) handleFetchRequest(distTable *DistributionTable,
  312. request *DataRequest, response *interface{}, fetch bool) error {
  313. var err error
  314. dsname := request.Args[RPStoreName].(string)
  315. cloc := toUInt64(request.Args[RPLoc])
  316. // Get the translation
  317. transRec, ok, err := ms.at.TransClusterLoc(dsname, cloc)
  318. if ok {
  319. // Check if the data should be retrieved
  320. if !fetch {
  321. *response = true
  322. return nil
  323. }
  324. sm := ms.dataStorage(dsname, false)
  325. if sm != nil {
  326. var res []byte
  327. err = sm.Fetch(transRec.Loc, &res)
  328. if err == nil {
  329. *response = res
  330. return nil
  331. }
  332. }
  333. } else if !fetch {
  334. *response = false
  335. return err
  336. }
  337. if err == nil {
  338. err = fmt.Errorf("Cluster slot not found (%v - Location: %v)", ms.ds.MemberManager.Name(), cloc)
  339. }
  340. return err
  341. }
  342. /*
  343. handleRebalanceRequest processes rebalance requests.
  344. */
  345. func (ms *memberStorage) handleRebalanceRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  346. var err error
  347. var tr *translationRec
  348. var found bool
  349. var res interface{}
  350. var lloc uint64
  351. handleError := func(err error) {
  352. if err != nil {
  353. manager.LogDebug(ms.ds.MemberManager.Name(), fmt.Sprintf("(Store): Error during rebalancing request handling: %v", err))
  354. }
  355. }
  356. // Get the location ranges for this member and locations which are replicated on this member.
  357. storeRangeStart, storeRangeStop := distTable.MemberRange(ms.ds.MemberManager.Name())
  358. repRangeStart, repRangeStop := distTable.ReplicationRange(ms.ds.MemberManager.Name())
  359. // Get the request data
  360. rsource := request.Args[RPSrc].(string)
  361. smnames := request.Args[RPStoreName]
  362. locs := request.Args[RPLoc]
  363. vers := request.Args[RPVer]
  364. for i, cloc := range locs.([]uint64) {
  365. // Check if there was an error from the previous iteration
  366. handleError(err)
  367. smname := smnames.([]string)[i]
  368. ver := vers.([]uint64)[i]
  369. // Do not proceed if there is an error or if the location is out of
  370. // range of responsibility
  371. notInStoreRange := cloc < storeRangeStart || cloc > storeRangeStop
  372. notInRepRange := cloc < repRangeStart || cloc > repRangeStop
  373. // Check if the location exists in the local storage
  374. tr, found, err = ms.at.TransClusterLoc(smname, cloc)
  375. if err != nil || (notInStoreRange && notInRepRange) {
  376. // Skip the location if there was an error or if this member
  377. // is not relevant for the location in question (either as primary
  378. // storage member or as replica)
  379. continue
  380. }
  381. if found {
  382. // Check if the version is newer and update the local record if it is
  383. if tr.Ver < ver {
  384. // Local record exists and needs to be updated
  385. sm := ms.dataStorage(smname, false)
  386. // Fetch the data from the remote machine
  387. res, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFetch, map[DataRequestArg]interface{}{
  388. RPStoreName: smname,
  389. RPLoc: cloc,
  390. }, nil, false})
  391. if err == nil {
  392. // Update the local storage
  393. if err = sm.Update(tr.Loc, res); err == nil {
  394. // Update the translation
  395. _, _, err = ms.at.SetTransClusterLoc(smname, cloc, tr.Loc, ver)
  396. manager.LogDebug(ms.ds.MemberManager.Name(),
  397. fmt.Sprintf("(Store): Rebalance updated %v location: %v", smname, cloc))
  398. }
  399. }
  400. }
  401. } else {
  402. // The data on the remote system should be inserted into the local
  403. // datastore.
  404. sm := ms.dataStorage(smname, true)
  405. // Fetch the data from the remote machine
  406. res, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFetch, map[DataRequestArg]interface{}{
  407. RPStoreName: smname,
  408. RPLoc: cloc,
  409. }, nil, false})
  410. if err == nil {
  411. // Insert into the local storage
  412. lloc, err = sm.Insert(res)
  413. if err == nil {
  414. // Add a translation
  415. _, _, err = ms.at.SetTransClusterLoc(smname, cloc, lloc, ver)
  416. manager.LogDebug(ms.ds.MemberManager.Name(),
  417. fmt.Sprintf("(Store): Rebalance inserted %v location: %v", smname, cloc))
  418. }
  419. }
  420. }
  421. if err == nil {
  422. // Should the sender have the data
  423. sourceSRangeStart, sourceSRangeStop := distTable.MemberRange(rsource)
  424. sourceRRangeStart, sourceRRangeStop := distTable.ReplicationRange(rsource)
  425. notInSourceSRange := cloc < sourceSRangeStart || cloc > sourceSRangeStop
  426. notInSourceRRange := cloc < sourceRRangeStart || cloc > sourceRRangeStop
  427. if notInSourceSRange && notInSourceRRange {
  428. manager.LogDebug(ms.ds.MemberManager.Name(),
  429. fmt.Sprintf("(Store): Rebalance removes %v location: %v from member %v",
  430. smname, tr.Loc, rsource))
  431. _, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFree, map[DataRequestArg]interface{}{
  432. RPStoreName: smname,
  433. RPLoc: cloc,
  434. }, nil, true})
  435. }
  436. }
  437. }
  438. handleError(err)
  439. return nil
  440. }
  441. /*
  442. dataStorage returns a storage.StorageManager which will only store byte slices.
  443. */
  444. func (ms *memberStorage) dataStorage(dsname string, create bool) storage.Manager {
  445. return ms.gs.StorageManager(LocalStoragePrefix+dsname, create)
  446. }
  447. /*
  448. dump dumps the contents of a particular member storage manager as escaped strings.
  449. (Works only for MemoryStorageManagers.)
  450. */
  451. func (ms *memberStorage) dump(smname string) string {
  452. var res string
  453. printTransferTable := func(buf *bytes.Buffer) {
  454. // Go through the transfer table and see if there is anything
  455. it := hash.NewHTreeIterator(ms.at.transfer)
  456. for it.HasNext() {
  457. _, val := it.Next()
  458. if val != nil {
  459. tr := val.(*transferRec)
  460. args, _ := json.Marshal(tr.Request.Args)
  461. vals, ok := tr.Request.Value.([]byte)
  462. if !ok {
  463. vals, _ = json.Marshal(tr.Request.Value)
  464. }
  465. buf.WriteString(fmt.Sprintf("transfer: %v - %v %v %q\n",
  466. tr.Members, tr.Request.RequestType, string(args), vals))
  467. }
  468. }
  469. }
  470. if smname == "" {
  471. // Dump the contents of the MainDB if no name is given
  472. buf := new(bytes.Buffer)
  473. buf.WriteString(fmt.Sprintf("%v MemberStorageManager MainDB\n",
  474. ms.ds.MemberManager.Name()))
  475. var keys []string
  476. for k := range ms.gs.MainDB() {
  477. keys = append(keys, k)
  478. }
  479. sort.Strings(keys)
  480. // Output local storage content with mapped cluster locations
  481. for _, k := range keys {
  482. v := ms.gs.MainDB()[k]
  483. buf.WriteString(fmt.Sprintf("%v - %q\n", k, v))
  484. }
  485. printTransferTable(buf)
  486. return buf.String()
  487. }
  488. sm := ms.dataStorage(smname, false)
  489. if sm != nil {
  490. // Make sure the storage manager is a MemoryStorageManager
  491. if msm, ok := sm.(*storage.MemoryStorageManager); ok {
  492. // Get all stored cluster locations
  493. locmap := make(map[uint64]string)
  494. it := hash.NewHTreeIterator(ms.at.translation)
  495. for it.HasNext() {
  496. k, v := it.Next()
  497. key := string(k)
  498. if strings.HasPrefix(key, fmt.Sprint(transPrefix, smname, "#")) {
  499. key = string(key[len(fmt.Sprint(transPrefix, smname, "#")):])
  500. locmap[v.(*translationRec).Loc] = fmt.Sprintf("%v (v:%v)",
  501. key, v.(*translationRec).Ver)
  502. }
  503. }
  504. buf := new(bytes.Buffer)
  505. buf.WriteString(fmt.Sprintf("%v MemberStorageManager %v\n",
  506. ms.ds.MemberManager.Name(), msm.Name()))
  507. buf.WriteString("Roots: ")
  508. // Go through root values
  509. for i := 0; i < 10; i++ {
  510. rootVal := msm.Root(i)
  511. buf.WriteString(fmt.Sprintf("%v=%v ", i, rootVal))
  512. }
  513. buf.WriteString("\n")
  514. var keys []uint64
  515. for k := range msm.Data {
  516. keys = append(keys, k)
  517. }
  518. sortutil.UInt64s(keys)
  519. // Output local storage content with mapped cluster locations
  520. for _, k := range keys {
  521. v := msm.Data[k]
  522. caddr := locmap[k]
  523. buf.WriteString(fmt.Sprintf("cloc: %v - lloc: %v - %q\n",
  524. caddr, k, v))
  525. }
  526. printTransferTable(buf)
  527. res = buf.String()
  528. }
  529. }
  530. return res
  531. }