memberstorage.go 18 KB


  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package cluster
  11. import (
  12. "bytes"
  13. "encoding/json"
  14. "fmt"
  15. "sort"
  16. "strings"
  17. "sync"
  18. "devt.de/krotik/common/sortutil"
  19. "devt.de/krotik/eliasdb/cluster/manager"
  20. "devt.de/krotik/eliasdb/graph/graphstorage"
  21. "devt.de/krotik/eliasdb/hash"
  22. "devt.de/krotik/eliasdb/storage"
  23. )
  /*
  Name prefixes for the storage managers used by the cluster code.
  */
  const (
  	/*
  	ClusterStoragePrefix is the prefix for cluster related storage managers
  	*/
  	ClusterStoragePrefix = "cs_"

  	/*
  	LocalStoragePrefix is the prefix for local storage managers
  	*/
  	LocalStoragePrefix = "ls_"
  )
  /*
  memberStorage models the local storage of a cluster member. This data structure
  is the only thing which has access to the wrapped graphstorage.Storage.

  All data requests which arrive via RPC are served through handleDataRequest.
  Cluster locations are mapped to local storage locations via the address table.
  */
  type memberStorage struct {
  	ds *DistributedStorage // Distributed storage which created this member storage
  	gs graphstorage.Storage // Wrapped graphstorage.Storage
  	at *memberAddressTable // Address table (cluster location -> local location)
  	transferLock *sync.Mutex // Lock for the transfer task
  	transferRunning bool // Flag to indicate that the transfer task is running
  	rebalanceLock *sync.Mutex // Lock for the rebalance task
  	rebalanceRunning bool // Flag to indicate that the rebalance task is running
  	rebalanceCounter int // NOTE(review): not used in the visible code - presumably bookkeeping of the rebalance task; confirm
  }
  46. /*
  47. newMemberStorage creates a new memberStorage instance.
  48. */
  49. func newMemberStorage(ds *DistributedStorage, gs graphstorage.Storage) (*memberStorage, error) {
  50. sm := gs.StorageManager("cluster_translation", true)
  51. at, err := newMemberAddressTable(ds, sm)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return &memberStorage{ds, gs, at, &sync.Mutex{}, false, &sync.Mutex{}, false, 0}, nil
  56. }
  57. /*
  58. handleDataRequest deals with RPC requests. It is the only function which is
  59. called by the RPC server of the member manager.
  60. */
  61. func (ms *memberStorage) handleDataRequest(request interface{}, response *interface{}) error {
  62. var err error
  63. // Make sure a request can be served
  64. distTable, distTableErr := ms.at.checkState()
  65. if distTableErr != nil {
  66. return distTableErr
  67. }
  68. dr := request.(*DataRequest)
  69. switch dr.RequestType {
  70. case RTGetMain:
  71. *response = ms.gs.MainDB()
  72. case RTSetMain:
  73. err = ms.handleSetMainRequest(distTable, dr, response)
  74. case RTSetRoot:
  75. err = ms.handleSetRootRequest(distTable, dr, response)
  76. case RTGetRoot:
  77. err = ms.handleGetRootRequest(distTable, dr, response)
  78. case RTInsert:
  79. err = ms.handleInsertRequest(distTable, dr, response)
  80. case RTUpdate:
  81. err = ms.handleUpdateRequest(distTable, dr, response)
  82. case RTFree:
  83. err = ms.handleFreeRequest(distTable, dr, response)
  84. case RTExists:
  85. err = ms.handleFetchRequest(distTable, dr, response, false)
  86. case RTFetch:
  87. err = ms.handleFetchRequest(distTable, dr, response, true)
  88. case RTRebalance:
  89. err = ms.handleRebalanceRequest(distTable, dr, response)
  90. default:
  91. err = fmt.Errorf("Unknown request type")
  92. }
  93. manager.LogDebug(ms.ds.MemberManager.Name(), fmt.Sprintf("(Store): Handled: %v %s (Transfer: %v, Error: %v)",
  94. dr.RequestType, dr.Args, dr.Transfer, err))
  95. return err
  96. }
  97. /*
  98. handleSetMainRequest sets the mainDB on the local storage manager.
  99. */
  100. func (ms *memberStorage) handleSetMainRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  101. mainDB := ms.gs.MainDB()
  102. newMainDB := request.Value.(map[string]string)
  103. // Update keys and values
  104. for k, v := range newMainDB {
  105. mainDB[k] = v
  106. }
  107. // Check if things should be deleted
  108. var toRemove []string
  109. for k := range mainDB {
  110. if _, ok := newMainDB[k]; !ok {
  111. toRemove = append(toRemove, k)
  112. }
  113. }
  114. for _, k := range toRemove {
  115. delete(mainDB, k)
  116. }
  117. err := ms.gs.FlushMain()
  118. if !request.Transfer {
  119. ms.at.AddTransferRequest(distTable.OtherReplicationMembers(0, ms.ds.MemberManager.Name()),
  120. &DataRequest{RTSetMain, nil, request.Value, true})
  121. }
  122. return err
  123. }
  124. /*
  125. handleGetRootRequest retrieves a root value from a local storage manager.
  126. */
  127. func (ms *memberStorage) handleGetRootRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  128. dsname := request.Args[RPStoreName].(string)
  129. root := request.Args[RPRoot].(int)
  130. sm := ms.dataStorage(dsname, false)
  131. if sm != nil {
  132. *response = sm.Root(root)
  133. }
  134. return nil
  135. }
  136. /*
  137. handleSetRootRequest sets a new root value in a local storage manager.
  138. */
  139. func (ms *memberStorage) handleSetRootRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  140. dsname := request.Args[RPStoreName].(string)
  141. root := request.Args[RPRoot].(int)
  142. sm := ms.dataStorage(dsname, true)
  143. sm.SetRoot(root, request.Value.(uint64))
  144. if !request.Transfer {
  145. ms.at.AddTransferRequest(distTable.OtherReplicationMembers(0, ms.ds.MemberManager.Name()),
  146. &DataRequest{RTSetRoot, request.Args, request.Value, true})
  147. }
  148. return sm.Flush()
  149. }
  150. /*
  151. handleInsertRequest inserts an object and return its cluster storage location.
  152. Distribution procedure:
  153. Client -> Cluster Member Request Receiver
  154. Cluster Member Request Receiver -> Cluster Member Primary Storage (chosen round-robin / available)
  155. Cluster Member Primary Storage writes into its Transfer Table
  156. Cluster Member Primary Storage (Transfer worker) -> Replicating Cluster Members
  157. */
  158. func (ms *memberStorage) handleInsertRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  159. var err error
  160. var cloc uint64
  161. dsname := request.Args[RPStoreName].(string)
  162. *response = 0
  163. sm := ms.dataStorage(dsname, true)
  164. if !request.Transfer {
  165. // First get a new cluster location (on this member)
  166. cloc, err = ms.at.NewClusterLoc(dsname)
  167. } else {
  168. // If this is a transfer request we know already the cluster location
  169. cloc = request.Args[RPLoc].(uint64)
  170. }
  171. if err == nil {
  172. // Insert into the local storage
  173. loc, err := sm.Insert(request.Value)
  174. if err == nil {
  175. // Add a translation
  176. _, _, err = ms.at.SetTransClusterLoc(dsname, cloc, loc, 1)
  177. if err == nil {
  178. if !request.Transfer {
  179. // Add transfer request for replication
  180. // At this point the operation has succeeded. We still need to
  181. // replicate the change to all the replicating members but
  182. // any errors happening during this shall not fail this operation.
  183. // The next rebalancing will then synchronize all members again.
  184. ms.at.AddTransferRequest(distTable.Replicas(ms.ds.MemberManager.Name()),
  185. &DataRequest{RTInsert, map[DataRequestArg]interface{}{
  186. RPStoreName: dsname,
  187. RPLoc: cloc,
  188. }, request.Value, true})
  189. }
  190. *response = cloc
  191. }
  192. }
  193. }
  194. return err
  195. }
  196. /*
  197. handleUpdateRequest updates an object and return its cluster storage location.
  198. There is indeed a chance to produce inconsistencies if members fail in the right
  199. sequence. It is assumed that these will be delt with in the next rebalance.
  200. Distribution procedure:
  201. Client -> Cluster Member Request Receiver
  202. Cluster Member Request Receiver -> Cluster Member Primary Storage or Replicating Cluster Member
  203. Storing Cluster Member does the update and writes into its transfer table
  204. Storing Cluster Member (Transfer worker) -> Replicating / Primary Cluster Members
  205. */
  206. func (ms *memberStorage) handleUpdateRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  207. var err error
  208. var newVersion uint64
  209. dsname := request.Args[RPStoreName].(string)
  210. cloc := request.Args[RPLoc].(uint64)
  211. *response = 0
  212. // Get the translation
  213. transRec, ok, err := ms.at.TransClusterLoc(dsname, cloc)
  214. if ok {
  215. sm := ms.dataStorage(dsname, false)
  216. if sm != nil {
  217. // Update the local storage
  218. if !request.Transfer {
  219. err = sm.Update(transRec.loc, request.Value)
  220. newVersion = transRec.ver + 1
  221. } else {
  222. newVersion = request.Args[RPVer].(uint64)
  223. if newVersion >= transRec.ver {
  224. err = sm.Update(transRec.loc, request.Value)
  225. } else {
  226. // Outdated update requests are simply ignored
  227. err = fmt.Errorf("Received outdated update request (%v - Location: %v)",
  228. ms.ds.MemberManager.Name(), cloc)
  229. manager.LogDebug(ms.ds.MemberManager.Name(), err.Error())
  230. // Need to return no error so the transfer worker on the
  231. // other side removes its entry
  232. err = nil
  233. }
  234. }
  235. if err == nil {
  236. // Increase the version of the translation record
  237. _, _, err = ms.at.SetTransClusterLoc(dsname, cloc, transRec.loc, newVersion)
  238. if err == nil {
  239. if !request.Transfer {
  240. // Add transfer request for replication
  241. // At this point the operation has succeeded. We still need to
  242. // replicate the change to all the replicating members but
  243. // any errors happening during this shall not fail this operation.
  244. // The next rebalancing will then synchronize all members again.
  245. ms.at.AddTransferRequest(distTable.OtherReplicationMembers(cloc, ms.ds.MemberManager.Name()),
  246. &DataRequest{RTUpdate, map[DataRequestArg]interface{}{
  247. RPStoreName: dsname,
  248. RPLoc: cloc,
  249. RPVer: newVersion,
  250. }, request.Value, true})
  251. }
  252. *response = cloc
  253. return nil
  254. }
  255. }
  256. }
  257. }
  258. if err == nil {
  259. err = fmt.Errorf("Cluster slot not found (%v - Location: %v)",
  260. ms.ds.MemberManager.Name(), cloc)
  261. }
  262. return err
  263. }
  264. /*
  265. handleFreeRequest removes an object.
  266. Distribution procedure:
  267. Client -> Cluster Member Request Receiver
  268. Cluster Member Request Receiver -> Cluster Member Primary Storage or Replicating Cluster Member
  269. Storing Cluster Member does the free and writes into its transfer table
  270. Storing Cluster Member (Transfer worker) -> Replicating / Primary Cluster Members
  271. */
  272. func (ms *memberStorage) handleFreeRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  273. var err error
  274. dsname := request.Args[RPStoreName].(string)
  275. cloc := request.Args[RPLoc].(uint64)
  276. // Get the translation
  277. transRec, ok, err := ms.at.TransClusterLoc(dsname, cloc)
  278. if ok {
  279. sm := ms.dataStorage(dsname, false)
  280. if sm != nil {
  281. // Remove the translation
  282. _, _, err = ms.at.RemoveTransClusterLoc(dsname, cloc)
  283. if err == nil {
  284. // Remove from the local storage
  285. err = sm.Free(transRec.loc)
  286. if !request.Transfer {
  287. // Add transfer request for replication
  288. // At this point the operation has succeeded. We still need to
  289. // replicate the change to all the replicating members but
  290. // any errors happening during this shall not fail this operation.
  291. // The next rebalancing will then synchronize all members again.
  292. ms.at.AddTransferRequest(distTable.OtherReplicationMembers(cloc, ms.ds.MemberManager.Name()),
  293. &DataRequest{RTFree, map[DataRequestArg]interface{}{
  294. RPStoreName: dsname,
  295. RPLoc: cloc,
  296. }, nil, true})
  297. }
  298. return err
  299. }
  300. }
  301. }
  302. if err == nil {
  303. err = fmt.Errorf("Cluster slot not found (%v - Location: %v)", ms.ds.MemberManager.Name(), cloc)
  304. }
  305. return err
  306. }
  307. /*
  308. handleFetchRequest inserts an object and return its cluster storage location.
  309. */
  310. func (ms *memberStorage) handleFetchRequest(distTable *DistributionTable,
  311. request *DataRequest, response *interface{}, fetch bool) error {
  312. var err error
  313. dsname := request.Args[RPStoreName].(string)
  314. cloc := request.Args[RPLoc].(uint64)
  315. // Get the translation
  316. transRec, ok, err := ms.at.TransClusterLoc(dsname, cloc)
  317. if ok {
  318. // Check if the data should be retrieved
  319. if !fetch {
  320. *response = true
  321. return nil
  322. }
  323. sm := ms.dataStorage(dsname, false)
  324. if sm != nil {
  325. var res []byte
  326. err = sm.Fetch(transRec.loc, &res)
  327. if err == nil {
  328. *response = res
  329. return nil
  330. }
  331. }
  332. } else if !fetch {
  333. *response = false
  334. return err
  335. }
  336. if err == nil {
  337. err = fmt.Errorf("Cluster slot not found (%v - Location: %v)", ms.ds.MemberManager.Name(), cloc)
  338. }
  339. return err
  340. }
  341. /*
  342. handleRebalanceRequest processes rebalance requests.
  343. */
  344. func (ms *memberStorage) handleRebalanceRequest(distTable *DistributionTable, request *DataRequest, response *interface{}) error {
  345. var err error
  346. var tr *translationRec
  347. var found bool
  348. var res interface{}
  349. var lloc uint64
  350. handleError := func(err error) {
  351. if err != nil {
  352. manager.LogDebug(ms.ds.MemberManager.Name(), fmt.Sprintf("(Store): Error during rebalancing request handling: %v", err))
  353. }
  354. }
  355. // Get the location ranges for this member and locations which are replicated on this member.
  356. storeRangeStart, storeRangeStop := distTable.MemberRange(ms.ds.MemberManager.Name())
  357. repRangeStart, repRangeStop := distTable.ReplicationRange(ms.ds.MemberManager.Name())
  358. // Get the request data
  359. rsource := request.Args[RPSrc].(string)
  360. smnames := request.Args[RPStoreName]
  361. locs := request.Args[RPLoc]
  362. vers := request.Args[RPVer]
  363. for i, cloc := range locs.([]uint64) {
  364. // Check if there was an error from the previous iteration
  365. handleError(err)
  366. smname := smnames.([]string)[i]
  367. ver := vers.([]uint64)[i]
  368. // Do not proceed if there is an error or if the location is out of
  369. // range of responsibility
  370. notInStoreRange := cloc < storeRangeStart || cloc > storeRangeStop
  371. notInRepRange := cloc < repRangeStart || cloc > repRangeStop
  372. // Check if the location exists in the local storage
  373. tr, found, err = ms.at.TransClusterLoc(smname, cloc)
  374. if err != nil || (notInStoreRange && notInRepRange) {
  375. // Skip the location if there was an error or if this member
  376. // is not relevant for the location in question (either as primary
  377. // storage member or as replica)
  378. continue
  379. }
  380. if found {
  381. // Check if the version is newer and update the local record if it is
  382. if tr.ver < ver {
  383. // Local record exists and needs to be updated
  384. sm := ms.dataStorage(smname, false)
  385. // Fetch the data from the remote machine
  386. res, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFetch, map[DataRequestArg]interface{}{
  387. RPStoreName: smname,
  388. RPLoc: cloc,
  389. }, nil, false})
  390. if err == nil {
  391. // Update the local storage
  392. if err = sm.Update(tr.loc, res); err == nil {
  393. // Update the translation
  394. _, _, err = ms.at.SetTransClusterLoc(smname, cloc, tr.loc, ver)
  395. manager.LogDebug(ms.ds.MemberManager.Name(),
  396. fmt.Sprintf("(Store): Rebalance updated %v location: %v", smname, cloc))
  397. }
  398. }
  399. }
  400. } else {
  401. // The data on the remote system should be inserted into the local
  402. // datastore.
  403. sm := ms.dataStorage(smname, true)
  404. // Fetch the data from the remote machine
  405. res, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFetch, map[DataRequestArg]interface{}{
  406. RPStoreName: smname,
  407. RPLoc: cloc,
  408. }, nil, false})
  409. if err == nil {
  410. // Insert into the local storage
  411. lloc, err = sm.Insert(res)
  412. if err == nil {
  413. // Add a translation
  414. _, _, err = ms.at.SetTransClusterLoc(smname, cloc, lloc, ver)
  415. manager.LogDebug(ms.ds.MemberManager.Name(),
  416. fmt.Sprintf("(Store): Rebalance inserted %v location: %v", smname, cloc))
  417. }
  418. }
  419. }
  420. if err == nil {
  421. // Should the sender have the data
  422. sourceSRangeStart, sourceSRangeStop := distTable.MemberRange(rsource)
  423. sourceRRangeStart, sourceRRangeStop := distTable.ReplicationRange(rsource)
  424. notInSourceSRange := cloc < sourceSRangeStart || cloc > sourceSRangeStop
  425. notInSourceRRange := cloc < sourceRRangeStart || cloc > sourceRRangeStop
  426. if notInSourceSRange && notInSourceRRange {
  427. manager.LogDebug(ms.ds.MemberManager.Name(),
  428. fmt.Sprintf("(Store): Rebalance removes %v location: %v from member %v",
  429. smname, tr.loc, rsource))
  430. _, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFree, map[DataRequestArg]interface{}{
  431. RPStoreName: smname,
  432. RPLoc: cloc,
  433. }, nil, true})
  434. }
  435. }
  436. }
  437. handleError(err)
  438. return nil
  439. }
  440. /*
  441. dataStorage returns a storage.StorageManager which will only store byte slices.
  442. */
  443. func (ms *memberStorage) dataStorage(dsname string, create bool) storage.Manager {
  444. return ms.gs.StorageManager(LocalStoragePrefix+dsname, create)
  445. }
  446. /*
  447. dump dumps the contents of a particular member storage manager as escaped strings.
  448. (Works only for MemoryStorageManagers.)
  449. */
  450. func (ms *memberStorage) dump(smname string) string {
  451. var res string
  452. printTransferTable := func(buf *bytes.Buffer) {
  453. // Go through the transfer table and see if there is anything
  454. it := hash.NewHTreeIterator(ms.at.transfer)
  455. for it.HasNext() {
  456. _, val := it.Next()
  457. if val != nil {
  458. tr := val.(*transferRec)
  459. args, _ := json.Marshal(tr.request.Args)
  460. vals, ok := tr.request.Value.([]byte)
  461. if !ok {
  462. vals, _ = json.Marshal(tr.request.Value)
  463. }
  464. buf.WriteString(fmt.Sprintf("transfer: %v - %v %v %q\n",
  465. tr.members, tr.request.RequestType, string(args), vals))
  466. }
  467. }
  468. }
  469. if smname == "" {
  470. // Dump the contents of the MainDB if no name is given
  471. buf := new(bytes.Buffer)
  472. buf.WriteString(fmt.Sprintf("%v MemberStorageManager MainDB\n",
  473. ms.ds.MemberManager.Name()))
  474. var keys []string
  475. for k := range ms.gs.MainDB() {
  476. keys = append(keys, k)
  477. }
  478. sort.Strings(keys)
  479. // Output local storage content with mapped cluster locations
  480. for _, k := range keys {
  481. v := ms.gs.MainDB()[k]
  482. buf.WriteString(fmt.Sprintf("%v - %q\n", k, v))
  483. }
  484. printTransferTable(buf)
  485. return buf.String()
  486. }
  487. sm := ms.dataStorage(smname, false)
  488. if sm != nil {
  489. // Make sure the storage manager is a MemoryStorageManager
  490. if msm, ok := sm.(*storage.MemoryStorageManager); ok {
  491. // Get all stored cluster locations
  492. locmap := make(map[uint64]string)
  493. it := hash.NewHTreeIterator(ms.at.translation)
  494. for it.HasNext() {
  495. k, v := it.Next()
  496. key := string(k)
  497. if strings.HasPrefix(key, transPrefix) {
  498. key = string(key[len(fmt.Sprint(transPrefix, smname, "#")):])
  499. locmap[v.(*translationRec).loc] = fmt.Sprintf("%v (v:%v)",
  500. key, v.(*translationRec).ver)
  501. }
  502. }
  503. buf := new(bytes.Buffer)
  504. buf.WriteString(fmt.Sprintf("%v MemberStorageManager %v\n",
  505. ms.ds.MemberManager.Name(), msm.Name()))
  506. buf.WriteString("Roots: ")
  507. // Go through root values
  508. for i := 0; i < 10; i++ {
  509. rootVal := msm.Root(i)
  510. buf.WriteString(fmt.Sprintf("%v=%v ", i, rootVal))
  511. }
  512. buf.WriteString("\n")
  513. var keys []uint64
  514. for k := range msm.Data {
  515. keys = append(keys, k)
  516. }
  517. sortutil.UInt64s(keys)
  518. // Output local storage content with mapped cluster locations
  519. for _, k := range keys {
  520. v := msm.Data[k]
  521. caddr := locmap[k]
  522. buf.WriteString(fmt.Sprintf("cloc: %v - lloc: %v - %q\n",
  523. caddr, k, v))
  524. }
  525. printTransferTable(buf)
  526. res = buf.String()
  527. }
  528. }
  529. return res
  530. }