123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package graph
- import (
- "bufio"
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "strings"
- "devt.de/krotik/common/errorutil"
- "devt.de/krotik/eliasdb/graph/data"
- "devt.de/krotik/eliasdb/hash"
- )
- /*
- ExportPartition dumps the contents of a partition to an io.Writer in JSON format:
- {
- nodes : [ { <attr> : <value>, ... }, ... ]
- edges : [ { <attr> : <value>, ... }, ... ]
- }
- */
- func ExportPartition(out io.Writer, part string, gm *Manager) error {
- // Use a map to unique found edge keys
- edgeKeys := make(map[string]string)
- writeData := func(data map[string]interface{}) {
- nk := 0
- for k, v := range data {
- // JSON encode value - ignore values which cannot be JSON encoded
- jv, err := json.Marshal(v)
- // Encoding errors result in a null value
- if err != nil {
- jv = []byte("null")
- }
- // Write out the node attributes
- fmt.Fprintf(out, " \"%s\" : %s", k, jv)
- if nk < len(data)-1 {
- fmt.Fprint(out, ",")
- }
- fmt.Fprint(out, "\n")
- nk++
- }
- }
- // Iterate over all available node kinds
- fmt.Fprint(out, `{
- "nodes" : [
- `)
- // Loop over all available kinds and build iterators if nodes
- // exist in the given partition
- var iters []*NodeKeyIterator
- var kinds []string
- for _, k := range gm.NodeKinds() {
- it, err := gm.NodeKeyIterator(part, k)
- if err != nil {
- return err
- }
- if it != nil {
- iters = append(iters, it)
- kinds = append(kinds, k)
- }
- }
- for ik, it := range iters {
- // Iterate over all node keys
- for i := 0; it.HasNext(); i++ {
- key := it.Next()
- if it.LastError != nil {
- return it.LastError
- }
- node, err := gm.FetchNode(part, key, kinds[ik])
- if err != nil {
- return err
- }
- // Fetch all connected relationships and store their key and kind
- _, edges, err := gm.TraverseMulti(part, key, kinds[ik], ":::", false)
- if err != nil {
- return err
- }
- for _, edge := range edges {
- edgeKeys[edge.Kind()+edge.Key()] = edge.Kind()
- }
- // Write out JSON object
- fmt.Fprint(out, " {\n")
- writeData(node.Data())
- if it.HasNext() || ik < len(iters)-1 {
- fmt.Fprint(out, " },\n")
- } else {
- fmt.Fprint(out, " }\n")
- }
- }
- }
- fmt.Fprint(out, ` ],
- "edges" : [
- `)
- // Iterate over all available edge kinds
- ie := 0
- for key, kind := range edgeKeys {
- key = key[len(kind):]
- edge, err := gm.FetchEdge(part, key, kind)
- if err != nil {
- return err
- }
- // Write out JSON object
- fmt.Fprint(out, " {\n")
- writeData(edge.Data())
- if ie < len(edgeKeys)-1 {
- fmt.Fprint(out, " },\n")
- } else {
- fmt.Fprint(out, " }\n")
- }
- ie++
- }
- fmt.Fprint(out, ` ]
- }`)
- return nil
- }
- /*
- SortDump sorts a string result which was produced by ExportPartition.
- Do not use this for very large results. Panics if the input data is not valid.
- */
- func SortDump(in string) string {
- var nodes []data.Node
- var edges []data.Node
- dec := json.NewDecoder(bytes.NewBufferString(in))
- gdata := make(map[string][]map[string]interface{})
- errorutil.AssertOk(dec.Decode(&gdata))
- nDataList := gdata["nodes"]
- for _, n := range nDataList {
- nodes = append(nodes, data.NewGraphNodeFromMap(n))
- }
- data.NodeSort(nodes)
- for i, n := range nodes {
- nDataList[i] = n.Data()
- }
- eDataList := gdata["edges"]
- for _, n := range eDataList {
- edges = append(edges, data.NewGraphNodeFromMap(n))
- }
- data.NodeSort(edges)
- for i, e := range edges {
- eDataList[i] = e.Data()
- }
- res, err := json.MarshalIndent(map[string]interface{}{
- "nodes": nDataList,
- "edges": eDataList,
- }, "", " ")
- errorutil.AssertOk(err)
- return string(res)
- }
- /*
- ImportPartition imports the JSON contents of an io.Reader into a given partition.
- The following format is expected:
- {
- nodes : [ { <attr> : <value>, ... }, ... ]
- edges : [ { <attr> : <value>, ... }, ... ]
- }
- */
- func ImportPartition(in io.Reader, part string, gm *Manager) error {
- dec := json.NewDecoder(in)
- gdata := make(map[string][]map[string]interface{})
- if err := dec.Decode(&gdata); err != nil {
- return fmt.Errorf("Could not decode file content as object with list of nodes and edges: %s", err.Error())
- }
- nDataList := gdata["nodes"]
- eDataList := gdata["edges"]
- // Create a transaction
- trans := NewGraphTrans(gm)
- // Store nodes in transaction
- for _, ndata := range nDataList {
- node := data.NewGraphNodeFromMap(ndata)
- if err := trans.StoreNode(part, node); err != nil {
- return err
- }
- }
- // Store edges in transaction
- for _, edata := range eDataList {
- edge := data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(edata))
- if err := trans.StoreEdge(part, edge); err != nil {
- return err
- }
- }
- // Commit transaction
- return trans.Commit()
- }
- /*
- ExportWriterFactory produces a named writer.
- */
- type ExportWriterFactory interface {
- CreateWriter(name string) (io.Writer, error)
- }
- /*
- LargeScaleExportPartition dumps the contents of a partition into multiple io.Writer in line-delimited JSON format:
- { <attr> : <value>, ... },
- { <attr> : <value>, ... },
- ...
- */
- func LargeScaleExportPartition(ewf ExportWriterFactory, part string, gm *Manager) error {
- out, err := ewf.CreateWriter(fmt.Sprintf("%v-nodes", part))
- if err == nil {
- for _, k := range gm.NodeKinds() {
- var it *NodeKeyIterator
- if it, err = gm.NodeKeyIterator(part, k); err == nil {
- for it.HasNext() {
- key := it.Next()
- if err = it.Error(); err == nil {
- var node data.Node
- if node, err = gm.FetchNode(part, key, k); err == nil {
- var jsonBytes []byte
- if jsonBytes, err = json.Marshal(node.Data()); err == nil {
- _, err = out.Write(jsonBytes)
- fmt.Fprintln(out)
- }
- }
- }
- if err != nil {
- break
- }
- }
- }
- if err != nil {
- break
- }
- }
- }
- out, err = ewf.CreateWriter(fmt.Sprintf("%v-edges", part))
- if err == nil {
- for _, k := range gm.EdgeKinds() {
- var tree *hash.HTree
- if tree, err = gm.getEdgeStorageHTree(part, k, false); err == nil {
- gm.mutex.RLock()
- it := hash.NewHTreeIterator(tree)
- gm.mutex.RUnlock()
- if err = it.LastError; err == nil {
- for it.HasNext() {
- gm.mutex.RLock()
- binaryKey, _ := it.Next()
- gm.mutex.RUnlock()
- if prefix := binaryKey[:len(PrefixNSAttrs)]; string(prefix) != PrefixNSAttrs {
- continue
- }
- key := string(binaryKey[len(PrefixNSAttrs):])
- if err = it.LastError; err == nil {
- var node data.Node
- if node, err = gm.FetchEdge(part, key, k); err == nil {
- var jsonBytes []byte
- if jsonBytes, err = json.Marshal(node.Data()); err == nil {
- _, err = out.Write(jsonBytes)
- fmt.Fprintln(out)
- }
- }
- }
- if err != nil {
- break
- }
- }
- }
- }
- if err != nil {
- break
- }
- }
- }
- return err
- }
- /*
- ImportReaderFactory produces a named reader.
- */
- type ImportReaderFactory interface {
- Readers() []string
- CreateReader(name string) (io.Reader, error)
- }
- /*
- LargeScaleImportPartition dumps the contents of a partition into multiple io.Writer in line-delimited JSON format:
- { <attr> : <value>, ... },
- { <attr> : <value>, ... },
- ...
- */
- func LargeScaleImportPartition(irf ImportReaderFactory, part string, gm *Manager) error {
- var err error
- readers := irf.Readers()
- trans := NewRollingTrans(NewGraphTrans(gm), 1000, gm, NewGraphTrans)
- for _, r := range readers {
- var in io.Reader
- if in, err = irf.CreateReader(r); err == nil {
- isNode := strings.HasSuffix(r, "-nodes")
- scanner := bufio.NewScanner(in)
- for scanner.Scan() {
- var nodeData map[string]interface{}
- if err = json.Unmarshal(scanner.Bytes(), &nodeData); err == nil {
- if isNode {
- err = gm.StoreNode(part, data.NewGraphNodeFromMap(nodeData))
- } else {
- err = gm.StoreEdge(part, data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(nodeData)))
- }
- }
- if err != nil {
- break
- }
- }
- }
- if err != nil {
- break
- }
- }
- if err == nil {
- err = trans.Commit()
- }
- return err
- }
|