Browse Source

feat: Large scale import/export functionality

Matthias Ladkau 2 years ago
parent
commit
d93b21702f
5 changed files with 503 additions and 38 deletions
  1. cli/eliasdb.go (+103 -34)
  2. graph/graphmanager_test.go (+73 -0)
  3. graph/import_export.go (+178 -4)
  4. graph/import_export_test.go (+102 -0)
  5. importexport.md (+47 -0)

+ 103 - 34
cli/eliasdb.go

@@ -45,6 +45,7 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
+	"path"
 	"path/filepath"
 	"strings"
 	"time"
@@ -294,7 +295,9 @@ func handleServerCommandLine(gm *graph.Manager) bool {
 	var ecalConsole *bool
 
 	importDb := flag.String("import", "", "Import a database from a zip file")
+	importDbLS := flag.String("import-ls", "", "Large scale import from a directory")
 	exportDb := flag.String("export", "", "Export the current database to a zip file")
+	exportDbLS := flag.String("export-ls", "", "Large scale export to a directory")
 
 	if config.Bool(config.EnableECALScripts) {
 		ecalConsole = flag.Bool("ecal-console", false, "Start an interactive interpreter console for ECAL")
@@ -319,6 +322,62 @@ func handleServerCommandLine(gm *graph.Manager) bool {
 		return true
 	}
 
+	err = handleSimpleImportExport(importDb, exportDb, gm)
+
+	if err == nil {
+		err = handleLargeScaleImportExport(importDbLS, exportDbLS, gm)
+	}
+
+	if ecalConsole != nil && *ecalConsole && err == nil {
+		var term termutil.ConsoleLineTerminal
+
+		isExitLine := func(s string) bool {
+			return s == "exit" || s == "q" || s == "quit" || s == "bye" || s == "\x04"
+		}
+
+		term, err = termutil.NewConsoleLineTerminal(os.Stdout)
+		if err == nil {
+			term, err = termutil.AddHistoryMixin(term, "", isExitLine)
+			if err == nil {
+				tid := api.SI.Interpreter.RuntimeProvider.NewThreadID()
+
+				runECALConsole := func(delay int) {
+					defer term.StopTerm()
+
+					time.Sleep(time.Duration(delay) * time.Millisecond)
+
+					term.WriteString(fmt.Sprintln("Type 'q' or 'quit' to exit the shell and '?' to get help"))
+
+					line, err := term.NextLine()
+					for err == nil && !isExitLine(line) {
+						trimmedLine := strings.TrimSpace(line)
+
+						api.SI.Interpreter.HandleInput(term, trimmedLine, tid)
+
+						line, err = term.NextLine()
+					}
+				}
+
+				if err = term.StartTerm(); err == nil {
+					if *noServ {
+						runECALConsole(0)
+					} else {
+						go runECALConsole(3000)
+					}
+				}
+			}
+		}
+	}
+
+	if err != nil {
+		fmt.Println(err.Error())
+		return true
+	}
+
+	return *noServ
+}
+
+func handleSimpleImportExport(importDb, exportDb *string, gm *graph.Manager) error {
+	var err error
+
 	if *importDb != "" {
 		var zipFile *zip.ReadCloser
 
@@ -346,7 +405,7 @@ func handleServerCommandLine(gm *graph.Manager) bool {
 		}
 	}
 
-	if *exportDb != "" {
+	if *exportDb != "" && err == nil {
 		var zipFile *os.File
 
 		fmt.Println("Exporting to:", *exportDb)
@@ -375,52 +434,62 @@ func handleServerCommandLine(gm *graph.Manager) bool {
 		}
 	}
 
-	if ecalConsole != nil && *ecalConsole {
-		var term termutil.ConsoleLineTerminal
+	return err
+}
 
-		isExitLine := func(s string) bool {
-			return s == "exit" || s == "q" || s == "quit" || s == "bye" || s == "\x04"
-		}
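+/*
+directoryFactory produces readers and writers which access "db-<name>.ndjson"
+files in a given directory.
+*/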
+type directoryFactory struct {
+	pathPrefix string
+}
 
-		term, err = termutil.NewConsoleLineTerminal(os.Stdout)
-		if err == nil {
-			term, err = termutil.AddHistoryMixin(term, "", isExitLine)
-			if err == nil {
-				tid := api.SI.Interpreter.RuntimeProvider.NewThreadID()
+func (df *directoryFactory) Readers() ([]string, error) {
+	var files []string
 
-				runECALConsole := func(delay int) {
-					defer term.StopTerm()
+	fileInfos, err := ioutil.ReadDir(df.pathPrefix)
 
-					time.Sleep(time.Duration(delay) * time.Millisecond)
+	if err == nil {
+		for _, fi := range fileInfos {
+			if !fi.IsDir() && strings.HasPrefix(fi.Name(), "db-") {
+				name := fi.Name()[3:]
+				files = append(files, strings.TrimSuffix(name, path.Ext(name)))
+			}
+		}
+	}
 
-					term.WriteString(fmt.Sprintln("Type 'q' or 'quit' to exit the shell and '?' to get help"))
+	return files, err
+}
 
-					line, err := term.NextLine()
-					for err == nil && !isExitLine(line) {
-						trimmedLine := strings.TrimSpace(line)
+func (df *directoryFactory) CreateWriter(name string) (io.Writer, error) {
+	return os.Create(path.Join(df.pathPrefix, "db-"+name+".ndjson"))
+}
 
-						api.SI.Interpreter.HandleInput(term, trimmedLine, tid)
+func (df *directoryFactory) CreateReader(name string) (io.Reader, error) {
+	return os.Open(path.Join(df.pathPrefix, "db-"+name+".ndjson"))
+}
 
-						line, err = term.NextLine()
-					}
-				}
+func handleLargeScaleImportExport(importDbLS, exportDbLS *string, gm *graph.Manager) error {
+	var err error
 
-				if err = term.StartTerm(); err == nil {
+	if *importDbLS != "" {
+		var stat os.FileInfo
 
-					if *noServ {
-						runECALConsole(0)
-					} else {
-						go runECALConsole(3000)
-					}
-				}
-			}
+		if stat, err = os.Stat(*importDbLS); err == nil && stat.IsDir() {
+			fmt.Println("Importing from:", *importDbLS)
+
+			fac := &directoryFactory{*importDbLS}
+
+			err = graph.LargeScaleImportPartition(fac, gm)
 		}
 	}
 
-	if err != nil {
-		fmt.Println(err.Error())
-		return true
+	if *exportDbLS != "" && err == nil {
+		fmt.Println("Exporting to:", *exportDbLS)
+
+		if err = os.MkdirAll(*exportDbLS, 0777); err == nil {
+			fac := &directoryFactory{*exportDbLS}
+
+			err = graph.LargeScaleExportPartition(fac, gm)
+		}
 	}
 
-	return *noServ
+	return err
 }

+ 73 - 0
graph/graphmanager_test.go

@@ -17,6 +17,7 @@ import (
 	"testing"
 
 	"devt.de/krotik/common/fileutil"
+	"devt.de/krotik/eliasdb/graph/data"
 	"devt.de/krotik/eliasdb/graph/graphstorage"
 )
 
@@ -75,3 +76,75 @@ NewGraphManager returns a new GraphManager instance without loading rules.
 func newGraphManagerNoRules(gs graphstorage.Storage) *Manager {
 	return createGraphManager(gs)
 }
+
+/*
+songGraph creates a GraphManager instance which is prefilled with test data
+(three authors and their songs).
+*/
+func songGraph() (*Manager, *graphstorage.MemoryGraphStorage) {
+
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := NewGraphManager(mgs)
+
+	constructEdge := func(key string, node1 data.Node, node2 data.Node, number int) data.Edge {
+		edge := data.NewGraphEdge()
+
+		edge.SetAttr("key", key)
+		edge.SetAttr("kind", "Wrote")
+
+		edge.SetAttr(data.EdgeEnd1Key, node1.Key())
+		edge.SetAttr(data.EdgeEnd1Kind, node1.Kind())
+		edge.SetAttr(data.EdgeEnd1Role, "Author")
+		edge.SetAttr(data.EdgeEnd1Cascading, true)
+
+		edge.SetAttr(data.EdgeEnd2Key, node2.Key())
+		edge.SetAttr(data.EdgeEnd2Kind, node2.Kind())
+		edge.SetAttr(data.EdgeEnd2Role, "Song")
+		edge.SetAttr(data.EdgeEnd2Cascading, false)
+
+		edge.SetAttr("number", number)
+
+		return edge
+	}
+
+	storeSong := func(node data.Node, name string, ranking int, number int) {
+		node3 := data.NewGraphNode()
+		node3.SetAttr("key", name)
+		node3.SetAttr("kind", "Song")
+		node3.SetAttr("name", name)
+		node3.SetAttr("ranking", ranking)
+		gm.StoreNode("main", node3)
+		gm.StoreEdge("main", constructEdge(name+"-edge", node, node3, number))
+	}
+
+	node0 := data.NewGraphNode()
+	node0.SetAttr("key", "000")
+	node0.SetAttr("kind", "Author")
+	node0.SetAttr("name", "John")
+	gm.StoreNode("main", node0)
+
+	storeSong(node0, "Aria1", 8, 1)
+	storeSong(node0, "Aria2", 2, 2)
+	storeSong(node0, "Aria3", 4, 3)
+	storeSong(node0, "Aria4", 18, 4)
+
+	node1 := data.NewGraphNode()
+	node1.SetAttr("key", "123")
+	node1.SetAttr("kind", "Author")
+	node1.SetAttr("name", "Mike")
+	gm.StoreNode("main", node1)
+
+	storeSong(node1, "LoveSong3", 1, 3)
+	storeSong(node1, "FightSong4", 3, 4)
+	storeSong(node1, "DeadSong2", 6, 2)
+	storeSong(node1, "StrangeSong1", 5, 1)
+
+	node2 := data.NewGraphNode()
+	node2.SetAttr("key", "456")
+	node2.SetAttr("kind", "Author")
+	node2.SetAttr("name", "Hans")
+	gm.StoreNode("main", node2)
+
+	storeSong(node2, "MyOnlySong3", 19, 3)
+
+	return gm, mgs.(*graphstorage.MemoryGraphStorage)
+}

+ 178 - 4
graph/import_export.go

@@ -11,21 +11,24 @@
 package graph
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/json"
 	"fmt"
 	"io"
+	"strings"
 
 	"devt.de/krotik/common/errorutil"
 	"devt.de/krotik/eliasdb/graph/data"
+	"devt.de/krotik/eliasdb/hash"
 )
 
 /*
 ExportPartition dumps the contents of a partition to an io.Writer in JSON format:
 
 	{
-		nodes : [ { <attr> : <value> }, ... ]
-		edges : [ { <attr> : <value> }, ... ]
+		nodes : [ { <attr> : <value>, ... }, ... ]
+		edges : [ { <attr> : <value>, ... }, ... ]
 	}
 */
 func ExportPartition(out io.Writer, part string, gm *Manager) error {
@@ -207,8 +210,8 @@ ImportPartition imports the JSON contents of an io.Reader into a given partition
 The following format is expected:
 
 	{
-		nodes : [ { <attr> : <value> }, ... ]
-		edges : [ { <attr> : <value> }, ... ]
+		nodes : [ { <attr> : <value>, ... }, ... ]
+		edges : [ { <attr> : <value>, ... }, ... ]
 	}
 */
 func ImportPartition(in io.Reader, part string, gm *Manager) error {
@@ -251,3 +254,174 @@ func ImportPartition(in io.Reader, part string, gm *Manager) error {
 
 	return trans.Commit()
 }
+
+/*
+ExportWriterFactory produces a named writer.
+*/
+type ExportWriterFactory interface {
+	CreateWriter(name string) (io.Writer, error)
+}
+
+/*
+LargeScaleExportPartition dumps the contents of all partitions to multiple io.Writers in line-delimited JSON format (one JSON object per line):
+
+{ <attr> : <value>, ... }
+{ <attr> : <value>, ... }
+...
+*/
+func LargeScaleExportPartition(ewf ExportWriterFactory, gm *Manager) error {
+	var out io.Writer
+	var err error
+
+	for _, part := range gm.Partitions() {
+
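+		// Write all nodes of the partition into a single "<part>-nodes" writer.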
+		if out, err = ewf.CreateWriter(fmt.Sprintf("%v-nodes", part)); err == nil {
+			for _, k := range gm.NodeKinds() {
+				var it *NodeKeyIterator
+
+				if it, err = gm.NodeKeyIterator(part, k); err == nil {
+
+					for it.HasNext() {
+						key := it.Next()
+
+						if err = it.Error(); err == nil {
+							var node data.Node
+
+							if node, err = gm.FetchNode(part, key, k); err == nil {
+								var jsonBytes []byte
+								if jsonBytes, err = json.Marshal(node.Data()); err == nil {
+									if _, err = out.Write(jsonBytes); err == nil {
+										_, err = fmt.Fprintln(out)
+									}
+								}
+							}
+						}
+
+						if err != nil {
+							break
+						}
+					}
+				}
+
+				if err != nil {
+					break
+				}
+			}
+		}
+
+		if err == nil {
+			if out, err = ewf.CreateWriter(fmt.Sprintf("%v-edges", part)); err == nil {
+				for _, k := range gm.EdgeKinds() {
+					var tree *hash.HTree
+
+					if tree, err = gm.getEdgeStorageHTree(part, k, false); err == nil {
+						gm.mutex.RLock()
+						it := hash.NewHTreeIterator(tree)
+						gm.mutex.RUnlock()
+
+						if err = it.LastError; err == nil {
+
+							for it.HasNext() {
+								gm.mutex.RLock()
+								binaryKey, _ := it.Next()
+								gm.mutex.RUnlock()
+
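+								// Only keys which hold an edge's attribute list
+								// identify an edge; skip all other HTree entries.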
+								if prefix := binaryKey[:len(PrefixNSAttrs)]; string(prefix) != PrefixNSAttrs {
+									continue
+								}
+
+								key := string(binaryKey[len(PrefixNSAttrs):])
+
+								if err = it.LastError; err == nil {
+									var node data.Node
+
+									if node, err = gm.FetchEdge(part, key, k); err == nil {
+										var jsonBytes []byte
+
+										if jsonBytes, err = json.Marshal(node.Data()); err == nil {
+											if _, err = out.Write(jsonBytes); err == nil {
+												_, err = fmt.Fprintln(out)
+											}
+										}
+									}
+								}
+
+								if err != nil {
+									break
+								}
+							}
+						}
+					}
+
+					if err != nil {
+						break
+					}
+
+				}
+			}
+		}
+
+		if err != nil {
+			break
+		}
+	}
+
+	return err
+}
+
+/*
+ImportReaderFactory produces a named reader.
+*/
+type ImportReaderFactory interface {
+	Readers() ([]string, error)
+	CreateReader(name string) (io.Reader, error)
+}
+
+/*
+LargeScaleImportPartition imports the contents of multiple io.Readers in line-delimited JSON format (one JSON object per line):
+
+{ <attr> : <value>, ... }
+{ <attr> : <value>, ... }
+...
+*/
+func LargeScaleImportPartition(irf ImportReaderFactory, gm *Manager) error {
+	readers, err := irf.Readers()
+
+	if err == nil {
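+		// Use a rolling transaction which commits and renews itself after
+		// every 1000 operations so large imports are processed in chunks.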
+		trans := NewRollingTrans(NewGraphTrans(gm), 1000, gm, NewGraphTrans)
+
+		for _, r := range readers {
+			var in io.Reader
+
+			if in, err = irf.CreateReader(r); err == nil {
+				isNode := strings.HasSuffix(r, "-nodes")
+
+				scanner := bufio.NewScanner(in)
+				for scanner.Scan() {
+					var nodeData map[string]interface{}
+
+					if err = json.Unmarshal(scanner.Bytes(), &nodeData); err == nil {
+						if isNode {
+							err = trans.StoreNode(strings.TrimSuffix(r, "-nodes"), data.NewGraphNodeFromMap(nodeData))
+						} else {
+							err = trans.StoreEdge(strings.TrimSuffix(r, "-edges"), data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(nodeData)))
+						}
+					}
+
+					if err != nil {
+						break
+					}
+				}
+			}
+
+			if err != nil {
+				break
+			}
+		}
+
+		if err == nil {
+			err = trans.Commit()
+		}
+	}
+
+	return err
+}

+ 102 - 0
graph/import_export_test.go

@@ -12,6 +12,7 @@ package graph
 
 import (
 	"bytes"
+	"io"
 	"strings"
 	"testing"
 
@@ -398,3 +399,104 @@ func TestImportExportError(t *testing.T) {
 	}
 
 }
+
+type testFactory struct {
+	readers []string
+	buf     map[string]*bytes.Buffer
+}
+
+func (tf *testFactory) CreateWriter(name string) (io.Writer, error) {
+	var b bytes.Buffer
+
+	tf.readers = append(tf.readers, name)
+	tf.buf[name] = &b
+
+	return &b, nil
+}
+
+func (tf *testFactory) Readers() ([]string, error) {
+	return tf.readers, nil
+}
+
+func (tf *testFactory) CreateReader(name string) (io.Reader, error) {
+	return bytes.NewBuffer(tf.buf[name].Bytes()), nil
+}
+
+func TestScaleExport(t *testing.T) {
+	gm, _ := songGraph()
+
+	tf := &testFactory{make([]string, 0), make(map[string]*bytes.Buffer)}
+
+	if err := LargeScaleExportPartition(tf, gm); err != nil {
+		t.Error(err)
+		return
+	}
+
+	var out1 bytes.Buffer
+
+	if err := ExportPartition(&out1, "main", gm); err != nil {
+		t.Error(err)
+		return
+	}
+	res1 := SortDump(out1.String())
+
+	mgs2 := graphstorage.NewMemoryGraphStorage("mystorage2")
+	gm2 := NewGraphManager(mgs2)
+
+	if err := LargeScaleImportPartition(tf, gm2); err != nil {
+		t.Error(err)
+		return
+	}
+
+	var out2 bytes.Buffer
+
+	if err := ExportPartition(&out2, "main", gm2); err != nil {
+		t.Error(err)
+		return
+	}
+	res2 := SortDump(out2.String())
+
+	if res1 != res2 {
+		t.Error("Unexpected result - results of import/export are different")
+		return
+	}
+
+	// Test failures
+	gm, gs := songGraph()
+
+	tfImport := tf
+	tf = &testFactory{make([]string, 0), make(map[string]*bytes.Buffer)}
+
+	msm := gs.StorageManager("main"+"Song"+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
+	msm.AccessMap[5] = storage.AccessCacheAndFetchSeriousError
+
+	if err := LargeScaleExportPartition(tf, gm); err == nil {
+		t.Error("Error was expected")
+		return
+	}
+
+	delete(msm.AccessMap, 5)
+
+	msm = gs.StorageManager("main"+"Wrote"+StorageSuffixEdges, false).(*storage.MemoryStorageManager)
+	msm.AccessMap[5] = storage.AccessCacheAndFetchSeriousError
+
+	if err := LargeScaleExportPartition(tf, gm); err == nil {
+		t.Error("Error was expected")
+		return
+	}
+
+	delete(msm.AccessMap, 5)
+
+	mgs2 = graphstorage.NewMemoryGraphStorage("mystorage2")
+	gm2 = NewGraphManager(mgs2)
+
+	msm = mgs2.StorageManager("main"+"Song"+StorageSuffixNodes, true).(*storage.MemoryStorageManager)
+	msm.AccessMap[2] = storage.AccessInsertError
+
+	if err := LargeScaleImportPartition(tfImport, gm2); err == nil {
+		t.Error("Error was expected")
+		return
+	}
+
+	delete(msm.AccessMap, 2)
+}

+ 47 - 0
importexport.md

@@ -0,0 +1,47 @@
+EliasDB Import/Export
+=====================
+
+EliasDB supports importing and exporting data in several ways:
+- By [embedding](embedding.md) EliasDB in another Go project.
+- By using the [REST API](http://petstore.swagger.io/?url=https://devt.de/krotik/eliasdb/raw/master/swagger.json) interface.
+- By running an [ECAL](ecal.md) script.
+- By running the `eliasdb` executable with import/export parameters on the command line.
+
+Bulk importing and exporting is best done through the last option.
+
+Bulk importing and exporting via the CLI
+--
+Bulk import/export through the CLI is available by running the `eliasdb` binary with the `server` command. There are two different import/export modes:
+- Normal import/export through a single compact ZIP file.
+- Large scale import/export through multiple line-delimited JSON files in a directory.
+
+Parameter|Description
+-|-
+-export|Export the current DB into a ZIP file. The data of each partition is stored in a separate file as a single JSON object.
+-import|Import into the current DB from a ZIP file. The data is expected in the same format as produced by `-export`.
+-export-ls|Export the current DB into a directory. The nodes and edges of each partition are stored in two separate files in a line-delimited JSON format.
+-import-ls|Import into the current DB from a directory. The data is expected in the same format as produced by `-export-ls`.
+
+By default the server will start after the import/export operation. This can be disabled by using the `-no-serv` parameter.
+
+Format for normal import/export
+--
+The normal import/export operates on a single ZIP file which contains a series of `.json` files.
+
+```
+eliasdb server -export mydb.zip -no-serv
+eliasdb server -import mydb.zip -no-serv
+```
+
+The name of each file is used as the name of a separate partition. Each of these `.json` files contains a single JSON object with the following structure:
+```
+{
+  nodes : [ { <attr> : <value>, ... }, ... ]
+  edges : [ { <attr> : <value>, ... }, ... ]
+}
+```
+When embedding EliasDB in another Go project this can be produced and consumed via `graph.ExportPartition` and `graph.ImportPartition`.
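+
+A minimal sketch of such an embedded round trip (the storage names and the partition name are illustrative):
+```
+package main
+
+import (
+	"bytes"
+	"fmt"
+
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graph/graphstorage"
+)
+
+func main() {
+	var buf bytes.Buffer
+
+	gm1 := graph.NewGraphManager(graphstorage.NewMemoryGraphStorage("db1"))
+	gm2 := graph.NewGraphManager(graphstorage.NewMemoryGraphStorage("db2"))
+
+	// Dump partition "main" of gm1 into the buffer ...
+	err := graph.ExportPartition(&buf, "main", gm1)
+
+	if err == nil {
+		// ... and load it into gm2.
+		err = graph.ImportPartition(&buf, "main", gm2)
+	}
+
+	if err != nil {
+		fmt.Println(err)
+	}
+}
+```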
+
+Format for large scale import/export
+--
+The large scale import/export works on a directory which contains a series of line-delimited JSON (`.ndjson`) files. The nodes and edges of each partition are stored in two separate files named `db-<partition>-nodes.ndjson` and `db-<partition>-edges.ndjson`, with one JSON object per line describing a single node or edge.
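+
+For example (the directory name is illustrative):
+```
+eliasdb server -export-ls mydb-ls -no-serv
+eliasdb server -import-ls mydb-ls -no-serv
+```
+When embedding EliasDB in another Go project this format can be produced and consumed via `graph.LargeScaleExportPartition` and `graph.LargeScaleImportPartition` together with custom `ExportWriterFactory` and `ImportReaderFactory` implementations (the `directoryFactory` in `cli/eliasdb.go` is a directory-based example).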