| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 | /* * EliasDB * * Copyright 2016 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */package v1import (	"encoding/json"	"fmt"	"net/http"	"devt.de/krotik/eliasdb/api")/*EndpointClusterQuery is the cluster endpoint URL (rooted). Handles everything under cluster/...*/const EndpointClusterQuery = api.APIRoot + APIv1 + "/cluster/"/*ClusterEndpointInst creates a new endpoint handler.*/func ClusterEndpointInst() api.RestEndpointHandler {	return &clusterEndpoint{}}/*Handler object for cluster queries.*/type clusterEndpoint struct {	*api.DefaultEndpointHandler}/*HandleGET handles a cluster query REST call.*/func (ce *clusterEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {	var data interface{}	// Check clustering is enabled	if api.DD == nil || api.DDLog == nil {		http.Error(w, "Clustering is not enabled on this instance", http.StatusServiceUnavailable)		return	}	if len(resources) == 1 && resources[0] == "log" {		// Cluster logs are requested		data = api.DDLog.StringSlice()	} else if len(resources) == 1 && resources[0] == "memberinfos" {		// Cluster member infos are requested		data = api.DD.MemberManager.MemberInfoCluster()	} else {		// By default the cluster state is returned		data = api.DD.MemberManager.StateInfo().Map()	}	// Write data	w.Header().Set("content-type", "application/json; charset=utf-8")	ret := json.NewEncoder(w)	ret.Encode(data)}/*HandlePUT handles a REST call to join/eject/ping members of the cluster.*/func (ce *clusterEndpoint) HandlePUT(w http.ResponseWriter, r *http.Request, resources []string) {	// Check parameters	if !checkResources(w, resources, 1, 1, "Need a command either: join or eject") {		return	}	dec := json.NewDecoder(r.Body)	args := make(map[string]string)	if err := dec.Decode(&args); err != nil {		http.Error(w, "Could not decode arguments: "+err.Error(), http.StatusBadRequest)		return	}	// Function to check arguments	getArg := func(arg string) (string, bool) {		v, ok := args[arg]		if !ok {			http.Error(w, fmt.Sprintf("Required argument %v missing in body arguments", arg), http.StatusBadRequest)		}		return v, ok	}	if resources[0] == "join" {		// Get required args		name, ok := getArg("name")		if ok {			rpc, ok := getArg("netaddr")			if ok {				err := api.DD.MemberManager.JoinCluster(name, rpc)				if err != nil {					http.Error(w, "Could not join the cluster: "+err.Error(), http.StatusForbidden)				}			}		}	} else if resources[0] == "eject" {		// Get required args		name, ok := getArg("name")		if ok {			err := api.DD.MemberManager.EjectMember(name)			if err != nil {				http.Error(w, "Could not eject "+name+" from cluster: "+err.Error(), http.StatusForbidden)			}		}	} else if resources[0] == "ping" {		// Get required args		name, ok := getArg("name")		if ok {			rpc, ok := getArg("netaddr")			if ok {				res, err := api.DD.MemberManager.Client.SendPing(name, rpc)				if err != nil {					http.Error(w, "Ping returned an error: "+err.Error(), http.StatusForbidden)				} else {					w.Header().Set("content-type", "application/json; charset=utf-8")					ret := json.NewEncoder(w)					ret.Encode(res)				}			}		}	} else {		http.Error(w, "Unknown command: "+resources[0], http.StatusBadRequest)	}}/*HandleDELETE handles a cluster delete REST call.*/func (ce *clusterEndpoint) HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) {	// Check clustering is enabled	if api.DD == nil || api.DDLog == nil {		http.Error(w, "Clustering is not enabled on this instance", http.StatusServiceUnavailable)		return	}	if len(resources) == 1 && resources[0] == "log" {		// Cluster log should be reset		api.DDLog.Reset()		return	}	http.Error(w, "Request had no effect", http.StatusBadRequest)}/*SwaggerDefs is used to describe the endpoint in swagger.*/func (ce *clusterEndpoint) SwaggerDefs(s map[string]interface{}) {	s["paths"].(map[string]interface{})["/v1/cluster"] = map[string]interface{}{		"get": map[string]interface{}{			"summary":     "Return cluster specific information.",			"description": "The cluster endpoint returns the cluster state info which contains cluster members and their state.",			"produces": []string{				"text/plain",				"application/json",			},			"responses": map[string]interface{}{				"200": map[string]interface{}{					"description": "A key-value map.",				},				"default": map[string]interface{}{					"description": "Error response",					"schema": map[string]interface{}{						"$ref": "#/definitions/Error",					},				},			},		},	}	s["paths"].(map[string]interface{})["/v1/cluster/{command}"] = map[string]interface{}{		"put": map[string]interface{}{			"summary":     "Commands can be given to the cluster by using PUT requests.",			"description": "The cluster can be controlled via this command endpoint on any member.",			"consumes": []string{				"application/json",			},			"produces": []string{				"text/plain",				"application/json",			},			"parameters": []map[string]interface{}{				{					"name":        "command",					"in":          "path",					"description": "Valid commands are: ping, join and eject.",					"required":    true,					"type":        "string",				},				{					"name":        "args",					"in":          "body",					"description": "Arguments for a command",					"required":    true,					"schema": map[string]interface{}{						"type": "object",						"properties": map[string]interface{}{							"name": map[string]interface{}{								"description": "Name of a cluster member (ping/join=member to contact, eject=member to eject).",								"type":        "string",							},							"netaddr": map[string]interface{}{								"description": "Network address of a member e.g. localhost:9030 (ping/join=member address to contact)",								"type":        "string",							},						},					},				},			},			"responses": map[string]interface{}{				"200": map[string]interface{}{					"description": "Only the ping command returns its result. All other positive responses are empty.",				},				"default": map[string]interface{}{					"description": "Error response",					"schema": map[string]interface{}{						"$ref": "#/definitions/Error",					},				},			},		},	}	s["paths"].(map[string]interface{})["/v1/cluster/memberinfos"] = map[string]interface{}{		"get": map[string]interface{}{			"summary":     "Return static member info of every known cluster member.",			"description": "The memberinfos returns the static member info of every known cluster member. If a member is not reachable its info contains a single key-value pair with the key error and an error message as value.",			"produces": []string{				"text/plain",				"application/json",			},			"responses": map[string]interface{}{				"200": map[string]interface{}{					"description": "A map of memberinfos (keys are member names).",				},				"default": map[string]interface{}{					"description": "Error response",					"schema": map[string]interface{}{						"$ref": "#/definitions/Error",					},				},			},		},	}	s["paths"].(map[string]interface{})["/v1/cluster/log"] = map[string]interface{}{		"get": map[string]interface{}{			"summary":     "Return latest cluster related log messages.",			"description": "The cluster log endpoint returns the latest cluster related log messages from a memory ring buffer.",			"produces": []string{				"text/plain",				"application/json",			},			"responses": map[string]interface{}{				"200": map[string]interface{}{					"description": "A list of log messages.",				},				"default": map[string]interface{}{					"description": "Error response",					"schema": map[string]interface{}{						"$ref": "#/definitions/Error",					},				},			},		},		"delete": map[string]interface{}{			"summary":     "Reset the cluster log.",			"description": "A delete call to the log endpoint resets the cluster related log and clears the ring buffer in memory.",			"produces": []string{				"text/plain",				"application/json",			},			"responses": map[string]interface{}{				"200": map[string]interface{}{					"description": "Cluster related log was reset.",				},				"default": map[string]interface{}{					"description": "Error response",					"schema": map[string]interface{}{						"$ref": "#/definitions/Error",					},				},			},		},	}	// Add generic error object to definition	s["definitions"].(map[string]interface{})["Error"] = map[string]interface{}{		"description": "A human readable error mesage.",		"type":        "string",	}}
 |