123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package slotting
- import (
- "io"
- "devt.de/krotik/eliasdb/storage/file"
- "devt.de/krotik/eliasdb/storage/paging"
- "devt.de/krotik/eliasdb/storage/paging/view"
- "devt.de/krotik/eliasdb/storage/slotting/pageview"
- "devt.de/krotik/eliasdb/storage/util"
- )
/*
AllocationRoundUpThreshold controls when the size of a newly allocated slot is
rounded up. If an allocation would leave no more than
AllocationRoundUpThreshold + util.SizeInfoSize bytes unused on the last record
it touches, the slot is enlarged to swallow that remainder - provided the
enlarged size is still a valid normalized slot size.
*/
const AllocationRoundUpThreshold = 16
- /*
- PhysicalSlotManager data structure
- */
- type PhysicalSlotManager struct {
- storagefile *file.StorageFile // StorageFile which is wrapped
- pager *paging.PagedStorageFile // Pager for StorageFile
- freeManager *FreePhysicalSlotManager // Manager for free slots
- recordSize uint32 // Size of records
- availableRecordSize uint32 // Available space on records
- }
- /*
- NewPhysicalSlotManager creates a new object to manage physical slots. This
- factory function requires two PagedStorageFiles the first will hold the actual
- physical slots, the second is used to manage free physical slots.
- */
- func NewPhysicalSlotManager(psf *paging.PagedStorageFile,
- fpsf *paging.PagedStorageFile, onlyAppend bool) *PhysicalSlotManager {
- sf := psf.StorageFile()
- freeManager := NewFreePhysicalSlotManager(fpsf, onlyAppend)
- recordSize := sf.RecordSize()
- return &PhysicalSlotManager{sf, psf, freeManager,
- recordSize, recordSize - pageview.OffsetData}
- }
- /*
- Insert inserts a new piece of data.
- */
- func (psm *PhysicalSlotManager) Insert(data []byte, start uint32, length uint32) (uint64, error) {
- if length == 0 {
- panic("Cannot insert 0 bytes of data")
- }
- location, err := psm.allocate(length)
- if err != nil {
- return 0, err
- }
- err = psm.write(location, data, start, length)
- if err != nil {
- // Since the write operation failed declare the previous allocated space
- // as free
- psm.freeManager.Add(location, length)
- return 0, err
- }
- return location, nil
- }
- /*
- Update updates the data in a slot.
- */
- func (psm *PhysicalSlotManager) Update(location uint64, data []byte, start uint32, length uint32) (uint64, error) {
- record, err := psm.storagefile.Get(util.LocationRecord(location))
- if err != nil {
- return 0, err
- }
- offset := util.LocationOffset(location)
- availableSize := util.AvailableSize(record, int(offset))
- psm.storagefile.ReleaseInUse(record)
- if length > availableSize || availableSize-length > util.MaxAvailableSizeDifference {
- // Reallocate if the new data is too big for the old slot or if the
- // data is much smaller than the available space in the slot (i.e.
- // there would be a lot of waste)
- // Error handling for free call is done by the first Get call of
- // this function.
- psm.Free(location)
- location, err = psm.allocate(length)
- if err != nil {
- return 0, err
- }
- }
- err = psm.write(location, data, start, length)
- if err != nil {
- return 0, err
- }
- return location, nil
- }
- /*
- Fetch fetches data from a specified location.
- */
- func (psm *PhysicalSlotManager) Fetch(location uint64, writer io.Writer) error {
- cursor := paging.NewPageCursor(psm.pager, view.TypeDataPage, util.LocationRecord(location))
- record, err := psm.storagefile.Get(cursor.Current())
- if err != nil {
- return err
- }
- length := util.CurrentSize(record, int(util.LocationOffset(location)))
- if length == 0 {
- // Return at this point if there is nothing to read
- psm.storagefile.ReleaseInUseID(cursor.Current(), false)
- return nil
- }
- // Read now the bytes
- restSize := length
- recordOffset := uint32(util.LocationOffset(location) + util.SizeInfoSize)
- for restSize > 0 {
- // Calculate how much data should be read
- toCopy := psm.recordSize - uint32(recordOffset)
- if restSize < toCopy {
- // If the record can contain more than restSize just
- // read restSize
- toCopy = restSize
- }
- // Read the data
- writer.Write(record.Data()[recordOffset : recordOffset+toCopy])
- // Calculate the rest size and new offset
- restSize -= toCopy
- psm.storagefile.ReleaseInUseID(cursor.Current(), false)
- // Go to the next record
- if restSize > 0 {
- // Error handling is done by surrounding Get calls
- next, _ := cursor.Next()
- record, err = psm.storagefile.Get(next)
- if err != nil {
- return err
- }
- recordOffset = pageview.OffsetData
- }
- }
- return nil
- }
- /*
- Free frees a given physical slot. The given slot is given to the FreePhysicalSlotManager.
- */
- func (psm *PhysicalSlotManager) Free(location uint64) error {
- slotRecord := util.LocationRecord(location)
- slotOffset := int(util.LocationOffset(location))
- record, err := psm.storagefile.Get(slotRecord)
- if err != nil {
- return err
- }
- util.SetCurrentSize(record, slotOffset, 0)
- psm.storagefile.ReleaseInUseID(slotRecord, true)
- psm.freeManager.Add(location, util.AvailableSize(record, slotOffset))
- return nil
- }
- /*
- Flush writes all pending changes.
- */
- func (psm *PhysicalSlotManager) Flush() error {
- return psm.freeManager.Flush()
- }
- /*
- write writes data to a location. Should an error occurs, then the already written data
- is not cleaned up.
- */
- func (psm *PhysicalSlotManager) write(location uint64, data []byte, start uint32, length uint32) error {
- cursor := paging.NewPageCursor(psm.pager, view.TypeDataPage, util.LocationRecord(location))
- record, err := psm.storagefile.Get(cursor.Current())
- if err != nil {
- return err
- }
- util.SetCurrentSize(record, int(util.LocationOffset(location)), length)
- if length == 0 {
- // Return at this point if there is nothing to write
- psm.storagefile.ReleaseInUseID(cursor.Current(), true)
- return nil
- }
- // Write now the bytes
- restSize := length
- dataOffset := start
- recordOffset := uint32(util.LocationOffset(location) + util.SizeInfoSize)
- for restSize > 0 {
- // Calculate how much data should be written
- toCopy := psm.recordSize - uint32(recordOffset)
- if restSize < toCopy {
- // If the record can contain more than restSize just
- // write restSize
- toCopy = restSize
- }
- // Write the data
- dataOffset2 := dataOffset + toCopy
- recordOffset2 := recordOffset + toCopy
- copy(record.Data()[recordOffset:recordOffset2], data[dataOffset:dataOffset2])
- // Calculate the rest size and new offset
- restSize -= toCopy
- dataOffset += toCopy
- psm.storagefile.ReleaseInUseID(cursor.Current(), true)
- // Go to the next record
- if restSize > 0 {
- // Error handling is done by surrounding Get calls
- next, _ := cursor.Next()
- record, err = psm.storagefile.Get(next)
- if err != nil {
- return err
- }
- recordOffset = pageview.OffsetData
- }
- }
- return nil
- }
- /*
- allocate allocates a new slot of a given size.
- */
- func (psm *PhysicalSlotManager) allocate(size uint32) (uint64, error) {
- // Normalize slot size
- normalizedSize := util.NormalizeSlotSize(size)
- // Try to find a free slot which was previously allocated
- loc, err := psm.freeManager.Get(normalizedSize)
- if err != nil {
- return 0, err
- }
- // If nothing of the right size was previously allocated then allocate
- // something new
- if loc == 0 {
- lastpage := psm.pager.Last(view.TypeDataPage)
- loc, err = psm.allocateNew(normalizedSize, lastpage)
- if err != nil {
- return 0, err
- }
- } else {
- // IF a location was found in the freeManager then try
- // to access it to make sure it is available - revert otherwise
- slotRecord := util.LocationRecord(loc)
- slotOffset := int(util.LocationOffset(loc))
- record, err := psm.storagefile.Get(slotRecord)
- if err != nil {
- // Revert back - the size may now be wrong but this is
- // still better than losing the whole record
- psm.freeManager.Add(loc, normalizedSize)
- return 0, err
- }
- util.SetCurrentSize(record, slotOffset, 0)
- psm.storagefile.ReleaseInUseID(slotRecord, true)
- }
- return loc, nil
- }
- /*
- allocateNew allocates a new slot in the PagedStorageFile. Errors during this function might
- cause the allocation of empty pages. The last allocated page pointers might
- get out of sync with the actual data pages.
- */
- func (psm *PhysicalSlotManager) allocateNew(size uint32, startPage uint64) (uint64, error) {
- var record *file.Record
- var pv *pageview.DataPage
- var offset uint32
- var header int
- var err error
- if startPage == 0 {
- // Create a new page if there is no start page
- startPage, err = psm.pager.AllocatePage(view.TypeDataPage)
- if err != nil {
- return 0, err
- }
- // Get the newly allocated page - all error checking was
- // done in the previous AllocatePage call
- record, _ = psm.storagefile.Get(startPage)
- pv = pageview.NewDataPage(record)
- pv.SetOffsetFirst(pageview.OffsetData)
- util.SetCurrentSize(record, pageview.OffsetData, 0)
- util.SetAvailableSize(record, pageview.OffsetData, 0)
- } else {
- record, err = psm.storagefile.Get(startPage)
- if err != nil {
- return 0, err
- }
- pv = pageview.NewDataPage(record)
- }
- offset = uint32(pv.OffsetFirst())
- if offset == 0 {
- // Take care of the special case if the current page was filled
- // exactly by the previous row
- psm.storagefile.ReleaseInUse(record)
- return psm.allocateNew(size, 0)
- }
- // Check if the last existing page is full - in that case just allocate
- // a new page
- header = int(offset)
- if offset == psm.recordSize || offset > psm.recordSize-util.SizeInfoSize {
- // Go to next page
- psm.storagefile.ReleaseInUse(record)
- return psm.allocateNew(size, 0)
- }
- slotsize := util.AvailableSize(record, header)
- // Loop over the slots and update the header and offset pointer - stop
- // if there is an empty space or we reach the end of the page
- for slotsize != 0 && offset < psm.recordSize {
- offset += slotsize + util.SizeInfoSize
- if offset == psm.recordSize || offset > psm.recordSize-util.SizeInfoSize {
- // Go to next page
- psm.storagefile.ReleaseInUse(record)
- return psm.allocateNew(size, 0)
- }
- header = int(offset)
- slotsize = util.AvailableSize(record, header)
- }
- // At this point we have the location for the new row
- loc := util.PackLocation(startPage, uint16(offset))
- // Calculate the remaining free space for the current page
- rspace := psm.recordSize - offset - util.SizeInfoSize
- if rspace < size {
- // If the remaining space is not enough we must allocate new pages
- // Increase the size if after the allocation only
- // ALLOCATION_ROUND_UP_THRESHOLD bytes would remain
- // on the record
- freeSpaceLastRecord := (size - rspace) % psm.availableRecordSize
- if (psm.availableRecordSize - freeSpaceLastRecord) <=
- (AllocationRoundUpThreshold + util.SizeInfoSize) {
- newsize := size
- newsize += (psm.availableRecordSize - freeSpaceLastRecord)
- nnewsize := util.NormalizeSlotSize(newsize)
- // Only do so if the new value is a valid normalized value
- if newsize == nnewsize {
- size = newsize
- }
- }
- // Write row header
- util.SetAvailableSize(record, header, size)
- psm.storagefile.ReleaseInUseID(startPage, true)
- // Calculate the rest size which needs to be allocated
- allocSize := size - rspace
- // Now allocate whole pages
- for allocSize >= psm.availableRecordSize {
- startPage, err = psm.pager.AllocatePage(view.TypeDataPage)
- if err != nil {
- return 0, err
- }
- // Error checking was done in previous AllocatePage call
- record, _ = psm.storagefile.Get(startPage)
- pv = pageview.NewDataPage(record)
- // Since this page contains only data there is no first row
- // offset
- pv.SetOffsetFirst(0)
- psm.storagefile.ReleaseInUseID(startPage, true)
- allocSize -= psm.availableRecordSize
- }
- // If there is still a rest left allocate one more page but reserve
- // only a part of it for the row
- if allocSize > 0 {
- startPage, err = psm.pager.AllocatePage(view.TypeDataPage)
- if err != nil {
- return 0, err
- }
- // Error checking was done in previous AllocatePage call
- record, _ = psm.storagefile.Get(startPage)
- pv = pageview.NewDataPage(record)
- pv.SetOffsetFirst(uint16(pageview.OffsetData + allocSize))
- psm.storagefile.ReleaseInUseID(startPage, true)
- }
- } else {
- // We found a free space on the current page
- // Increase the size if after the allocation only
- // ALLOCATION_ROUND_UP_THRESHOLD bytes would remain
- // on the record
- if (rspace - size) <= (AllocationRoundUpThreshold + util.SizeInfoSize) {
- newsize := rspace
- nnewsize := util.NormalizeSlotSize(newsize)
- // Only do so if the new value is a valid normalized value
- if newsize == nnewsize {
- size = newsize
- }
- }
- // Write row header
- util.SetAvailableSize(record, header, size)
- psm.storagefile.ReleaseInUseID(startPage, true)
- }
- return loc, nil
- }
|