123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package v1
- import (
- "encoding/json"
- "fmt"
- "net/http"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- "devt.de/krotik/common/stringutil"
- "devt.de/krotik/eliasdb/api"
- "devt.de/krotik/eliasdb/graphql"
- )
- /*
- EndpointGraphQLSubscriptions is the GraphQL endpoint URL for subscriptions (rooted). Handles websockets under graphql-subscriptions/
- */
- const EndpointGraphQLSubscriptions = api.APIRoot + APIv1 + "/graphql-subscriptions/"
- /*
- upgrader can upgrade normal requests to websocket communications
- */
- var upgrader = websocket.Upgrader{
- Subprotocols: []string{"graphql-subscriptions"},
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- }
/*
subscriptionCallbackError is an error hook for unit testing. When it is set
to a non-nil error, the publish callback of a subscription reports this
error to the client instead of sending subscription data.
*/
var subscriptionCallbackError error
- /*
- GraphQLSubscriptionsEndpointInst creates a new endpoint handler.
- */
- func GraphQLSubscriptionsEndpointInst() api.RestEndpointHandler {
- return &graphQLSubscriptionsEndpoint{}
- }
- /*
- Handler object for GraphQL operations.
- */
- type graphQLSubscriptionsEndpoint struct {
- *api.DefaultEndpointHandler
- }
- /*
- HandleGET handles GraphQL subscription queries.
- */
- func (e *graphQLSubscriptionsEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
- // Update the incomming connection to a websocket
- // If the upgrade fails then the client gets an HTTP error response.
- conn, err := upgrader.Upgrade(w, r, nil)
- // Websocket connections support one concurrent reader and one concurrent writer.
- // See: https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency
- connRMutex := &sync.Mutex{}
- connWMutex := &sync.Mutex{}
- if err != nil {
- // We give details here on what went wrong
- w.Write([]byte(err.Error()))
- return
- }
- subID := ""
- // Ensure we have a partition to query
- partition := r.URL.Query().Get("partition")
- if partition == "" && len(resources) > 0 {
- partition = resources[0]
- }
- if partition == "" {
- connWMutex.Lock()
- e.WriteError(conn, subID, "Need a 'partition' in path or as url parameter", true)
- connWMutex.Unlock()
- return
- }
- connWMutex.Lock()
- conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"init_success","payload":{}}`))
- connWMutex.Unlock()
- // Create the callback handler for the subscription
- callbackHandler := &subscriptionCallbackHandler{
- finished: false,
- publish: func(data map[string]interface{}, err error) {
- var res []byte
- // Error for unit testing
- err = subscriptionCallbackError
- // This is called if data im the datastore changes
- if err == nil {
- res, err = json.Marshal(map[string]interface{}{
- "id": subID,
- "type": "subscription_data",
- "payload": data,
- })
- }
- if err != nil {
- connWMutex.Lock()
- e.WriteError(conn, subID, err.Error(), true)
- connWMutex.Unlock()
- return
- }
- connWMutex.Lock()
- conn.WriteMessage(websocket.TextMessage, res)
- connWMutex.Unlock()
- },
- }
- for {
- // Read websocket message
- connRMutex.Lock()
- _, msg, err := conn.ReadMessage()
- connRMutex.Unlock()
- if err != nil {
- // Unregister the callback handler
- callbackHandler.finished = true
- // If the client is still listening write the error message
- // This is a NOP if the client hang up
- connWMutex.Lock()
- e.WriteError(conn, subID, err.Error(), true)
- connWMutex.Unlock()
- return
- }
- data := make(map[string]interface{})
- if err := json.Unmarshal(msg, &data); err != nil {
- connWMutex.Lock()
- e.WriteError(conn, subID, err.Error(), false)
- connWMutex.Unlock()
- continue
- }
- // Check we got a message with a type
- if msgType, ok := data["type"]; ok {
- // Check if the user wants to start a new subscription
- if _, ok := data["query"]; msgType == "subscription_start" && ok {
- var res []byte
- subID = fmt.Sprint(data["id"])
- if _, ok := data["variables"]; !ok {
- data["variables"] = nil
- }
- if _, ok := data["operationName"]; !ok {
- data["operationName"] = nil
- }
- resData, err := graphql.RunQuery(stringutil.CreateDisplayString(partition)+" query",
- partition, data, api.GM, callbackHandler, false)
- if err == nil {
- res, err = json.Marshal(map[string]interface{}{
- "id": subID,
- "type": "subscription_data",
- "payload": resData,
- })
- }
- if err != nil {
- connWMutex.Lock()
- e.WriteError(conn, subID, err.Error(), false)
- connWMutex.Unlock()
- continue
- }
- connWMutex.Lock()
- conn.WriteMessage(websocket.TextMessage, []byte(
- fmt.Sprintf(`{"id":"%s","type":"subscription_success","payload":{}}`, subID)))
- conn.WriteMessage(websocket.TextMessage, res)
- connWMutex.Unlock()
- }
- }
- }
- }
- /*
- WriteError writes an error message to the websocket.
- */
- func (e *graphQLSubscriptionsEndpoint) WriteError(conn *websocket.Conn,
- subID string, msg string, close bool) {
- // Write the error as cleartext message
- data, _ := json.Marshal(map[string]interface{}{
- "id": subID,
- "type": "subscription_fail",
- "payload": map[string]interface{}{
- "errors": []string{msg},
- },
- })
- conn.WriteMessage(websocket.TextMessage, data)
- if close {
- // Write error as closing control message
- conn.WriteControl(websocket.CloseMessage,
- websocket.FormatCloseMessage(
- websocket.CloseUnsupportedData, msg), time.Now().Add(10*time.Second))
- conn.Close()
- }
- }
- /*
- SwaggerDefs is used to describe the endpoint in swagger.
- */
- func (e *graphQLSubscriptionsEndpoint) SwaggerDefs(s map[string]interface{}) {
- // No swagger definitions for this endpoint as it only handles websocket requests
- }
- // Callback Handler
- // ================
/*
subscriptionCallbackHandler pushes new events to a subscription client via a
websocket. The actual work is done by the publish function which is set by
the creator of the handler.
*/
type subscriptionCallbackHandler struct {
	publish  func(data map[string]interface{}, err error) // Function which receives all published events
	finished bool                                         // Flag if the subscription has ended
}

/*
Publish hands the given data and error to the publish function of the handler.
*/
func (h *subscriptionCallbackHandler) Publish(data map[string]interface{}, err error) {
	h.publish(data, err)
}

/*
IsFinished returns the finished flag of the handler.
*/
func (h *subscriptionCallbackHandler) IsFinished() bool {
	return h.finished
}
|