|
@@ -11,21 +11,24 @@
|
|
|
package graph
|
|
|
|
|
|
import (
|
|
|
+ "bufio"
|
|
|
"bytes"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
+ "strings"
|
|
|
|
|
|
"devt.de/krotik/common/errorutil"
|
|
|
"devt.de/krotik/eliasdb/graph/data"
|
|
|
+ "devt.de/krotik/eliasdb/hash"
|
|
|
)
|
|
|
|
|
|
/*
|
|
|
ExportPartition dumps the contents of a partition to an io.Writer in JSON format:
|
|
|
|
|
|
{
|
|
|
- nodes : [ { <attr> : <value> }, ... ]
|
|
|
- edges : [ { <attr> : <value> }, ... ]
|
|
|
+ nodes : [ { <attr> : <value>, ... }, ... ]
|
|
|
+ edges : [ { <attr> : <value>, ... }, ... ]
|
|
|
}
|
|
|
*/
|
|
|
func ExportPartition(out io.Writer, part string, gm *Manager) error {
|
|
@@ -207,8 +210,8 @@ ImportPartition imports the JSON contents of an io.Reader into a given partition
|
|
|
The following format is expected:
|
|
|
|
|
|
{
|
|
|
- nodes : [ { <attr> : <value> }, ... ]
|
|
|
- edges : [ { <attr> : <value> }, ... ]
|
|
|
+ nodes : [ { <attr> : <value>, ... }, ... ]
|
|
|
+ edges : [ { <attr> : <value>, ... }, ... ]
|
|
|
}
|
|
|
*/
|
|
|
func ImportPartition(in io.Reader, part string, gm *Manager) error {
|
|
@@ -251,3 +254,166 @@ func ImportPartition(in io.Reader, part string, gm *Manager) error {
|
|
|
|
|
|
return trans.Commit()
|
|
|
}
|
|
|
+
|
|
|
+/*
|
|
|
+ExportWriterFactory produces a named writer.
|
|
|
+*/
|
|
|
+type ExportWriterFactory interface {
|
|
|
+ CreateWriter(name string) (io.Writer, error)
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+LargeScaleExportPartition dumps the contents of a partition into multiple io.Writer in line-delimited JSON format:
|
|
|
+
|
|
|
+{ <attr> : <value>, ... },
|
|
|
+{ <attr> : <value>, ... },
|
|
|
+...
|
|
|
+*/
|
|
|
+func LargeScaleExportPartition(ewf ExportWriterFactory, part string, gm *Manager) error {
|
|
|
+ out, err := ewf.CreateWriter(fmt.Sprintf("%v-nodes", part))
|
|
|
+
|
|
|
+ if err == nil {
|
|
|
+ for _, k := range gm.NodeKinds() {
|
|
|
+ var it *NodeKeyIterator
|
|
|
+
|
|
|
+ if it, err = gm.NodeKeyIterator(part, k); err == nil {
|
|
|
+
|
|
|
+ for it.HasNext() {
|
|
|
+ key := it.Next()
|
|
|
+
|
|
|
+ if err = it.Error(); err == nil {
|
|
|
+ var node data.Node
|
|
|
+
|
|
|
+ if node, err = gm.FetchNode(part, key, k); err == nil {
|
|
|
+ var jsonBytes []byte
|
|
|
+
|
|
|
+ if jsonBytes, err = json.Marshal(node.Data()); err == nil {
|
|
|
+ _, err = out.Write(jsonBytes)
|
|
|
+ fmt.Fprintln(out)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ out, err = ewf.CreateWriter(fmt.Sprintf("%v-edges", part))
|
|
|
+
|
|
|
+ if err == nil {
|
|
|
+ for _, k := range gm.EdgeKinds() {
|
|
|
+ var tree *hash.HTree
|
|
|
+
|
|
|
+ if tree, err = gm.getEdgeStorageHTree(part, k, false); err == nil {
|
|
|
+ gm.mutex.RLock()
|
|
|
+ it := hash.NewHTreeIterator(tree)
|
|
|
+ gm.mutex.RUnlock()
|
|
|
+
|
|
|
+ if err = it.LastError; err == nil {
|
|
|
+
|
|
|
+ for it.HasNext() {
|
|
|
+ gm.mutex.RLock()
|
|
|
+ binaryKey, _ := it.Next()
|
|
|
+ gm.mutex.RUnlock()
|
|
|
+
|
|
|
+ if prefix := binaryKey[:len(PrefixNSAttrs)]; string(prefix) != PrefixNSAttrs {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ key := string(binaryKey[len(PrefixNSAttrs):])
|
|
|
+
|
|
|
+ if err = it.LastError; err == nil {
|
|
|
+ var node data.Node
|
|
|
+
|
|
|
+ if node, err = gm.FetchEdge(part, key, k); err == nil {
|
|
|
+ var jsonBytes []byte
|
|
|
+
|
|
|
+ if jsonBytes, err = json.Marshal(node.Data()); err == nil {
|
|
|
+ _, err = out.Write(jsonBytes)
|
|
|
+ fmt.Fprintln(out)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ImportReaderFactory produces a named reader.
|
|
|
+*/
|
|
|
+type ImportReaderFactory interface {
|
|
|
+ Readers() []string
|
|
|
+ CreateReader(name string) (io.Reader, error)
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+LargeScaleImportPartition dumps the contents of a partition into multiple io.Writer in line-delimited JSON format:
|
|
|
+
|
|
|
+{ <attr> : <value>, ... },
|
|
|
+{ <attr> : <value>, ... },
|
|
|
+...
|
|
|
+*/
|
|
|
+func LargeScaleImportPartition(irf ImportReaderFactory, part string, gm *Manager) error {
|
|
|
+ var err error
|
|
|
+
|
|
|
+ readers := irf.Readers()
|
|
|
+
|
|
|
+ trans := NewRollingTrans(NewGraphTrans(gm), 1000, gm, NewGraphTrans)
|
|
|
+
|
|
|
+ for _, r := range readers {
|
|
|
+ var in io.Reader
|
|
|
+
|
|
|
+ if in, err = irf.CreateReader(r); err == nil {
|
|
|
+ isNode := strings.HasSuffix(r, "-nodes")
|
|
|
+
|
|
|
+ scanner := bufio.NewScanner(in)
|
|
|
+ for scanner.Scan() {
|
|
|
+ var nodeData map[string]interface{}
|
|
|
+
|
|
|
+ if err = json.Unmarshal(scanner.Bytes(), &nodeData); err == nil {
|
|
|
+ if isNode {
|
|
|
+ err = gm.StoreNode(part, data.NewGraphNodeFromMap(nodeData))
|
|
|
+ } else {
|
|
|
+ err = gm.StoreEdge(part, data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(nodeData)))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err == nil {
|
|
|
+ err = trans.Commit()
|
|
|
+ }
|
|
|
+
|
|
|
+ return err
|
|
|
+}
|