diskstoragemanager.go 17 KB


  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package storage
  11. import (
  12. "bytes"
  13. "encoding/gob"
  14. "errors"
  15. "fmt"
  16. "io"
  17. "sync"
  18. "time"
  19. "devt.de/krotik/common/errorutil"
  20. "devt.de/krotik/common/fileutil"
  21. "devt.de/krotik/common/lockutil"
  22. "devt.de/krotik/eliasdb/storage/file"
  23. "devt.de/krotik/eliasdb/storage/paging"
  24. "devt.de/krotik/eliasdb/storage/slotting"
  25. "devt.de/krotik/eliasdb/storage/util"
  26. )
  27. /*
  28. VERSION constains the version of the storage API
  29. */
  30. const VERSION = 1
  31. /*
  32. FileSiffixLockfile is the file ending for lockfiles
  33. */
  34. const FileSiffixLockfile = "lck"
  35. /*
  36. FileSuffixLogicalSlots is the file ending for a logical slot storage
  37. */
  38. const FileSuffixLogicalSlots = "ix"
  39. /*
  40. FileSuffixLogicalFreeSlots is the file ending for a free logical slot storage
  41. */
  42. const FileSuffixLogicalFreeSlots = "ixf"
  43. /*
  44. FileSuffixPhysicalSlots is the file ending for a physical slot storage
  45. */
  46. const FileSuffixPhysicalSlots = "db"
  47. /*
  48. FileSuffixPhysicalFreeSlots is the file ending for a free physical slot storage
  49. */
  50. const FileSuffixPhysicalFreeSlots = "dbf"
  51. /*
  52. BlockSizePhysicalSlots is the block for a physical slot file. Physical slots will
  53. contain actual data they need to have fairly large block sizes.
  54. */
  55. const BlockSizePhysicalSlots = 1024 * 8
  56. /*
  57. BlockSizeLogicalSlots is the block for a logical slot file. Logical slots contain only
  58. pointers they only need small blocks.
  59. */
  60. const BlockSizeLogicalSlots = 1024 * 2
  61. /*
  62. BlockSizeFreeSlots is the block for a free slot files. Files containing only free slot
  63. pointers will always be small. They only need tiny blocks.
  64. */
  65. const BlockSizeFreeSlots = 1024
  66. /*
  67. ErrReadonly is returned when attempting a write operation on a readonly datastore.
  68. */
  69. var ErrReadonly = errors.New("Storage is readonly")
  70. /*
  71. DiskStorageManager is a storage manager which can store any gob serializable datastructure.
  72. */
  73. type DiskStorageManager struct {
  74. *ByteDiskStorageManager
  75. }
  76. /*
  77. NewDiskStorageManager creates a new disk storage manager with optional
  78. transaction management. If the onlyAppend flag is set then the manager will
  79. not attempt to reuse space once it was released after use. If the
  80. transDisabled flag is set then the storage manager will not support
  81. transactions.
  82. */
  83. func NewDiskStorageManager(filename string, readonly bool, onlyAppend bool,
  84. transDisabled bool, lockfileDisabled bool) *DiskStorageManager {
  85. return &DiskStorageManager{NewByteDiskStorageManager(filename, readonly,
  86. onlyAppend, transDisabled, lockfileDisabled)}
  87. }
  88. /*
  89. Name returns the name of the StorageManager instance.
  90. */
  91. func (dsm *DiskStorageManager) Name() string {
  92. return fmt.Sprint("DiskStorageFile:", dsm.ByteDiskStorageManager.filename)
  93. }
  94. /*
  95. Serialize serializes an object into a byte slice.
  96. */
  97. func (dsm *DiskStorageManager) Serialize(o interface{}) ([]byte, error) {
  98. // Request a buffer from the buffer pool
  99. bb := BufferPool.Get().(*bytes.Buffer)
  100. defer func() {
  101. bb.Reset()
  102. BufferPool.Put(bb)
  103. }()
  104. // Serialize the object into a gob bytes stream
  105. err := gob.NewEncoder(bb).Encode(o)
  106. if err != nil {
  107. return nil, err
  108. }
  109. return bb.Bytes(), nil
  110. }
  111. /*
  112. Insert inserts an object and return its storage location.
  113. */
  114. func (dsm *DiskStorageManager) Insert(o interface{}) (uint64, error) {
  115. b, err := dsm.Serialize(o)
  116. if err != nil {
  117. return 0, err
  118. }
  119. return dsm.ByteDiskStorageManager.Insert(b)
  120. }
  121. /*
  122. Update updates a storage location.
  123. */
  124. func (dsm *DiskStorageManager) Update(loc uint64, o interface{}) error {
  125. b, err := dsm.Serialize(o)
  126. if err != nil {
  127. return err
  128. }
  129. return dsm.ByteDiskStorageManager.Update(loc, b)
  130. }
  131. /*
  132. Fetch fetches an object from a given storage location and writes it to
  133. a given data container.
  134. */
  135. func (dsm *DiskStorageManager) Fetch(loc uint64, o interface{}) error {
  136. // Request a buffer from the buffer pool
  137. bb := BufferPool.Get().(*bytes.Buffer)
  138. defer func() {
  139. bb.Reset()
  140. BufferPool.Put(bb)
  141. }()
  142. if err := dsm.ByteDiskStorageManager.Fetch(loc, bb); err != nil {
  143. return err
  144. }
  145. // Deserialize the object from a gob bytes stream
  146. return gob.NewDecoder(bb).Decode(o)
  147. }
  148. /*
  149. ByteDiskStorageManager is a disk storage manager which can only store byte slices.
  150. */
  151. type ByteDiskStorageManager struct {
  152. filename string // Filename for all managed files
  153. readonly bool // Flag to make the storage readonly
  154. onlyAppend bool // Flag for append-only mode
  155. transDisabled bool // Flag if transactions are enabled
  156. mutex *sync.Mutex // Mutex to protect actual file operations
  157. physicalSlotsSf *file.StorageFile // StorageFile for physical slots
  158. physicalSlotsPager *paging.PagedStorageFile // Pager for physical slots StorageFile
  159. physicalFreeSlotsSf *file.StorageFile // StorageFile for free physical slots
  160. physicalFreeSlotsPager *paging.PagedStorageFile // Pager for free physical slots StorageFile
  161. physicalSlotManager *slotting.PhysicalSlotManager // Manager for physical slots
  162. logicalSlotsSf *file.StorageFile // StorageFile for logical slots
  163. logicalSlotsPager *paging.PagedStorageFile // Pager for logical slots StorageFile
  164. logicalFreeSlotsSf *file.StorageFile // StorageFile for free logical slots
  165. logicalFreeSlotsPager *paging.PagedStorageFile // Pager for free logical slots StorageFile
  166. logicalSlotManager *slotting.LogicalSlotManager // Manager for physical slots
  167. lockfile *lockutil.LockFile // Lockfile manager
  168. }
  169. /*
  170. NewByteDiskStorageManager creates a new disk storage manager with optional
  171. transaction management which can only store byte slices. If the onlyAppend
  172. flag is set then the manager will not attempt to reuse space once it was
  173. released after use. If the transDisabled flag is set then the storage
  174. manager will not support transactions.
  175. */
  176. func NewByteDiskStorageManager(filename string, readonly bool, onlyAppend bool,
  177. transDisabled bool, lockfileDisabled bool) *ByteDiskStorageManager {
  178. var lf *lockutil.LockFile
  179. // Create a lockfile which is checked every 50 milliseconds
  180. if !lockfileDisabled {
  181. lf = lockutil.NewLockFile(fmt.Sprintf("%v.%v", filename, FileSiffixLockfile),
  182. time.Duration(50)*time.Millisecond)
  183. }
  184. bdsm := &ByteDiskStorageManager{filename, readonly, onlyAppend, transDisabled, &sync.Mutex{}, nil, nil,
  185. nil, nil, nil, nil, nil, nil, nil, nil, lf}
  186. err := initByteDiskStorageManager(bdsm)
  187. if err != nil {
  188. panic(fmt.Sprintf("Could not initialize DiskStroageManager: %v", filename))
  189. }
  190. return bdsm
  191. }
  192. /*
  193. DataFileExist checks if the main datastore file exists.
  194. */
  195. func DataFileExist(filename string) bool {
  196. ret, err := fileutil.PathExists(fmt.Sprintf("%v.%v.0", filename,
  197. FileSuffixPhysicalSlots))
  198. if err != nil {
  199. return false
  200. }
  201. return ret
  202. }
  203. /*
  204. Name returns the name of the StorageManager instance.
  205. */
  206. func (bdsm *ByteDiskStorageManager) Name() string {
  207. return fmt.Sprint("ByteDiskStorageFile:", bdsm.filename)
  208. }
  209. /*
  210. Root returns a root value.
  211. */
  212. func (bdsm *ByteDiskStorageManager) Root(root int) uint64 {
  213. bdsm.mutex.Lock()
  214. defer bdsm.mutex.Unlock()
  215. bdsm.checkFileOpen()
  216. return bdsm.physicalSlotsPager.Header().Root(root)
  217. }
  218. /*
  219. SetRoot writes a root value.
  220. */
  221. func (bdsm *ByteDiskStorageManager) SetRoot(root int, val uint64) {
  222. // When readonly this operation becomes a NOP
  223. if bdsm.readonly {
  224. return
  225. }
  226. bdsm.mutex.Lock()
  227. defer bdsm.mutex.Unlock()
  228. bdsm.checkFileOpen()
  229. bdsm.physicalSlotsPager.Header().SetRoot(root, val)
  230. }
  231. /*
  232. Insert inserts an object and return its storage location.
  233. */
  234. func (bdsm *ByteDiskStorageManager) Insert(o interface{}) (uint64, error) {
  235. bdsm.checkFileOpen()
  236. // Fail operation if readonly
  237. if bdsm.readonly {
  238. return 0, ErrReadonly
  239. }
  240. // Continue single threaded from here on
  241. bdsm.mutex.Lock()
  242. defer bdsm.mutex.Unlock()
  243. // Store the data in a physical slot
  244. b := o.([]byte)
  245. ploc, err := bdsm.physicalSlotManager.Insert(b, 0, uint32(len(b)))
  246. if err != nil {
  247. return 0, err
  248. }
  249. // Get a logical slot for the physical slot
  250. loc, err := bdsm.logicalSlotManager.Insert(ploc)
  251. if err != nil {
  252. return 0, err
  253. }
  254. return loc, nil
  255. }
  256. /*
  257. Update updates a storage location.
  258. */
  259. func (bdsm *ByteDiskStorageManager) Update(loc uint64, o interface{}) error {
  260. bdsm.checkFileOpen()
  261. // Fail operation if readonly
  262. if bdsm.readonly {
  263. return ErrReadonly
  264. }
  265. // Get the physical slot for the given logical slot
  266. bdsm.mutex.Lock()
  267. ploc, err := bdsm.logicalSlotManager.Fetch(loc)
  268. bdsm.mutex.Unlock()
  269. if err != nil {
  270. return err
  271. }
  272. if ploc == 0 {
  273. return ErrSlotNotFound.fireError(bdsm, fmt.Sprint("Location:",
  274. util.LocationRecord(loc), util.LocationOffset(loc)))
  275. }
  276. // Continue single threaded from here on
  277. bdsm.mutex.Lock()
  278. defer bdsm.mutex.Unlock()
  279. // Update the physical record
  280. b := o.([]byte)
  281. newPloc, err := bdsm.physicalSlotManager.Update(ploc, b, 0, uint32(len(b)))
  282. if err != nil {
  283. return err
  284. }
  285. // Update the logical slot if the physical slot has changed
  286. if newPloc != ploc {
  287. return bdsm.logicalSlotManager.Update(loc, newPloc)
  288. }
  289. return nil
  290. }
  291. /*
  292. Fetch fetches an object from a given storage location and writes it to
  293. a given data container.
  294. */
  295. func (bdsm *ByteDiskStorageManager) Fetch(loc uint64, o interface{}) error {
  296. bdsm.checkFileOpen()
  297. // Get the physical slot for the given logical slot
  298. bdsm.mutex.Lock()
  299. ploc, err := bdsm.logicalSlotManager.Fetch(loc)
  300. bdsm.mutex.Unlock()
  301. if err != nil {
  302. return err
  303. }
  304. if ploc == 0 {
  305. return ErrSlotNotFound.fireError(bdsm, fmt.Sprint("Location:",
  306. util.LocationRecord(loc), util.LocationOffset(loc)))
  307. }
  308. // Request the stored bytes
  309. bdsm.mutex.Lock()
  310. if w, ok := o.(io.Writer); ok {
  311. err = bdsm.physicalSlotManager.Fetch(ploc, w)
  312. } else {
  313. var b bytes.Buffer
  314. err = bdsm.physicalSlotManager.Fetch(ploc, &b)
  315. copy(o.([]byte), b.Bytes())
  316. }
  317. bdsm.mutex.Unlock()
  318. return err
  319. }
  320. /*
  321. FetchCached is not implemented for a ByteDiskStorageManager.
  322. Only defined to satisfy the StorageManager interface.
  323. */
  324. func (bdsm *ByteDiskStorageManager) FetchCached(loc uint64) (interface{}, error) {
  325. return nil, ErrNotInCache
  326. }
  327. /*
  328. Free frees a storage location.
  329. */
  330. func (bdsm *ByteDiskStorageManager) Free(loc uint64) error {
  331. bdsm.checkFileOpen()
  332. // Fail operation if readonly
  333. if bdsm.readonly {
  334. return ErrReadonly
  335. }
  336. // Continue single threaded from here on
  337. bdsm.mutex.Lock()
  338. defer bdsm.mutex.Unlock()
  339. // Get the physical slot for the given logical slot
  340. ploc, err := bdsm.logicalSlotManager.Fetch(loc)
  341. if err != nil {
  342. return err
  343. }
  344. if ploc == 0 {
  345. return ErrSlotNotFound.fireError(bdsm, fmt.Sprint("Location:",
  346. util.LocationRecord(loc), util.LocationOffset(loc)))
  347. }
  348. // First try to free the physical slot since here is the data
  349. // if this fails we don't touch the logical slot
  350. err = bdsm.physicalSlotManager.Free(ploc)
  351. if err != nil {
  352. return err
  353. }
  354. // This is very unlikely to fail - either way we can't do anything
  355. // at this point since the physical slot has already gone away
  356. return bdsm.logicalSlotManager.Free(loc)
  357. }
  358. /*
  359. Flush writes all pending changes to disk.
  360. */
  361. func (bdsm *ByteDiskStorageManager) Flush() error {
  362. bdsm.checkFileOpen()
  363. // When readonly this operation becomes a NOP
  364. if bdsm.readonly {
  365. return nil
  366. }
  367. ce := errorutil.NewCompositeError()
  368. // Continue single threaded from here on
  369. bdsm.mutex.Lock()
  370. defer bdsm.mutex.Unlock()
  371. // Write pending changes
  372. if err := bdsm.physicalSlotManager.Flush(); err != nil {
  373. ce.Add(err)
  374. }
  375. if err := bdsm.logicalSlotManager.Flush(); err != nil {
  376. ce.Add(err)
  377. }
  378. if err := bdsm.physicalSlotsPager.Flush(); err != nil {
  379. ce.Add(err)
  380. }
  381. if err := bdsm.physicalFreeSlotsPager.Flush(); err != nil {
  382. ce.Add(err)
  383. }
  384. if err := bdsm.logicalSlotsPager.Flush(); err != nil {
  385. ce.Add(err)
  386. }
  387. if err := bdsm.logicalFreeSlotsPager.Flush(); err != nil {
  388. ce.Add(err)
  389. }
  390. // Return errors if there were any
  391. if ce.HasErrors() {
  392. return ce
  393. }
  394. return nil
  395. }
  396. /*
  397. Rollback cancels all pending changes which have not yet been written to disk.
  398. */
  399. func (bdsm *ByteDiskStorageManager) Rollback() error {
  400. // Rollback has no effect if transactions are disabled or when readonly
  401. if bdsm.transDisabled || bdsm.readonly {
  402. return nil
  403. }
  404. bdsm.checkFileOpen()
  405. ce := errorutil.NewCompositeError()
  406. // Continue single threaded from here on
  407. bdsm.mutex.Lock()
  408. defer bdsm.mutex.Unlock()
  409. // Write pending manager changes to transaction log
  410. if err := bdsm.physicalSlotManager.Flush(); err != nil {
  411. ce.Add(err)
  412. }
  413. if err := bdsm.logicalSlotManager.Flush(); err != nil {
  414. ce.Add(err)
  415. }
  416. // Rollback current transaction
  417. if err := bdsm.physicalSlotsPager.Rollback(); err != nil {
  418. ce.Add(err)
  419. }
  420. if err := bdsm.physicalFreeSlotsPager.Rollback(); err != nil {
  421. ce.Add(err)
  422. }
  423. if err := bdsm.logicalSlotsPager.Rollback(); err != nil {
  424. ce.Add(err)
  425. }
  426. if err := bdsm.logicalFreeSlotsPager.Rollback(); err != nil {
  427. ce.Add(err)
  428. }
  429. // Return errors if there were any
  430. if ce.HasErrors() {
  431. return ce
  432. }
  433. return nil
  434. }
  435. /*
  436. Close closes the StorageManager and write all pending changes to disk.
  437. */
  438. func (bdsm *ByteDiskStorageManager) Close() error {
  439. bdsm.checkFileOpen()
  440. ce := errorutil.NewCompositeError()
  441. // Continue single threaded from here on
  442. bdsm.mutex.Lock()
  443. defer bdsm.mutex.Unlock()
  444. // Try to close all files and collect any errors which are returned
  445. if err := bdsm.physicalSlotsPager.Close(); err != nil {
  446. ce.Add(err)
  447. }
  448. if err := bdsm.physicalFreeSlotsPager.Close(); err != nil {
  449. ce.Add(err)
  450. }
  451. if err := bdsm.logicalSlotsPager.Close(); err != nil {
  452. ce.Add(err)
  453. }
  454. if err := bdsm.logicalFreeSlotsPager.Close(); err != nil {
  455. ce.Add(err)
  456. }
  457. // Return errors if there were any
  458. if ce.HasErrors() {
  459. return ce
  460. }
  461. // Release all file related objects
  462. bdsm.physicalSlotsSf = nil
  463. bdsm.physicalSlotsPager = nil
  464. bdsm.physicalFreeSlotsSf = nil
  465. bdsm.physicalFreeSlotsPager = nil
  466. bdsm.physicalSlotManager = nil
  467. bdsm.logicalSlotsSf = nil
  468. bdsm.logicalSlotsPager = nil
  469. bdsm.logicalFreeSlotsSf = nil
  470. bdsm.logicalFreeSlotsPager = nil
  471. bdsm.logicalSlotManager = nil
  472. if bdsm.lockfile != nil {
  473. return bdsm.lockfile.Finish()
  474. }
  475. return nil
  476. }
  477. /*
  478. checkFileOpen checks that the files on disk are still open.
  479. */
  480. func (bdsm *ByteDiskStorageManager) checkFileOpen() {
  481. if bdsm.physicalSlotsSf == nil {
  482. panic(fmt.Sprint("Trying to access storage after it was closed: ", bdsm.filename))
  483. }
  484. if bdsm.lockfile != nil && !bdsm.lockfile.WatcherRunning() {
  485. err := bdsm.lockfile.Finish()
  486. panic(fmt.Sprint("Error while checking lockfile:", err))
  487. }
  488. }
  489. /*
  490. initByteDiskStorageManager initialises the file managers of a given ByteDiskStorageManager.
  491. */
  492. func initByteDiskStorageManager(bdsm *ByteDiskStorageManager) error {
  493. // Kick off the lockfile watcher
  494. if bdsm.lockfile != nil {
  495. if err := bdsm.lockfile.Start(); err != nil {
  496. panic(fmt.Sprintf("Could not take ownership of lockfile %v: %v",
  497. bdsm.filename, err))
  498. }
  499. }
  500. // Try to open all files and collect all errors
  501. ce := errorutil.NewCompositeError()
  502. sf, pager, err := createFileAndPager(
  503. fmt.Sprintf("%v.%v", bdsm.filename, FileSuffixPhysicalSlots),
  504. BlockSizePhysicalSlots, bdsm)
  505. if err != nil {
  506. ce.Add(err)
  507. }
  508. bdsm.physicalSlotsSf = sf
  509. bdsm.physicalSlotsPager = pager
  510. sf, pager, err = createFileAndPager(
  511. fmt.Sprintf("%v.%v", bdsm.filename, FileSuffixPhysicalFreeSlots),
  512. BlockSizeFreeSlots, bdsm)
  513. if err != nil {
  514. ce.Add(err)
  515. }
  516. bdsm.physicalFreeSlotsSf = sf
  517. bdsm.physicalFreeSlotsPager = pager
  518. if !ce.HasErrors() {
  519. bdsm.physicalSlotManager = slotting.NewPhysicalSlotManager(bdsm.physicalSlotsPager,
  520. bdsm.physicalFreeSlotsPager, bdsm.onlyAppend)
  521. }
  522. sf, pager, err = createFileAndPager(
  523. fmt.Sprintf("%v.%v", bdsm.filename, FileSuffixLogicalSlots),
  524. BlockSizeLogicalSlots, bdsm)
  525. if err != nil {
  526. ce.Add(err)
  527. }
  528. bdsm.logicalSlotsSf = sf
  529. bdsm.logicalSlotsPager = pager
  530. sf, pager, err = createFileAndPager(
  531. fmt.Sprintf("%v.%v", bdsm.filename, FileSuffixLogicalFreeSlots),
  532. BlockSizeFreeSlots, bdsm)
  533. if err != nil {
  534. ce.Add(err)
  535. }
  536. bdsm.logicalFreeSlotsSf = sf
  537. bdsm.logicalFreeSlotsPager = pager
  538. if !ce.HasErrors() {
  539. bdsm.logicalSlotManager = slotting.NewLogicalSlotManager(bdsm.logicalSlotsPager,
  540. bdsm.logicalFreeSlotsPager)
  541. }
  542. // If there were any file related errors return at this point
  543. if ce.HasErrors() {
  544. // Release the lockfile if there were errors
  545. if bdsm.lockfile != nil {
  546. bdsm.lockfile.Finish()
  547. }
  548. return ce
  549. }
  550. // Check version
  551. version := bdsm.Root(RootIDVersion)
  552. if version > VERSION {
  553. // Try to clean up
  554. bdsm.Close()
  555. panic(fmt.Sprint("Cannot open datastore ", bdsm.filename, " - version of disk files is "+
  556. "newer than supported version. Supported version:", VERSION,
  557. " Disk files version:", version))
  558. }
  559. if version != VERSION {
  560. bdsm.SetRoot(RootIDVersion, VERSION)
  561. }
  562. return nil
  563. }
  564. /*
  565. createFileAndPager creates a storagefile and a pager.
  566. */
  567. func createFileAndPager(filename string, recordSize uint32,
  568. bdsm *ByteDiskStorageManager) (*file.StorageFile, *paging.PagedStorageFile, error) {
  569. sf, err := file.NewStorageFile(filename, recordSize, bdsm.transDisabled)
  570. if err != nil {
  571. return nil, nil, err
  572. }
  573. pager, err := paging.NewPagedStorageFile(sf)
  574. return sf, pager, err
  575. }