physicalslotmanager.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package slotting
  11. import (
  12. "io"
  13. "devt.de/krotik/eliasdb/storage/file"
  14. "devt.de/krotik/eliasdb/storage/paging"
  15. "devt.de/krotik/eliasdb/storage/paging/view"
  16. "devt.de/krotik/eliasdb/storage/slotting/pageview"
  17. "devt.de/krotik/eliasdb/storage/util"
  18. )
  19. /*
  20. AllocationRoundUpThreshold is used to decide if a slot size should be rounded up. If an
  21. allocation would leave less than AllocationRoundUpThreshold + 1 left on the page then
  22. the allocation size is rounded up to fit the page
  23. */
  24. const AllocationRoundUpThreshold = 16
  25. /*
  26. PhysicalSlotManager data structure
  27. */
  28. type PhysicalSlotManager struct {
  29. storagefile *file.StorageFile // StorageFile which is wrapped
  30. pager *paging.PagedStorageFile // Pager for StorageFile
  31. freeManager *FreePhysicalSlotManager // Manager for free slots
  32. recordSize uint32 // Size of records
  33. availableRecordSize uint32 // Available space on records
  34. }
  35. /*
  36. NewPhysicalSlotManager creates a new object to manage physical slots. This
  37. factory function requires two PagedStorageFiles the first will hold the actual
  38. physical slots, the second is used to manage free physical slots.
  39. */
  40. func NewPhysicalSlotManager(psf *paging.PagedStorageFile,
  41. fpsf *paging.PagedStorageFile, onlyAppend bool) *PhysicalSlotManager {
  42. sf := psf.StorageFile()
  43. freeManager := NewFreePhysicalSlotManager(fpsf, onlyAppend)
  44. recordSize := sf.RecordSize()
  45. return &PhysicalSlotManager{sf, psf, freeManager,
  46. recordSize, recordSize - pageview.OffsetData}
  47. }
  48. /*
  49. Insert inserts a new piece of data.
  50. */
  51. func (psm *PhysicalSlotManager) Insert(data []byte, start uint32, length uint32) (uint64, error) {
  52. if length == 0 {
  53. panic("Cannot insert 0 bytes of data")
  54. }
  55. location, err := psm.allocate(length)
  56. if err != nil {
  57. return 0, err
  58. }
  59. err = psm.write(location, data, start, length)
  60. if err != nil {
  61. // Since the write operation failed declare the previous allocated space
  62. // as free
  63. psm.freeManager.Add(location, length)
  64. return 0, err
  65. }
  66. return location, nil
  67. }
  68. /*
  69. Update updates the data in a slot.
  70. */
  71. func (psm *PhysicalSlotManager) Update(location uint64, data []byte, start uint32, length uint32) (uint64, error) {
  72. record, err := psm.storagefile.Get(util.LocationRecord(location))
  73. if err != nil {
  74. return 0, err
  75. }
  76. offset := util.LocationOffset(location)
  77. availableSize := util.AvailableSize(record, int(offset))
  78. psm.storagefile.ReleaseInUse(record)
  79. if length > availableSize || availableSize-length > util.MaxAvailableSizeDifference {
  80. // Reallocate if the new data is too big for the old slot or if the
  81. // data is much smaller than the available space in the slot (i.e.
  82. // there would be a lot of waste)
  83. // Error handling for free call is done by the first Get call of
  84. // this function.
  85. psm.Free(location)
  86. location, err = psm.allocate(length)
  87. if err != nil {
  88. return 0, err
  89. }
  90. }
  91. err = psm.write(location, data, start, length)
  92. if err != nil {
  93. return 0, err
  94. }
  95. return location, nil
  96. }
  97. /*
  98. Fetch fetches data from a specified location.
  99. */
  100. func (psm *PhysicalSlotManager) Fetch(location uint64, writer io.Writer) error {
  101. cursor := paging.NewPageCursor(psm.pager, view.TypeDataPage, util.LocationRecord(location))
  102. record, err := psm.storagefile.Get(cursor.Current())
  103. if err != nil {
  104. return err
  105. }
  106. length := util.CurrentSize(record, int(util.LocationOffset(location)))
  107. if length == 0 {
  108. // Return at this point if there is nothing to read
  109. psm.storagefile.ReleaseInUseID(cursor.Current(), false)
  110. return nil
  111. }
  112. // Read now the bytes
  113. restSize := length
  114. recordOffset := uint32(util.LocationOffset(location) + util.SizeInfoSize)
  115. for restSize > 0 {
  116. // Calculate how much data should be read
  117. toCopy := psm.recordSize - uint32(recordOffset)
  118. if restSize < toCopy {
  119. // If the record can contain more than restSize just
  120. // read restSize
  121. toCopy = restSize
  122. }
  123. // Read the data
  124. writer.Write(record.Data()[recordOffset : recordOffset+toCopy])
  125. // Calculate the rest size and new offset
  126. restSize -= toCopy
  127. psm.storagefile.ReleaseInUseID(cursor.Current(), false)
  128. // Go to the next record
  129. if restSize > 0 {
  130. // Error handling is done by surrounding Get calls
  131. next, _ := cursor.Next()
  132. record, err = psm.storagefile.Get(next)
  133. if err != nil {
  134. return err
  135. }
  136. recordOffset = pageview.OffsetData
  137. }
  138. }
  139. return nil
  140. }
  141. /*
  142. Free frees a given physical slot. The given slot is given to the FreePhysicalSlotManager.
  143. */
  144. func (psm *PhysicalSlotManager) Free(location uint64) error {
  145. slotRecord := util.LocationRecord(location)
  146. slotOffset := int(util.LocationOffset(location))
  147. record, err := psm.storagefile.Get(slotRecord)
  148. if err != nil {
  149. return err
  150. }
  151. util.SetCurrentSize(record, slotOffset, 0)
  152. psm.storagefile.ReleaseInUseID(slotRecord, true)
  153. psm.freeManager.Add(location, util.AvailableSize(record, slotOffset))
  154. return nil
  155. }
  156. /*
  157. Flush writes all pending changes.
  158. */
  159. func (psm *PhysicalSlotManager) Flush() error {
  160. return psm.freeManager.Flush()
  161. }
  162. /*
  163. write writes data to a location. Should an error occurs, then the already written data
  164. is not cleaned up.
  165. */
  166. func (psm *PhysicalSlotManager) write(location uint64, data []byte, start uint32, length uint32) error {
  167. cursor := paging.NewPageCursor(psm.pager, view.TypeDataPage, util.LocationRecord(location))
  168. record, err := psm.storagefile.Get(cursor.Current())
  169. if err != nil {
  170. return err
  171. }
  172. util.SetCurrentSize(record, int(util.LocationOffset(location)), length)
  173. if length == 0 {
  174. // Return at this point if there is nothing to write
  175. psm.storagefile.ReleaseInUseID(cursor.Current(), true)
  176. return nil
  177. }
  178. // Write now the bytes
  179. restSize := length
  180. dataOffset := start
  181. recordOffset := uint32(util.LocationOffset(location) + util.SizeInfoSize)
  182. for restSize > 0 {
  183. // Calculate how much data should be written
  184. toCopy := psm.recordSize - uint32(recordOffset)
  185. if restSize < toCopy {
  186. // If the record can contain more than restSize just
  187. // write restSize
  188. toCopy = restSize
  189. }
  190. // Write the data
  191. dataOffset2 := dataOffset + toCopy
  192. recordOffset2 := recordOffset + toCopy
  193. copy(record.Data()[recordOffset:recordOffset2], data[dataOffset:dataOffset2])
  194. // Calculate the rest size and new offset
  195. restSize -= toCopy
  196. dataOffset += toCopy
  197. psm.storagefile.ReleaseInUseID(cursor.Current(), true)
  198. // Go to the next record
  199. if restSize > 0 {
  200. // Error handling is done by surrounding Get calls
  201. next, _ := cursor.Next()
  202. record, err = psm.storagefile.Get(next)
  203. if err != nil {
  204. return err
  205. }
  206. recordOffset = pageview.OffsetData
  207. }
  208. }
  209. return nil
  210. }
  211. /*
  212. allocate allocates a new slot of a given size.
  213. */
  214. func (psm *PhysicalSlotManager) allocate(size uint32) (uint64, error) {
  215. // Normalize slot size
  216. normalizedSize := util.NormalizeSlotSize(size)
  217. // Try to find a free slot which was previously allocated
  218. loc, err := psm.freeManager.Get(normalizedSize)
  219. if err != nil {
  220. return 0, err
  221. }
  222. // If nothing of the right size was previously allocated then allocate
  223. // something new
  224. if loc == 0 {
  225. lastpage := psm.pager.Last(view.TypeDataPage)
  226. loc, err = psm.allocateNew(normalizedSize, lastpage)
  227. if err != nil {
  228. return 0, err
  229. }
  230. } else {
  231. // IF a location was found in the freeManager then try
  232. // to access it to make sure it is available - revert otherwise
  233. slotRecord := util.LocationRecord(loc)
  234. slotOffset := int(util.LocationOffset(loc))
  235. record, err := psm.storagefile.Get(slotRecord)
  236. if err != nil {
  237. // Revert back - the size may now be wrong but this is
  238. // still better than losing the whole record
  239. psm.freeManager.Add(loc, normalizedSize)
  240. return 0, err
  241. }
  242. util.SetCurrentSize(record, slotOffset, 0)
  243. psm.storagefile.ReleaseInUseID(slotRecord, true)
  244. }
  245. return loc, nil
  246. }
  247. /*
  248. allocateNew allocates a new slot in the PagedStorageFile. Errors during this function might
  249. cause the allocation of empty pages. The last allocated page pointers might
  250. get out of sync with the actual data pages.
  251. */
  252. func (psm *PhysicalSlotManager) allocateNew(size uint32, startPage uint64) (uint64, error) {
  253. var record *file.Record
  254. var pv *pageview.DataPage
  255. var offset uint32
  256. var header int
  257. var err error
  258. if startPage == 0 {
  259. // Create a new page if there is no start page
  260. startPage, err = psm.pager.AllocatePage(view.TypeDataPage)
  261. if err != nil {
  262. return 0, err
  263. }
  264. // Get the newly allocated page - all error checking was
  265. // done in the previous AllocatePage call
  266. record, _ = psm.storagefile.Get(startPage)
  267. pv = pageview.NewDataPage(record)
  268. pv.SetOffsetFirst(pageview.OffsetData)
  269. util.SetCurrentSize(record, pageview.OffsetData, 0)
  270. util.SetAvailableSize(record, pageview.OffsetData, 0)
  271. } else {
  272. record, err = psm.storagefile.Get(startPage)
  273. if err != nil {
  274. return 0, err
  275. }
  276. pv = pageview.NewDataPage(record)
  277. }
  278. offset = uint32(pv.OffsetFirst())
  279. if offset == 0 {
  280. // Take care of the special case if the current page was filled
  281. // exactly by the previous row
  282. psm.storagefile.ReleaseInUse(record)
  283. return psm.allocateNew(size, 0)
  284. }
  285. // Check if the last existing page is full - in that case just allocate
  286. // a new page
  287. header = int(offset)
  288. if offset == psm.recordSize || offset > psm.recordSize-util.SizeInfoSize {
  289. // Go to next page
  290. psm.storagefile.ReleaseInUse(record)
  291. return psm.allocateNew(size, 0)
  292. }
  293. slotsize := util.AvailableSize(record, header)
  294. // Loop over the slots and update the header and offset pointer - stop
  295. // if there is an empty space or we reach the end of the page
  296. for slotsize != 0 && offset < psm.recordSize {
  297. offset += slotsize + util.SizeInfoSize
  298. if offset == psm.recordSize || offset > psm.recordSize-util.SizeInfoSize {
  299. // Go to next page
  300. psm.storagefile.ReleaseInUse(record)
  301. return psm.allocateNew(size, 0)
  302. }
  303. header = int(offset)
  304. slotsize = util.AvailableSize(record, header)
  305. }
  306. // At this point we have the location for the new row
  307. loc := util.PackLocation(startPage, uint16(offset))
  308. // Calculate the remaining free space for the current page
  309. rspace := psm.recordSize - offset - util.SizeInfoSize
  310. if rspace < size {
  311. // If the remaining space is not enough we must allocate new pages
  312. // Increase the size if after the allocation only
  313. // ALLOCATION_ROUND_UP_THRESHOLD bytes would remain
  314. // on the record
  315. freeSpaceLastRecord := (size - rspace) % psm.availableRecordSize
  316. if (psm.availableRecordSize - freeSpaceLastRecord) <=
  317. (AllocationRoundUpThreshold + util.SizeInfoSize) {
  318. newsize := size
  319. newsize += (psm.availableRecordSize - freeSpaceLastRecord)
  320. nnewsize := util.NormalizeSlotSize(newsize)
  321. // Only do so if the new value is a valid normalized value
  322. if newsize == nnewsize {
  323. size = newsize
  324. }
  325. }
  326. // Write row header
  327. util.SetAvailableSize(record, header, size)
  328. psm.storagefile.ReleaseInUseID(startPage, true)
  329. // Calculate the rest size which needs to be allocated
  330. allocSize := size - rspace
  331. // Now allocate whole pages
  332. for allocSize >= psm.availableRecordSize {
  333. startPage, err = psm.pager.AllocatePage(view.TypeDataPage)
  334. if err != nil {
  335. return 0, err
  336. }
  337. // Error checking was done in previous AllocatePage call
  338. record, _ = psm.storagefile.Get(startPage)
  339. pv = pageview.NewDataPage(record)
  340. // Since this page contains only data there is no first row
  341. // offset
  342. pv.SetOffsetFirst(0)
  343. psm.storagefile.ReleaseInUseID(startPage, true)
  344. allocSize -= psm.availableRecordSize
  345. }
  346. // If there is still a rest left allocate one more page but reserve
  347. // only a part of it for the row
  348. if allocSize > 0 {
  349. startPage, err = psm.pager.AllocatePage(view.TypeDataPage)
  350. if err != nil {
  351. return 0, err
  352. }
  353. // Error checking was done in previous AllocatePage call
  354. record, _ = psm.storagefile.Get(startPage)
  355. pv = pageview.NewDataPage(record)
  356. pv.SetOffsetFirst(uint16(pageview.OffsetData + allocSize))
  357. psm.storagefile.ReleaseInUseID(startPage, true)
  358. }
  359. } else {
  360. // We found a free space on the current page
  361. // Increase the size if after the allocation only
  362. // ALLOCATION_ROUND_UP_THRESHOLD bytes would remain
  363. // on the record
  364. if (rspace - size) <= (AllocationRoundUpThreshold + util.SizeInfoSize) {
  365. newsize := rspace
  366. nnewsize := util.NormalizeSlotSize(newsize)
  367. // Only do so if the new value is a valid normalized value
  368. if newsize == nnewsize {
  369. size = newsize
  370. }
  371. }
  372. // Write row header
  373. util.SetAvailableSize(record, header, size)
  374. psm.storagefile.ReleaseInUseID(startPage, true)
  375. }
  376. return loc, nil
  377. }