123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package storage
- import (
- "errors"
- "flag"
- "fmt"
- "os"
- "strconv"
- "testing"
- "time"
- "devt.de/krotik/common/fileutil"
- )
- const DBDIR = "storagemanagertest"
- // Main function for all tests in this package
- func TestMain(m *testing.M) {
- flag.Parse()
- // Setup
- if res, _ := fileutil.PathExists(DBDIR); res {
- os.RemoveAll(DBDIR)
- }
- err := os.Mkdir(DBDIR, 0770)
- if err != nil {
- fmt.Print("Could not create test directory:", err.Error())
- os.Exit(1)
- }
- // Run the tests
- res := m.Run()
- // Teardown
- err = os.RemoveAll(DBDIR)
- if err != nil {
- fmt.Print("Could not remove test directory:", err.Error())
- }
- os.Exit(res)
- }
- var enableConcurrencyTest = false
- func TestStorageManagerConcurrency(t *testing.T) {
- // Disabled for normal testing
- if !enableConcurrencyTest {
- return
- }
- var retChans []chan error
- threads := 50
- ops := 1000
- dsm := NewDiskStorageManager(DBDIR+"/ctest_dsm", false, false, true, false)
- cdsm := NewCachedDiskStorageManager(dsm, 50000)
- sm := cdsm
- start := time.Now()
- for i := 1; i < threads+1; i++ {
- retChan := make(chan error)
- retChans = append(retChans, retChan)
- // Kick off thread
- fmt.Println("Id:", strconv.Itoa(i), " start")
- go runConcurrencyTest(strconv.Itoa(i), sm, ops, retChan)
- }
- // Wait for threads to complete
- for i := 0; i < threads; i++ {
- retChan := retChans[i]
- err := <-retChan
- if err != nil {
- fmt.Println("Id:", strconv.Itoa(i), " Error:", err)
- } else {
- fmt.Println("Id:", strconv.Itoa(i), " ok")
- }
- }
- elapsed := time.Since(start)
- fmt.Println("Total time:", elapsed)
- sm.Close()
- }
- var enablePerformanceTest = false
- func TestStorageManagerPerformance(t *testing.T) {
- // Disabled for normal testing
- if !enablePerformanceTest {
- return
- }
- // Test multiple read/write operations in concurrent threads
- start := time.Now()
- // Last iteration here shows the cache running out of available entries -
- // Since we ask for the same elements in the same order we completely
- // loose the benefit of the cache (i.e. oldest elements are removed first)
- for i := 1000; i < 51001; i += 5000 {
- dsm := NewDiskStorageManager(DBDIR+"/ptest_dsm", false, false, true, false)
- cdsm := NewCachedDiskStorageManager(dsm, 50000)
- runPerformanceTest("1", cdsm, i)
- }
- elapsed := time.Since(start)
- fmt.Println("Total time:", elapsed)
- }
- func runConcurrencyTest(id string, sm Manager, ops int, retChan chan error) {
- errorChan := make(chan error)
- tc := &testclient{make([]uint64, 0)}
- // Insert, Fetch, Update, Fetch some data
- start := time.Now()
- go tc.clientInsert(id, sm, ops, errorChan)
- res := <-errorChan
- if res != nil {
- retChan <- res
- }
- go tc.clientFetch(id, "test", sm, ops, errorChan)
- res = <-errorChan
- if res != nil {
- retChan <- res
- }
- go tc.clientUpdate(id, "t35ter", sm, ops, errorChan)
- res = <-errorChan
- if res != nil {
- retChan <- res
- }
- go tc.clientFetch(id, "t35ter", sm, ops, errorChan)
- res = <-errorChan
- if res != nil {
- retChan <- res
- }
- elapsed := time.Since(start).Nanoseconds() / (1000 * 1000)
- fmt.Println("Id:", id, " Strings:", ops, " Time:", elapsed)
- retChan <- nil
- }
- func runPerformanceTest(id string, sm Manager, ops int) {
- var elapsed1, elapsed2, elapsed3, elapsed4, elapsed5, elapsed6, elapsed7 int64
- errorChan := make(chan error)
- tc := &testclient{make([]uint64, 0)}
- // Insert some data
- start := time.Now()
- go tc.clientInsert(id, sm, ops, errorChan)
- res := <-errorChan
- if res != nil {
- fmt.Println("tc.clientInsert:", res)
- }
- elapsed1 = time.Since(start).Nanoseconds() / (1000 * 1000)
- // Read data back
- start = time.Now()
- go tc.clientFetch(id, "test", sm, ops, errorChan)
- res = <-errorChan
- if res != nil {
- fmt.Println("tc.clientFetch:", res)
- }
- elapsed2 = time.Since(start).Nanoseconds() / (1000 * 1000)
- // Read data back a 2nd time
- start = time.Now()
- go tc.clientFetch(id, "test", sm, ops, errorChan)
- res = <-errorChan
- if res != nil {
- fmt.Println("tc.clientFetch:", res)
- }
- elapsed3 = time.Since(start).Nanoseconds() / (1000 * 1000)
- // Update the data without reallocation
- start = time.Now()
- go tc.clientUpdate(id, "t35t", sm, ops, errorChan)
- res = <-errorChan
- if res != nil {
- fmt.Println("tc.clientUpdate:", res)
- }
- elapsed4 = time.Since(start).Nanoseconds() / (1000 * 1000)
- // Read data back a 3nd time
- start = time.Now()
- go tc.clientFetch(id, "t35t", sm, ops, errorChan)
- res = <-errorChan
- if res != nil {
- fmt.Println("tc.clientFetch:", res)
- }
- elapsed5 = time.Since(start).Nanoseconds() / (1000 * 1000)
- // Update the data with reallocation
- start = time.Now()
- go tc.clientUpdate(id, "teststring", sm, ops, errorChan)
- res = <-errorChan
- if res != nil {
- fmt.Println("tc.clientUpdate:", res)
- }
- elapsed6 = time.Since(start).Nanoseconds() / (1000 * 1000)
- // Read data back a 4th time
- start = time.Now()
- go tc.clientFetch(id, "teststring", sm, ops, errorChan)
- <-errorChan
- elapsed7 = time.Since(start).Nanoseconds() / (1000 * 1000)
- fmt.Println("Strings,", ops, ",Insert,", elapsed1, ",Fetch1,", elapsed2, ",Fetch2,",
- elapsed3, ",Update,", elapsed4, ",Fetch3,", elapsed5, ",Update Realloc,",
- elapsed6, ",Fetch,", elapsed7)
- sm.Close()
- }
- type testclient struct {
- locs []uint64
- }
- /*
- clientInsert inserts some test data.
- */
- func (tc *testclient) clientInsert(id string, sm Manager, ops int, errorChan chan error) {
- // Write stull
- for i := 0; i < ops; i++ {
- loc, err := sm.Insert(fmt.Sprint("test-", id, i))
- if err != nil {
- errorChan <- errors.New(fmt.Sprint("Error during insert thread:", id, " iteration:", i, " error:", err.Error()))
- return
- }
- tc.locs = append(tc.locs, loc)
- }
- // Flush changes to disk
- if err := sm.Flush(); err != nil {
- errorChan <- errors.New(fmt.Sprint("Error during flush thread:", id, " error:", err.Error()))
- return
- }
- errorChan <- nil
- }
- /*
- clientFetch reads back test data.
- */
- func (tc *testclient) clientFetch(id string, teststring string, sm Manager, ops int, errorChan chan error) {
- var obj interface{}
- var res string
- var err error
- for i := 0; i < ops; i++ {
- obj, _ = sm.FetchCached(tc.locs[i])
- if obj == nil {
- err = sm.Fetch(tc.locs[i], &res)
- } else {
- res = obj.(string)
- }
- if err != nil {
- errorChan <- errors.New(fmt.Sprint("Error during fetch thread:", id, " iteration:", i, " error:", err.Error()))
- return
- }
- if res != fmt.Sprint(teststring, "-", id, i) {
- errorChan <- errors.New(fmt.Sprint("Unexpected fetch result thread:", id, " iteration:", i, " result:", res))
- return
- }
- }
- errorChan <- nil
- }
- /*
- clientUpdate updates test data without requiring relocation.
- */
- func (tc *testclient) clientUpdate(id string, teststring string, sm Manager, ops int, errorChan chan error) {
- // Write stull
- for i := 0; i < ops; i++ {
- err := sm.Update(tc.locs[i], fmt.Sprint(teststring, "-", id, i))
- if err != nil {
- errorChan <- errors.New(fmt.Sprint("Error during update thread:", id, " iteration:", i, " error:", err.Error()))
- return
- }
- }
- // Flush changes to disk
- if err := sm.Flush(); err != nil {
- errorChan <- errors.New(fmt.Sprint("Error during flush thread:", id, " error:", err.Error()))
- return
- }
- errorChan <- nil
- }
|