| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 | /* * EliasDB * * Copyright 2016 Matthias Ladkau. All rights reserved. * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */package v1import (	"encoding/json"	"fmt"	"net/http"	"sync"	"time"	"github.com/gorilla/websocket"	"devt.de/krotik/common/stringutil"	"devt.de/krotik/eliasdb/api"	"devt.de/krotik/eliasdb/graphql")/*EndpointGraphQLSubscriptions is the GraphQL endpoint URL for subscriptions (rooted). Handles websockets under graphql-subscriptions/*/const EndpointGraphQLSubscriptions = api.APIRoot + APIv1 + "/graphql-subscriptions/"/*upgrader can upgrade normal requests to websocket communications*/var upgrader = websocket.Upgrader{	Subprotocols:    []string{"graphql-subscriptions"},	ReadBufferSize:  1024,	WriteBufferSize: 1024,}var subscriptionCallbackError error/*GraphQLSubscriptionsEndpointInst creates a new endpoint handler.*/func GraphQLSubscriptionsEndpointInst() api.RestEndpointHandler {	return &graphQLSubscriptionsEndpoint{}}/*Handler object for GraphQL operations.*/type graphQLSubscriptionsEndpoint struct {	*api.DefaultEndpointHandler}/*HandleGET handles GraphQL subscription queries.*/func (e *graphQLSubscriptionsEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {	// Update the incomming connection to a websocket	// If the upgrade fails then the client gets an HTTP error response.	conn, err := upgrader.Upgrade(w, r, nil)	// Websocket connections support one concurrent reader and one concurrent writer.	// See: https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency	connRMutex := &sync.Mutex{}	connWMutex := &sync.Mutex{}	if err != nil {		// We give details here on what went wrong		w.Write([]byte(err.Error()))		return	}	subID := ""	// Ensure we have a partition to query	partition := r.URL.Query().Get("partition")	if partition == "" && len(resources) > 0 {		partition = resources[0]	}	if partition == "" {		connWMutex.Lock()		e.WriteError(conn, subID, "Need a 'partition' in path or as url parameter", true)		connWMutex.Unlock()		return	}	connWMutex.Lock()	conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"init_success","payload":{}}`))	connWMutex.Unlock()	// Create the callback handler for the subscription	callbackHandler := &subscriptionCallbackHandler{		finished: false,		publish: func(data map[string]interface{}, err error) {			var res []byte			// Error for unit testing			err = subscriptionCallbackError			// This is called if data im the datastore changes			if err == nil {				res, err = json.Marshal(map[string]interface{}{					"id":      subID,					"type":    "subscription_data",					"payload": data,				})			}			if err != nil {				connWMutex.Lock()				e.WriteError(conn, subID, err.Error(), true)				connWMutex.Unlock()				return			}			connWMutex.Lock()			conn.WriteMessage(websocket.TextMessage, res)			connWMutex.Unlock()		},	}	for {		// Read websocket message		connRMutex.Lock()		_, msg, err := conn.ReadMessage()		connRMutex.Unlock()		if err != nil {			// Unregister the callback handler			callbackHandler.finished = true			// If the client is still listening write the error message			// This is a NOP if the client hang up			connWMutex.Lock()			e.WriteError(conn, subID, err.Error(), true)			connWMutex.Unlock()			return		}		data := make(map[string]interface{})		if err := json.Unmarshal(msg, &data); err != nil {			connWMutex.Lock()			e.WriteError(conn, subID, err.Error(), false)			connWMutex.Unlock()			continue		}		// Check we got a message with a type		if msgType, ok := data["type"]; ok {			// Check if the user wants to start a new subscription			if _, ok := data["query"]; msgType == "subscription_start" && ok {				var res []byte				subID = fmt.Sprint(data["id"])				if _, ok := data["variables"]; !ok {					data["variables"] = nil				}				if _, ok := data["operationName"]; !ok {					data["operationName"] = nil				}				resData, err := graphql.RunQuery(stringutil.CreateDisplayString(partition)+" query",					partition, data, api.GM, callbackHandler, false)				if err == nil {					res, err = json.Marshal(map[string]interface{}{						"id":      subID,						"type":    "subscription_data",						"payload": resData,					})				}				if err != nil {					connWMutex.Lock()					e.WriteError(conn, subID, err.Error(), false)					connWMutex.Unlock()					continue				}				connWMutex.Lock()				conn.WriteMessage(websocket.TextMessage, []byte(					fmt.Sprintf(`{"id":"%s","type":"subscription_success","payload":{}}`, subID)))				conn.WriteMessage(websocket.TextMessage, res)				connWMutex.Unlock()			}		}	}}/*WriteError writes an error message to the websocket.*/func (e *graphQLSubscriptionsEndpoint) WriteError(conn *websocket.Conn,	subID string, msg string, close bool) {	// Write the error as cleartext message	data, _ := json.Marshal(map[string]interface{}{		"id":   subID,		"type": "subscription_fail",		"payload": map[string]interface{}{			"errors": []string{msg},		},	})	conn.WriteMessage(websocket.TextMessage, data)	if close {		// Write error as closing control message		conn.WriteControl(websocket.CloseMessage,			websocket.FormatCloseMessage(				websocket.CloseUnsupportedData, msg), time.Now().Add(10*time.Second))		conn.Close()	}}/*SwaggerDefs is used to describe the endpoint in swagger.*/func (e *graphQLSubscriptionsEndpoint) SwaggerDefs(s map[string]interface{}) {	// No swagger definitions for this endpoint as it only handles websocket requests}// Callback Handler// ================/*subscriptionCallbackHandler pushes new events to a subscription client via a websocket.*/type subscriptionCallbackHandler struct {	finished bool	publish  func(data map[string]interface{}, err error)}func (ch *subscriptionCallbackHandler) Publish(data map[string]interface{}, err error) {	ch.publish(data, err)}func (ch *subscriptionCallbackHandler) IsFinished() bool {	return ch.finished}
 |