123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- /*
- * Rufs - Remote Union File System
- *
- * Copyright 2017 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the MIT
- * License, If a copy of the MIT License was not distributed with this
- * file, You can obtain one at https://opensource.org/licenses/MIT.
- */
- /*
- Package node contains the network communication code for Rufs via RPC calls.
- */
- package node
- import (
- "crypto/tls"
- "encoding/gob"
- "fmt"
- "io"
- "net"
- "net/rpc"
- "os"
- "sort"
- "strings"
- "sync"
- "time"
- )
- func init() {
- // Make sure we can use the relevant types in a gob operation
- gob.Register(&RufsNodeToken{})
- gob.Register(map[string]string{})
- }
- /*
- DialTimeout is the dial timeout for RPC connections
- */
- var DialTimeout = 10 * time.Second
- /*
- RufsNodeToken is used to authenticate a node in the network to other nodes
- */
- type RufsNodeToken struct {
- NodeName string
- NodeAuth string
- }
- /*
- Client is the client for the RPC API of a node.
- */
- type Client struct {
- token *RufsNodeToken // Token to be send to other nodes for authentication
- rpc string // This client's rpc network interface (may be empty in case of pure clients)
- peers map[string]string // Map of node names to their rpc network interface
- conns map[string]*rpc.Client // Map of node names to network connections
- fingerprints map[string]string // Map of expected server certificate fingerprints
- cert *tls.Certificate // Client certificate
- maplock *sync.RWMutex // Lock for maps
- redial bool // Flag if this client is attempting a redial
- }
- /*
- SSLFingerprint returns the SSL fingerprint of the client.
- */
- func (c *Client) SSLFingerprint() string {
- var ret string
- if c.cert != nil && c.cert.Certificate[0] != nil {
- ret = fingerprint(c.cert.Certificate[0])
- }
- return ret
- }
- /*
- Shutdown closes all stored connections.
- */
- func (c *Client) Shutdown() {
- c.maplock.Lock()
- defer c.maplock.Unlock()
- for _, c := range c.conns {
- c.Close()
- }
- c.conns = make(map[string]*rpc.Client)
- }
- /*
- RegisterPeer registers a new peer to communicate with. An empty fingerprint
- means that the client will accept any certificate from the server.
- */
- func (c *Client) RegisterPeer(node string, rpc string, fingerprint string) error {
- if _, ok := c.peers[node]; ok {
- return fmt.Errorf("Peer already registered: %v", node)
- } else if rpc == "" {
- return fmt.Errorf("RPC interface must not be empty")
- }
- c.maplock.Lock()
- c.peers[node] = rpc
- delete(c.conns, node)
- c.fingerprints[node] = fingerprint
- c.maplock.Unlock()
- return nil
- }
- /*
- Peers returns all registered peers and their expected fingerprints.
- */
- func (c *Client) Peers() ([]string, []string) {
- ret := make([]string, 0, len(c.peers))
- fps := make([]string, len(c.peers))
- c.maplock.Lock()
- defer c.maplock.Unlock()
- for k := range c.peers {
- ret = append(ret, k)
- }
- sort.Strings(ret)
- for i, node := range ret {
- fps[i] = c.fingerprints[node]
- }
- return ret, fps
- }
- /*
- RemovePeer removes a registered peer.
- */
- func (c *Client) RemovePeer(node string) {
- c.maplock.Lock()
- delete(c.peers, node)
- delete(c.conns, node)
- delete(c.fingerprints, node)
- c.maplock.Unlock()
- }
- /*
- SendPing sends a ping to a node and returns the result. Second argument is
- optional if the target member is not a known peer. Should be an empty string
- in all other cases. Returns the answer, the fingerprint of the presented server
- certificate and any errors.
- */
- func (c *Client) SendPing(node string, rpc string) ([]string, string, error) {
- var ret []string
- var fp string
- if _, ok := c.peers[node]; !ok && rpc != "" {
- // Add member temporary if it was not registered
- c.maplock.Lock()
- c.peers[node] = rpc
- c.maplock.Unlock()
- defer func() {
- c.maplock.Lock()
- delete(c.peers, node)
- delete(c.conns, node)
- delete(c.fingerprints, node)
- c.maplock.Unlock()
- }()
- }
- res, err := c.SendRequest(node, RPCPing, nil)
- if res != nil && err == nil {
- ret = res.([]string)
- c.maplock.Lock()
- fp = c.fingerprints[node]
- c.maplock.Unlock()
- }
- return ret, fp, err
- }
- /*
- SendData sends a portion of data and some control information to a node and
- returns the result.
- */
- func (c *Client) SendData(node string, ctrl map[string]string, data []byte) ([]byte, error) {
- if _, ok := c.peers[node]; !ok {
- return nil, fmt.Errorf("Unknown peer: %v", node)
- }
- res, err := c.SendRequest(node, RPCData, map[RequestArgument]interface{}{
- RequestCTRL: ctrl,
- RequestDATA: data,
- })
- if res != nil {
- return res.([]byte), err
- }
- return nil, err
- }
- /*
- SendRequest sends a request to another node.
- */
- func (c *Client) SendRequest(node string, remoteCall RPCFunction,
- args map[RequestArgument]interface{}) (interface{}, error) {
- var err error
- // Function to categorize errors
- handleError := func(err error) error {
- if _, ok := err.(net.Error); ok {
- return &Error{ErrNodeComm, err.Error(), false}
- }
- // Wrap remote errors in a proper error object
- if err != nil && !strings.HasPrefix(err.Error(), "RufsError: ") {
- // Check if the error is known to report that a file or directory
- // does not exist.
- err = &Error{ErrRemoteAction, err.Error(), err.Error() == os.ErrNotExist.Error()}
- }
- return err
- }
- c.maplock.Lock()
- laddr, ok := c.peers[node]
- c.maplock.Unlock()
- if ok {
- // Get network connection to the node
- c.maplock.Lock()
- conn, ok := c.conns[node]
- c.maplock.Unlock()
- if !ok {
- // Create a new connection if necessary
- nconn, err := net.DialTimeout("tcp", laddr, DialTimeout)
- if err != nil {
- LogDebug(c.token.NodeName, ": ",
- fmt.Sprintf("- %v.%v (laddr=%v err=%v)",
- node, remoteCall, laddr, err))
- return nil, handleError(err)
- }
- if c.cert != nil && c.cert.Certificate[0] != nil {
- // Wrap the conn in a TLS client
- config := tls.Config{
- Certificates: []tls.Certificate{*c.cert},
- InsecureSkipVerify: true,
- }
- tlsconn := tls.Client(nconn, &config)
- // Do the handshake and look at the server certificate
- tlsconn.Handshake()
- rfp := fingerprint(tlsconn.ConnectionState().PeerCertificates[0].Raw)
- c.maplock.Lock()
- expected, _ := c.fingerprints[node]
- c.maplock.Unlock()
- if expected == "" {
- // Accept the certificate and store it
- c.maplock.Lock()
- c.fingerprints[node] = rfp
- c.maplock.Unlock()
- } else if expected != rfp {
- // Fingerprint was NOT verified
- LogDebug(c.token.NodeName, ": ",
- fmt.Sprintf("Not trusting %v (laddr=%v) presented fingerprint: %v expected fingerprint: %v", node, laddr, rfp, expected))
- return nil, &Error{ErrUntrustedTarget, node, false}
- }
- LogDebug(c.token.NodeName, ": ",
- fmt.Sprintf("%v (laddr=%v) has SSL fingerprint %v ", node, laddr, rfp))
- nconn = tlsconn
- }
- conn = rpc.NewClient(nconn)
- // Store the connection so it can be reused
- c.maplock.Lock()
- c.conns[node] = conn
- c.maplock.Unlock()
- }
- // Assemble the request
- request := map[RequestArgument]interface{}{
- RequestTARGET: node,
- RequestTOKEN: c.token,
- }
- if args != nil {
- for k, v := range args {
- request[k] = v
- }
- }
- var response interface{}
- LogDebug(c.token.NodeName, ": ",
- fmt.Sprintf("> %v.%v (laddr=%v)", node, remoteCall, laddr))
- err = conn.Call("RufsServer."+string(remoteCall), request, &response)
- if !c.redial && (err == rpc.ErrShutdown || err == io.EOF || err == io.ErrUnexpectedEOF) {
- // Delete the closed connection and retry the request
- c.maplock.Lock()
- delete(c.conns, node)
- c.redial = true // Set the redial flag to avoid a forever loop
- c.maplock.Unlock()
- return c.SendRequest(node, remoteCall, args)
- }
- // Reset redial flag
- c.maplock.Lock()
- c.redial = false
- c.maplock.Unlock()
- LogDebug(c.token.NodeName, ": ",
- fmt.Sprintf("< %v.%v (err=%v)", node, remoteCall, err))
- return response, handleError(err)
- }
- return nil, &Error{ErrUnknownTarget, node, false}
- }
|