2 Commits 3a2e92a4eb ... d93b21702f

Author SHA1 Message Date
  Matthias Ladkau d93b21702f feat: Large scale import/export functionality 2 years ago
  Matthias Ladkau 3a2e92a4eb feat: Large scale import/export functionality 2 years ago
4 changed files with 259 additions and 126 deletions
  1. cli/eliasdb.go (+103 -34)
  2. graph/import_export.go (+87 -79)
  3. graph/import_export_test.go (+61 -12)
  4. importexport.md (+8 -1)

+ 103 - 34
cli/eliasdb.go

@@ -45,6 +45,7 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
+	"path"
 	"path/filepath"
 	"strings"
 	"time"
@@ -294,7 +295,9 @@ func handleServerCommandLine(gm *graph.Manager) bool {
 	var ecalConsole *bool
 
 	importDb := flag.String("import", "", "Import a database from a zip file")
+	importDbLS := flag.String("import-ls", "", "Large scale import from a directory")
 	exportDb := flag.String("export", "", "Export the current database to a zip file")
+	exportDbLS := flag.String("export-ls", "", "Large scale export to a directory")
 
 	if config.Bool(config.EnableECALScripts) {
 		ecalConsole = flag.Bool("ecal-console", false, "Start an interactive interpreter console for ECAL")
@@ -319,6 +322,62 @@ func handleServerCommandLine(gm *graph.Manager) bool {
 		return true
 	}
 
+	err = handleSimpleImportExport(importDb, exportDb, gm)
+
+	if err == nil {
+		err = handleLargeScaleImportExport(importDbLS, exportDbLS, gm)
+	}
+
+	if ecalConsole != nil && *ecalConsole && err == nil {
+		var term termutil.ConsoleLineTerminal
+
+		isExitLine := func(s string) bool {
+			return s == "exit" || s == "q" || s == "quit" || s == "bye" || s == "\x04"
+		}
+
+		term, err = termutil.NewConsoleLineTerminal(os.Stdout)
+		if err == nil {
+			term, err = termutil.AddHistoryMixin(term, "", isExitLine)
+			if err == nil {
+				tid := api.SI.Interpreter.RuntimeProvider.NewThreadID()
+
+				runECALConsole := func(delay int) {
+					defer term.StopTerm()
+
+					time.Sleep(time.Duration(delay) * time.Millisecond)
+
+					term.WriteString(fmt.Sprintln("Type 'q' or 'quit' to exit the shell and '?' to get help"))
+
+					line, err := term.NextLine()
+					for err == nil && !isExitLine(line) {
+						trimmedLine := strings.TrimSpace(line)
+
+						api.SI.Interpreter.HandleInput(term, trimmedLine, tid)
+
+						line, err = term.NextLine()
+					}
+				}
+
+				if err = term.StartTerm(); err == nil {
+					if *noServ {
+						runECALConsole(0)
+					} else {
+						go runECALConsole(3000)
+					}
+				}
+			}
+		}
+	}
+
+	if err != nil {
+		fmt.Println(err.Error())
+		return true
+	}
+
+	return *noServ
+}
+
+func handleSimpleImportExport(importDb, exportDb *string, gm *graph.Manager) error {
+	var err error
+
 	if *importDb != "" {
 		var zipFile *zip.ReadCloser
 
@@ -346,7 +405,7 @@ func handleServerCommandLine(gm *graph.Manager) bool {
 		}
 	}
 
-	if *exportDb != "" {
+	if *exportDb != "" && err == nil {
 		var zipFile *os.File
 
 		fmt.Println("Exporting to:", *exportDb)
@@ -375,52 +434,62 @@ func handleServerCommandLine(gm *graph.Manager) bool {
 		}
 	}
 
-	if ecalConsole != nil && *ecalConsole {
-		var term termutil.ConsoleLineTerminal
+	return err
+}
 
-		isExitLine := func(s string) bool {
-			return s == "exit" || s == "q" || s == "quit" || s == "bye" || s == "\x04"
-		}
+type directoryFactory struct {
+	pathPrefix string
+}
 
-		term, err = termutil.NewConsoleLineTerminal(os.Stdout)
-		if err == nil {
-			term, err = termutil.AddHistoryMixin(term, "", isExitLine)
-			if err == nil {
-				tid := api.SI.Interpreter.RuntimeProvider.NewThreadID()
+func (df *directoryFactory) Readers() ([]string, error) {
+	var files []string
 
-				runECALConsole := func(delay int) {
-					defer term.StopTerm()
+	fileInfos, err := ioutil.ReadDir(df.pathPrefix)
 
-					time.Sleep(time.Duration(delay) * time.Millisecond)
+	if err == nil {
+		for _, fi := range fileInfos {
+			if !fi.IsDir() && strings.HasPrefix(fi.Name(), "db-") {
+				name := fi.Name()[3:]
+				files = append(files, strings.TrimSuffix(name, path.Ext(name)))
+			}
+		}
+	}
 
-					term.WriteString(fmt.Sprintln("Type 'q' or 'quit' to exit the shell and '?' to get help"))
+	return files, err
+}
 
-					line, err := term.NextLine()
-					for err == nil && !isExitLine(line) {
-						trimmedLine := strings.TrimSpace(line)
+func (df *directoryFactory) CreateWriter(name string) (io.Writer, error) {
+	return os.Create(path.Join(df.pathPrefix, "db-"+name+".ndjson"))
+}
 
-						api.SI.Interpreter.HandleInput(term, trimmedLine, tid)
+func (df *directoryFactory) CreateReader(name string) (io.Reader, error) {
+	return os.Open(path.Join(df.pathPrefix, "db-"+name+".ndjson"))
+}
 
-						line, err = term.NextLine()
-					}
-				}
+func handleLargeScaleImportExport(importDbLS, exportDbLS *string, gm *graph.Manager) error {
+	var err error
 
-				if err = term.StartTerm(); err == nil {
+	if *importDbLS != "" {
+		var stat os.FileInfo
+
+		stat, err = os.Stat(*importDbLS)
 
-					if *noServ {
-						runECALConsole(0)
-					} else {
-						go runECALConsole(3000)
-					}
-				}
-			}
+		if err == nil && stat.IsDir() {
+			fmt.Println("Importing from:", *importDbLS)
+
+			fac := &directoryFactory{*importDbLS}
+
+			err = graph.LargeScaleImportPartition(fac, gm)
 		}
 	}
 
-	if err != nil {
-		fmt.Println(err.Error())
-		return true
+	if *exportDbLS != "" && err == nil {
+		fmt.Println("Exporting to:", *exportDbLS)
+
+		if err = os.MkdirAll(*exportDbLS, 0777); err == nil {
+			fac := &directoryFactory{*exportDbLS}
+
+			err = graph.LargeScaleExportPartition(fac, gm)
+		}
 	}
 
-	return *noServ
+	return err
 }

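Taken together, the new flags mirror the existing ZIP-based ones. A sketch of the intended invocations (directory name chosen for illustration):

```
eliasdb server -export-ls ./dump -no-serv
eliasdb server -import-ls ./dump -no-serv
```

Because `directoryFactory` maps each dump name to a `db-<name>.ndjson` file, a large scale export of the `main` partition leaves `db-main-nodes.ndjson` and `db-main-edges.ndjson` in the target directory, and the import side picks up exactly those `db-` prefixed files.
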
+ 87 - 79
graph/import_export.go

@@ -269,91 +269,99 @@ LargeScaleExportPartition dumps the contents of a partition into multiple io.Wri
 { <attr> : <value>, ... },
 ...
 */
-func LargeScaleExportPartition(ewf ExportWriterFactory, part string, gm *Manager) error {
-	out, err := ewf.CreateWriter(fmt.Sprintf("%v-nodes", part))
+func LargeScaleExportPartition(ewf ExportWriterFactory, gm *Manager) error {
+	var out io.Writer
+	var err error
 
-	if err == nil {
-		for _, k := range gm.NodeKinds() {
-			var it *NodeKeyIterator
+	for _, part := range gm.Partitions() {
 
-			if it, err = gm.NodeKeyIterator(part, k); err == nil {
+		if out, err = ewf.CreateWriter(fmt.Sprintf("%v-nodes", part)); err == nil {
+			for _, k := range gm.NodeKinds() {
+				var it *NodeKeyIterator
 
-				for it.HasNext() {
-					key := it.Next()
+				if it, err = gm.NodeKeyIterator(part, k); err == nil {
 
-					if err = it.Error(); err == nil {
-						var node data.Node
+					for it.HasNext() {
+						key := it.Next()
 
-						if node, err = gm.FetchNode(part, key, k); err == nil {
-							var jsonBytes []byte
+						if err = it.Error(); err == nil {
+							var node data.Node
 
-							if jsonBytes, err = json.Marshal(node.Data()); err == nil {
-								_, err = out.Write(jsonBytes)
-								fmt.Fprintln(out)
+							if node, err = gm.FetchNode(part, key, k); err == nil {
+								var jsonBytes []byte
+								if jsonBytes, err = json.Marshal(node.Data()); err == nil {
+									if _, err = out.Write(jsonBytes); err == nil {
+										_, err = fmt.Fprintln(out)
+									}
+								}
 							}
 						}
-					}
 
-					if err != nil {
-						break
+						if err != nil {
+							break
+						}
 					}
 				}
-			}
 
-			if err != nil {
-				break
+				if err != nil {
+					break
+				}
 			}
 		}
-	}
 
-	out, err = ewf.CreateWriter(fmt.Sprintf("%v-edges", part))
+		if err == nil {
+			if out, err = ewf.CreateWriter(fmt.Sprintf("%v-edges", part)); err == nil {
+				for _, k := range gm.EdgeKinds() {
+					var tree *hash.HTree
 
-	if err == nil {
-		for _, k := range gm.EdgeKinds() {
-			var tree *hash.HTree
+					if tree, err = gm.getEdgeStorageHTree(part, k, false); err == nil {
+						gm.mutex.RLock()
+						it := hash.NewHTreeIterator(tree)
+						gm.mutex.RUnlock()
 
-			if tree, err = gm.getEdgeStorageHTree(part, k, false); err == nil {
-				gm.mutex.RLock()
-				it := hash.NewHTreeIterator(tree)
-				gm.mutex.RUnlock()
+						if err = it.LastError; err == nil {
 
-				if err = it.LastError; err == nil {
+							for it.HasNext() {
+								gm.mutex.RLock()
+								binaryKey, _ := it.Next()
+								gm.mutex.RUnlock()
 
-					for it.HasNext() {
-						gm.mutex.RLock()
-						binaryKey, _ := it.Next()
-						gm.mutex.RUnlock()
+								if prefix := binaryKey[:len(PrefixNSAttrs)]; string(prefix) != PrefixNSAttrs {
+									continue
+								}
 
-						if prefix := binaryKey[:len(PrefixNSAttrs)]; string(prefix) != PrefixNSAttrs {
-							continue
-						}
+								key := string(binaryKey[len(PrefixNSAttrs):])
 
-						key := string(binaryKey[len(PrefixNSAttrs):])
+								if err = it.LastError; err == nil {
+									var node data.Node
 
-						if err = it.LastError; err == nil {
-							var node data.Node
+									if node, err = gm.FetchEdge(part, key, k); err == nil {
+										var jsonBytes []byte
 
-							if node, err = gm.FetchEdge(part, key, k); err == nil {
-								var jsonBytes []byte
+										if jsonBytes, err = json.Marshal(node.Data()); err == nil {
+											if _, err = out.Write(jsonBytes); err == nil {
+												_, err = fmt.Fprintln(out)
+											}
+										}
+									}
+								}
 
-								if jsonBytes, err = json.Marshal(node.Data()); err == nil {
-									_, err = out.Write(jsonBytes)
-									fmt.Fprintln(out)
+								if err != nil {
+									break
 								}
 							}
 						}
+					}
 
-						if err != nil {
-							break
-						}
+					if err != nil {
+						break
 					}
-				}
-			}
 
-			if err != nil {
-				break
+				}
 			}
+		}
 
+		if err != nil {
+			break
 		}
 	}
 
@@ -364,7 +372,7 @@ func LargeScaleExportPartition(ewf ExportWriterFactory, part string, gm *Manager
 ImportReaderFactory produces a named reader.
 */
 type ImportReaderFactory interface {
-	Readers() []string
+	Readers() ([]string, error)
 	CreateReader(name string) (io.Reader, error)
 }
 
@@ -375,45 +383,45 @@ LargeScaleImportPartition dumps the contents of a partition into multiple io.Wri
 { <attr> : <value>, ... },
 ...
 */
-func LargeScaleImportPartition(irf ImportReaderFactory, part string, gm *Manager) error {
-	var err error
+func LargeScaleImportPartition(irf ImportReaderFactory, gm *Manager) error {
+	readers, err := irf.Readers()
 
-	readers := irf.Readers()
+	if err == nil {
+		trans := NewRollingTrans(NewGraphTrans(gm), 1000, gm, NewGraphTrans)
 
-	trans := NewRollingTrans(NewGraphTrans(gm), 1000, gm, NewGraphTrans)
+		for _, r := range readers {
+			var in io.Reader
 
-	for _, r := range readers {
-		var in io.Reader
+			if in, err = irf.CreateReader(r); err == nil {
+				isNode := strings.HasSuffix(r, "-nodes")
 
-		if in, err = irf.CreateReader(r); err == nil {
-			isNode := strings.HasSuffix(r, "-nodes")
+				scanner := bufio.NewScanner(in)
+				for scanner.Scan() {
+					var nodeData map[string]interface{}
 
-			scanner := bufio.NewScanner(in)
-			for scanner.Scan() {
-				var nodeData map[string]interface{}
+					if err = json.Unmarshal(scanner.Bytes(), &nodeData); err == nil {
+						if isNode {
+							err = trans.StoreNode(strings.TrimSuffix(r, "-nodes"), data.NewGraphNodeFromMap(nodeData))
+						} else {
+							err = trans.StoreEdge(strings.TrimSuffix(r, "-edges"), data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(nodeData)))
+						}
+					}
 
-				if err = json.Unmarshal(scanner.Bytes(), &nodeData); err == nil {
-					if isNode {
-						err = gm.StoreNode(part, data.NewGraphNodeFromMap(nodeData))
-					} else {
-						err = gm.StoreEdge(part, data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(nodeData)))
+					if err != nil {
+						break
 					}
 				}
+			}
 
-				if err != nil {
-					break
-				}
+			if err != nil {
+				break
 			}
 		}
 
-		if err != nil {
-			break
+		if err == nil {
+			err = trans.Commit()
 		}
 	}
 
-	if err == nil {
-		err = trans.Commit()
-	}
-
 	return err
 }

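Each named writer receives newline-delimited JSON, one object per node or edge, so any source that can enumerate and open such documents can feed the importer. A minimal in-memory `ImportReaderFactory` satisfying the updated interface might look like this (a sketch; all names are invented for illustration):

```go
package graph

import (
	"io"
	"strings"
)

// stringReaderFactory serves NDJSON documents from memory, keyed by
// "<partition>-nodes" or "<partition>-edges".
type stringReaderFactory struct {
	docs map[string]string
}

// Readers lists the available document names; the updated interface
// allows this step to fail, e.g. when scanning a directory.
func (f *stringReaderFactory) Readers() ([]string, error) {
	names := make([]string, 0, len(f.docs))
	for name := range f.docs {
		names = append(names, name)
	}
	return names, nil
}

// CreateReader returns a fresh reader over the named document.
func (f *stringReaderFactory) CreateReader(name string) (io.Reader, error) {
	return strings.NewReader(f.docs[name]), nil
}
```

The `-nodes`/`-edges` suffix of each name selects `StoreNode` or `StoreEdge`, and the rolling transaction batches the writes (committing every 1000 operations here), so very large dumps never have to be held in memory at once.
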
+ 61 - 12
graph/import_export_test.go

@@ -12,7 +12,6 @@ package graph
 
 import (
 	"bytes"
-	"fmt"
 	"io"
 	"strings"
 	"testing"
@@ -415,12 +414,12 @@ func (tf *testFactory) CreateWriter(name string) (io.Writer, error) {
 	return &b, nil
 }
 
-func (tf *testFactory) Readers() []string {
-	return tf.readers
+func (tf *testFactory) Readers() ([]string, error) {
+	return tf.readers, nil
 }
 
 func (tf *testFactory) CreateReader(name string) (io.Reader, error) {
-	return tf.buf[name], nil
+	return bytes.NewBuffer(tf.buf[name].Bytes()), nil
 }
 
 func TestScaleExport(t *testing.T) {
@@ -428,26 +427,76 @@ func TestScaleExport(t *testing.T) {
 
 	tf := &testFactory{make([]string, 0), make(map[string]*bytes.Buffer)}
 
-	LargeScaleExportPartition(tf, "main", gm)
+	if err := LargeScaleExportPartition(tf, gm); err != nil {
+		t.Error(err)
+		return
+	}
 
 	var out1 bytes.Buffer
 
-	ExportPartition(&out1, "main", gm)
+	if err := ExportPartition(&out1, "main", gm); err != nil {
+		t.Error(err)
+		return
+	}
 	res1 := SortDump(out1.String())
 
-	fmt.Println(res1)
-
 	mgs2 := graphstorage.NewMemoryGraphStorage("mystorage2")
 	gm2 := NewGraphManager(mgs2)
 
-	LargeScaleImportPartition(tf, "main", gm2)
+	if err := LargeScaleImportPartition(tf, gm2); err != nil {
+		t.Error(err)
+		return
+	}
 
 	var out2 bytes.Buffer
 
-	ExportPartition(&out2, "main", gm2)
+	if err := ExportPartition(&out2, "main", gm2); err != nil {
+		t.Error(err)
+		return
+	}
 	res2 := SortDump(out2.String())
 
-	fmt.Println(res2)
+	if res1 != res2 {
+		t.Error("Unexpected result - results of import/export are different")
+		return
+	}
+
+	// Test failures
+	gm, gs := songGraph()
+
+	tfImport := tf
+	tf = &testFactory{make([]string, 0), make(map[string]*bytes.Buffer)}
+
+	msm := gs.StorageManager("main"+"Song"+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
+	msm.AccessMap[5] = storage.AccessCacheAndFetchSeriousError
+
+	if err := LargeScaleExportPartition(tf, gm); err == nil {
+		t.Error("Error was expected")
+		return
+	}
+
+	delete(msm.AccessMap, 5)
+
+	msm = gs.StorageManager("main"+"Wrote"+StorageSuffixEdges, false).(*storage.MemoryStorageManager)
+	msm.AccessMap[5] = storage.AccessCacheAndFetchSeriousError
+
+	if err := LargeScaleExportPartition(tf, gm); err == nil {
+		t.Error("Error was expected")
+		return
+	}
+
+	delete(msm.AccessMap, 5)
+
+	mgs2 = graphstorage.NewMemoryGraphStorage("mystorage2")
+	gm2 = NewGraphManager(mgs2)
+
+	msm = mgs2.StorageManager("main"+"Song"+StorageSuffixNodes, true).(*storage.MemoryStorageManager)
+	msm.AccessMap[2] = storage.AccessInsertError
+
+	if err := LargeScaleImportPartition(tfImport, gm2); err == nil {
+		t.Error("Error was expected")
+		return
+	}
 
-	fmt.Println("-->", res1 == res2)
+	delete(msm.AccessMap, 2)
 }

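The rewritten test asserts round-trip equivalence instead of printing results: both graphs are dumped through the existing `ExportPartition` and the sorted outputs must match. The failure cases use the memory storage manager's `AccessMap` to inject faults on specific records; the general pattern (record number chosen for illustration) is:

```go
// Make access to record 5 fail, check that the export surfaces the
// error, then restore normal behaviour for the next case.
msm.AccessMap[5] = storage.AccessCacheAndFetchSeriousError
if err := LargeScaleExportPartition(tf, gm); err == nil {
	t.Error("Error was expected")
}
delete(msm.AccessMap, 5)
```

`AccessCacheAndFetchSeriousError` exercises the exporter's read path, while `AccessInsertError` on the freshly created storage exercises the importer's write path.
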
+ 8 - 1
importexport.md

@@ -26,7 +26,14 @@ By default the server will start after the import/export operation. This can be
 
 Format for normal import/export
 --
-The normal import/export will work on a single ZIP file which container a series of `.json` files. The name of each file will become a separate partition. Each of these `.json` files contains a single JSON object with the following structure:
+The normal import/export works on a single ZIP file which contains a series of `.json` files.
+
+```
+eliasdb server -export mydb.zip -no-serv
+eliasdb server -import mydb.zip -no-serv
+```
+
+The name of each file will become a separate partition. Each of these `.json` files contains a single JSON object with the following structure:
 ```
 {
   nodes : [ { <attr> : <value>, ... }, ... ]