/* * EliasDB * * Copyright 2016 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ package storage import ( "bytes" "encoding/gob" "errors" "fmt" "io" "sync" "time" "devt.de/krotik/common/errorutil" "devt.de/krotik/common/fileutil" "devt.de/krotik/common/lockutil" "devt.de/krotik/eliasdb/storage/file" "devt.de/krotik/eliasdb/storage/paging" "devt.de/krotik/eliasdb/storage/slotting" "devt.de/krotik/eliasdb/storage/util" ) /* VERSION constains the version of the storage API */ const VERSION = 1 /* FileSiffixLockfile is the file ending for lockfiles */ const FileSiffixLockfile = "lck" /* FileSuffixLogicalSlots is the file ending for a logical slot storage */ const FileSuffixLogicalSlots = "ix" /* FileSuffixLogicalFreeSlots is the file ending for a free logical slot storage */ const FileSuffixLogicalFreeSlots = "ixf" /* FileSuffixPhysicalSlots is the file ending for a physical slot storage */ const FileSuffixPhysicalSlots = "db" /* FileSuffixPhysicalFreeSlots is the file ending for a free physical slot storage */ const FileSuffixPhysicalFreeSlots = "dbf" /* BlockSizePhysicalSlots is the block for a physical slot file. Physical slots will contain actual data they need to have fairly large block sizes. */ const BlockSizePhysicalSlots = 1024 * 8 /* BlockSizeLogicalSlots is the block for a logical slot file. Logical slots contain only pointers they only need small blocks. */ const BlockSizeLogicalSlots = 1024 * 2 /* BlockSizeFreeSlots is the block for a free slot files. Files containing only free slot pointers will always be small. They only need tiny blocks. */ const BlockSizeFreeSlots = 1024 /* ErrReadonly is returned when attempting a write operation on a readonly datastore. */ var ErrReadonly = errors.New("Storage is readonly") /* DiskStorageManager is a storage manager which can store any gob serializable datastructure. */ type DiskStorageManager struct { *ByteDiskStorageManager } /* NewDiskStorageManager creates a new disk storage manager with optional transaction management. If the onlyAppend flag is set then the manager will not attempt to reuse space once it was released after use. If the transDisabled flag is set then the storage manager will not support transactions. */ func NewDiskStorageManager(filename string, readonly bool, onlyAppend bool, transDisabled bool, lockfileDisabled bool) *DiskStorageManager { return &DiskStorageManager{NewByteDiskStorageManager(filename, readonly, onlyAppend, transDisabled, lockfileDisabled)} } /* Name returns the name of the StorageManager instance. */ func (dsm *DiskStorageManager) Name() string { return fmt.Sprint("DiskStorageFile:", dsm.ByteDiskStorageManager.filename) } /* Serialize serializes an object into a byte slice. */ func (dsm *DiskStorageManager) Serialize(o interface{}) ([]byte, error) { // Request a buffer from the buffer pool bb := BufferPool.Get().(*bytes.Buffer) defer func() { bb.Reset() BufferPool.Put(bb) }() // Serialize the object into a gob bytes stream err := gob.NewEncoder(bb).Encode(o) if err != nil { return nil, err } return bb.Bytes(), nil } /* Insert inserts an object and return its storage location. */ func (dsm *DiskStorageManager) Insert(o interface{}) (uint64, error) { b, err := dsm.Serialize(o) if err != nil { return 0, err } return dsm.ByteDiskStorageManager.Insert(b) } /* Update updates a storage location. */ func (dsm *DiskStorageManager) Update(loc uint64, o interface{}) error { b, err := dsm.Serialize(o) if err != nil { return err } return dsm.ByteDiskStorageManager.Update(loc, b) } /* Fetch fetches an object from a given storage location and writes it to a given data container. */ func (dsm *DiskStorageManager) Fetch(loc uint64, o interface{}) error { // Request a buffer from the buffer pool bb := BufferPool.Get().(*bytes.Buffer) defer func() { bb.Reset() BufferPool.Put(bb) }() if err := dsm.ByteDiskStorageManager.Fetch(loc, bb); err != nil { return err } // Deserialize the object from a gob bytes stream return gob.NewDecoder(bb).Decode(o) } /* ByteDiskStorageManager is a disk storage manager which can only store byte slices. */ type ByteDiskStorageManager struct { filename string // Filename for all managed files readonly bool // Flag to make the storage readonly onlyAppend bool // Flag for append-only mode transDisabled bool // Flag if transactions are enabled mutex *sync.Mutex // Mutex to protect actual file operations physicalSlotsSf *file.StorageFile // StorageFile for physical slots physicalSlotsPager *paging.PagedStorageFile // Pager for physical slots StorageFile physicalFreeSlotsSf *file.StorageFile // StorageFile for free physical slots physicalFreeSlotsPager *paging.PagedStorageFile // Pager for free physical slots StorageFile physicalSlotManager *slotting.PhysicalSlotManager // Manager for physical slots logicalSlotsSf *file.StorageFile // StorageFile for logical slots logicalSlotsPager *paging.PagedStorageFile // Pager for logical slots StorageFile logicalFreeSlotsSf *file.StorageFile // StorageFile for free logical slots logicalFreeSlotsPager *paging.PagedStorageFile // Pager for free logical slots StorageFile logicalSlotManager *slotting.LogicalSlotManager // Manager for physical slots lockfile *lockutil.LockFile // Lockfile manager } /* NewByteDiskStorageManager creates a new disk storage manager with optional transaction management which can only store byte slices. If the onlyAppend flag is set then the manager will not attempt to reuse space once it was released after use. If the transDisabled flag is set then the storage manager will not support transactions. */ func NewByteDiskStorageManager(filename string, readonly bool, onlyAppend bool, transDisabled bool, lockfileDisabled bool) *ByteDiskStorageManager { var lf *lockutil.LockFile // Create a lockfile which is checked every 50 milliseconds if !lockfileDisabled { lf = lockutil.NewLockFile(fmt.Sprintf("%v.%v", filename, FileSiffixLockfile), time.Duration(50)*time.Millisecond) } bdsm := &ByteDiskStorageManager{filename, readonly, onlyAppend, transDisabled, &sync.Mutex{}, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, lf} err := initByteDiskStorageManager(bdsm) if err != nil { panic(fmt.Sprintf("Could not initialize DiskStroageManager: %v", filename)) } return bdsm } /* DataFileExist checks if the main datastore file exists. */ func DataFileExist(filename string) bool { ret, err := fileutil.PathExists(fmt.Sprintf("%v.%v.0", filename, FileSuffixPhysicalSlots)) if err != nil { return false } return ret } /* Name returns the name of the StorageManager instance. */ func (bdsm *ByteDiskStorageManager) Name() string { return fmt.Sprint("ByteDiskStorageFile:", bdsm.filename) } /* Root returns a root value. */ func (bdsm *ByteDiskStorageManager) Root(root int) uint64 { bdsm.mutex.Lock() defer bdsm.mutex.Unlock() bdsm.checkFileOpen() return bdsm.physicalSlotsPager.Header().Root(root) } /* SetRoot writes a root value. */ func (bdsm *ByteDiskStorageManager) SetRoot(root int, val uint64) { // When readonly this operation becomes a NOP if bdsm.readonly { return } bdsm.mutex.Lock() defer bdsm.mutex.Unlock() bdsm.checkFileOpen() bdsm.physicalSlotsPager.Header().SetRoot(root, val) } /* Insert inserts an object and return its storage location. */ func (bdsm *ByteDiskStorageManager) Insert(o interface{}) (uint64, error) { bdsm.checkFileOpen() // Fail operation if readonly if bdsm.readonly { return 0, ErrReadonly } // Continue single threaded from here on bdsm.mutex.Lock() defer bdsm.mutex.Unlock() // Store the data in a physical slot b := o.([]byte) ploc, err := bdsm.physicalSlotManager.Insert(b, 0, uint32(len(b))) if err != nil { return 0, err } // Get a logical slot for the physical slot loc, err := bdsm.logicalSlotManager.Insert(ploc) if err != nil { return 0, err } return loc, nil } /* Update updates a storage location. */ func (bdsm *ByteDiskStorageManager) Update(loc uint64, o interface{}) error { bdsm.checkFileOpen() // Fail operation if readonly if bdsm.readonly { return ErrReadonly } // Get the physical slot for the given logical slot bdsm.mutex.Lock() ploc, err := bdsm.logicalSlotManager.Fetch(loc) bdsm.mutex.Unlock() if err != nil { return err } if ploc == 0 { return NewStorageManagerError(ErrSlotNotFound, fmt.Sprint("Location:", util.LocationRecord(loc), util.LocationOffset(loc)), bdsm.Name()) } // Continue single threaded from here on bdsm.mutex.Lock() defer bdsm.mutex.Unlock() // Update the physical record b := o.([]byte) newPloc, err := bdsm.physicalSlotManager.Update(ploc, b, 0, uint32(len(b))) if err != nil { return err } // Update the logical slot if the physical slot has changed if newPloc != ploc { return bdsm.logicalSlotManager.Update(loc, newPloc) } return nil } /* Fetch fetches an object from a given storage location and writes it to a given data container. */ func (bdsm *ByteDiskStorageManager) Fetch(loc uint64, o interface{}) error { bdsm.checkFileOpen() // Get the physical slot for the given logical slot bdsm.mutex.Lock() ploc, err := bdsm.logicalSlotManager.Fetch(loc) bdsm.mutex.Unlock() if err != nil { return err } if ploc == 0 { return NewStorageManagerError(ErrSlotNotFound, fmt.Sprint("Location:", util.LocationRecord(loc), util.LocationOffset(loc)), bdsm.Name()) } // Request the stored bytes bdsm.mutex.Lock() if w, ok := o.(io.Writer); ok { err = bdsm.physicalSlotManager.Fetch(ploc, w) } else { var b bytes.Buffer err = bdsm.physicalSlotManager.Fetch(ploc, &b) copy(o.([]byte), b.Bytes()) } bdsm.mutex.Unlock() return err } /* FetchCached is not implemented for a ByteDiskStorageManager. Only defined to satisfy the StorageManager interface. */ func (bdsm *ByteDiskStorageManager) FetchCached(loc uint64) (interface{}, error) { return nil, NewStorageManagerError(ErrNotInCache, "", bdsm.Name()) } /* Free frees a storage location. */ func (bdsm *ByteDiskStorageManager) Free(loc uint64) error { bdsm.checkFileOpen() // Fail operation if readonly if bdsm.readonly { return ErrReadonly } // Continue single threaded from here on bdsm.mutex.Lock() defer bdsm.mutex.Unlock() // Get the physical slot for the given logical slot ploc, err := bdsm.logicalSlotManager.Fetch(loc) if err != nil { return err } if ploc == 0 { return NewStorageManagerError(ErrSlotNotFound, fmt.Sprint("Location:", util.LocationRecord(loc), util.LocationOffset(loc)), bdsm.Name()) } // First try to free the physical slot since here is the data // if this fails we don't touch the logical slot err = bdsm.physicalSlotManager.Free(ploc) if err != nil { return err } // This is very unlikely to fail - either way we can't do anything // at this point since the physical slot has already gone away return bdsm.logicalSlotManager.Free(loc) } /* Flush writes all pending changes to disk. */ func (bdsm *ByteDiskStorageManager) Flush() error { bdsm.checkFileOpen() // When readonly this operation becomes a NOP if bdsm.readonly { return nil } ce := errorutil.NewCompositeError() // Continue single threaded from here on bdsm.mutex.Lock() defer bdsm.mutex.Unlock() // Write pending changes if err := bdsm.physicalSlotManager.Flush(); err != nil { ce.Add(err) } if err := bdsm.logicalSlotManager.Flush(); err != nil { ce.Add(err) } if err := bdsm.physicalSlotsPager.Flush(); err != nil { ce.Add(err) } if err := bdsm.physicalFreeSlotsPager.Flush(); err != nil { ce.Add(err) } if err := bdsm.logicalSlotsPager.Flush(); err != nil { ce.Add(err) } if err := bdsm.logicalFreeSlotsPager.Flush(); err != nil { ce.Add(err) } // Return errors if there were any if ce.HasErrors() { return ce } return nil } /* Rollback cancels all pending changes which have not yet been written to disk. */ func (bdsm *ByteDiskStorageManager) Rollback() error { // Rollback has no effect if transactions are disabled or when readonly if bdsm.transDisabled || bdsm.readonly { return nil } bdsm.checkFileOpen() ce := errorutil.NewCompositeError() // Continue single threaded from here on bdsm.mutex.Lock() defer bdsm.mutex.Unlock() // Write pending manager changes to transaction log if err := bdsm.physicalSlotManager.Flush(); err != nil { ce.Add(err) } if err := bdsm.logicalSlotManager.Flush(); err != nil { ce.Add(err) } // Rollback current transaction if err := bdsm.physicalSlotsPager.Rollback(); err != nil { ce.Add(err) } if err := bdsm.physicalFreeSlotsPager.Rollback(); err != nil { ce.Add(err) } if err := bdsm.logicalSlotsPager.Rollback(); err != nil { ce.Add(err) } if err := bdsm.logicalFreeSlotsPager.Rollback(); err != nil { ce.Add(err) } // Return errors if there were any if ce.HasErrors() { return ce } return nil } /* Close closes the StorageManager and write all pending changes to disk. */ func (bdsm *ByteDiskStorageManager) Close() error { bdsm.checkFileOpen() ce := errorutil.NewCompositeError() // Continue single threaded from here on bdsm.mutex.Lock() defer bdsm.mutex.Unlock() // Try to close all files and collect any errors which are returned if err := bdsm.physicalSlotsPager.Close(); err != nil { ce.Add(err) } if err := bdsm.physicalFreeSlotsPager.Close(); err != nil { ce.Add(err) } if err := bdsm.logicalSlotsPager.Close(); err != nil { ce.Add(err) } if err := bdsm.logicalFreeSlotsPager.Close(); err != nil { ce.Add(err) } // Return errors if there were any if ce.HasErrors() { return ce } // Release all file related objects bdsm.physicalSlotsSf = nil bdsm.physicalSlotsPager = nil bdsm.physicalFreeSlotsSf = nil bdsm.physicalFreeSlotsPager = nil bdsm.physicalSlotManager = nil bdsm.logicalSlotsSf = nil bdsm.logicalSlotsPager = nil bdsm.logicalFreeSlotsSf = nil bdsm.logicalFreeSlotsPager = nil bdsm.logicalSlotManager = nil if bdsm.lockfile != nil { return bdsm.lockfile.Finish() } return nil } /* checkFileOpen checks that the files on disk are still open. */ func (bdsm *ByteDiskStorageManager) checkFileOpen() { if bdsm.physicalSlotsSf == nil { panic(fmt.Sprint("Trying to access storage after it was closed: ", bdsm.filename)) } if bdsm.lockfile != nil && !bdsm.lockfile.WatcherRunning() { err := bdsm.lockfile.Finish() panic(fmt.Sprint("Error while checking lockfile:", err)) } } /* initByteDiskStorageManager initialises the file managers of a given ByteDiskStorageManager. */ func initByteDiskStorageManager(bdsm *ByteDiskStorageManager) error { // Kick off the lockfile watcher if bdsm.lockfile != nil { if err := bdsm.lockfile.Start(); err != nil { panic(fmt.Sprintf("Could not take ownership of lockfile %v: %v", bdsm.filename, err)) } } // Try to open all files and collect all errors ce := errorutil.NewCompositeError() sf, pager, err := createFileAndPager( fmt.Sprintf("%v.%v", bdsm.filename, FileSuffixPhysicalSlots), BlockSizePhysicalSlots, bdsm) if err != nil { ce.Add(err) } bdsm.physicalSlotsSf = sf bdsm.physicalSlotsPager = pager sf, pager, err = createFileAndPager( fmt.Sprintf("%v.%v", bdsm.filename, FileSuffixPhysicalFreeSlots), BlockSizeFreeSlots, bdsm) if err != nil { ce.Add(err) } bdsm.physicalFreeSlotsSf = sf bdsm.physicalFreeSlotsPager = pager if !ce.HasErrors() { bdsm.physicalSlotManager = slotting.NewPhysicalSlotManager(bdsm.physicalSlotsPager, bdsm.physicalFreeSlotsPager, bdsm.onlyAppend) } sf, pager, err = createFileAndPager( fmt.Sprintf("%v.%v", bdsm.filename, FileSuffixLogicalSlots), BlockSizeLogicalSlots, bdsm) if err != nil { ce.Add(err) } bdsm.logicalSlotsSf = sf bdsm.logicalSlotsPager = pager sf, pager, err = createFileAndPager( fmt.Sprintf("%v.%v", bdsm.filename, FileSuffixLogicalFreeSlots), BlockSizeFreeSlots, bdsm) if err != nil { ce.Add(err) } bdsm.logicalFreeSlotsSf = sf bdsm.logicalFreeSlotsPager = pager if !ce.HasErrors() { bdsm.logicalSlotManager = slotting.NewLogicalSlotManager(bdsm.logicalSlotsPager, bdsm.logicalFreeSlotsPager) } // If there were any file related errors return at this point if ce.HasErrors() { // Release the lockfile if there were errors if bdsm.lockfile != nil { bdsm.lockfile.Finish() } return ce } // Check version version := bdsm.Root(RootIDVersion) if version > VERSION { // Try to clean up bdsm.Close() panic(fmt.Sprint("Cannot open datastore ", bdsm.filename, " - version of disk files is "+ "newer than supported version. Supported version:", VERSION, " Disk files version:", version)) } if version != VERSION { bdsm.SetRoot(RootIDVersion, VERSION) } return nil } /* createFileAndPager creates a storagefile and a pager. */ func createFileAndPager(filename string, recordSize uint32, bdsm *ByteDiskStorageManager) (*file.StorageFile, *paging.PagedStorageFile, error) { sf, err := file.NewStorageFile(filename, recordSize, bdsm.transDisabled) if err != nil { return nil, nil, err } pager, err := paging.NewPagedStorageFile(sf) return sf, pager, err }