rpcserver: add batched request support (json 2.0)

This commit is contained in:
Donald Adu-Poku 2017-09-01 23:53:48 +00:00 committed by Dave Collins
parent 1755f5190f
commit a0438c81c3
21 changed files with 794 additions and 284 deletions

View File

@ -128,7 +128,7 @@ func main() {
// Marshal the command into a JSON-RPC byte slice in preparation for
// sending it to the RPC server.
marshalledJSON, err := dcrjson.MarshalCmd(1, cmd)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", 1, cmd)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)

View File

@ -142,7 +142,7 @@ func TestBtcdExtCmds(t *testing.T) {
for i, test := range tests {
// Marshal the command as created by the new static command
// creation function.
marshalled, err := dcrjson.MarshalCmd(testID, test.staticCmd())
marshalled, err := dcrjson.MarshalCmd("1.0", testID, test.staticCmd())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -166,7 +166,7 @@ func TestBtcdExtCmds(t *testing.T) {
// Marshal the command as created by the generic new command
// creation function.
marshalled, err = dcrjson.MarshalCmd(testID, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", testID, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -119,7 +119,7 @@ func TestBtcWalletExtCmds(t *testing.T) {
for i, test := range tests {
// Marshal the command as created by the new static command
// creation function.
marshalled, err := dcrjson.MarshalCmd(testID, test.staticCmd())
marshalled, err := dcrjson.MarshalCmd("1.0", testID, test.staticCmd())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -143,7 +143,7 @@ func TestBtcWalletExtCmds(t *testing.T) {
// Marshal the command as created by the generic new command
// creation function.
marshalled, err = dcrjson.MarshalCmd(testID, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", testID, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -983,7 +983,7 @@ func TestChainSvrCmds(t *testing.T) {
for i, test := range tests {
// Marshal the command as created by the new static command
// creation function.
marshalled, err := dcrjson.MarshalCmd(testID, test.staticCmd())
marshalled, err := dcrjson.MarshalCmd("1.0", testID, test.staticCmd())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -1008,7 +1008,7 @@ func TestChainSvrCmds(t *testing.T) {
// Marshal the command as created by the generic new command
// creation function.
marshalled, err = dcrjson.MarshalCmd(testID, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", testID, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -163,7 +163,7 @@ func TestChainSvrWsCmds(t *testing.T) {
for i, test := range tests {
// Marshal the command as created by the new static command
// creation function.
marshalled, err := dcrjson.MarshalCmd(testID, test.staticCmd())
marshalled, err := dcrjson.MarshalCmd("1.0", testID, test.staticCmd())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -187,7 +187,7 @@ func TestChainSvrWsCmds(t *testing.T) {
// Marshal the command as created by the generic new command
// creation function.
marshalled, err = dcrjson.MarshalCmd(testID, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", testID, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -119,7 +119,7 @@ func TestChainSvrWsNtfns(t *testing.T) {
for i, test := range tests {
// Marshal the notification as created by the new static
// creation function. The ID is nil for notifications.
marshalled, err := dcrjson.MarshalCmd(nil, test.staticNtfn())
marshalled, err := dcrjson.MarshalCmd("1.0", nil, test.staticNtfn())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -144,7 +144,7 @@ func TestChainSvrWsNtfns(t *testing.T) {
// Marshal the notification as created by the generic new
// notification creation function. The ID is nil for
// notifications.
marshalled, err = dcrjson.MarshalCmd(nil, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", nil, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -36,7 +36,7 @@ func makeParams(rt reflect.Type, rv reflect.Value) []interface{} {
// is suitable for transmission to an RPC server. The provided command type
// must be a registered type. All commands provided by this package are
// registered by default.
func MarshalCmd(id interface{}, cmd interface{}) ([]byte, error) {
func MarshalCmd(rpcVersion string, id interface{}, cmd interface{}) ([]byte, error) {
// Look up the cmd type and error out if not registered.
rt := reflect.TypeOf(cmd)
registerLock.RLock()
@ -60,7 +60,7 @@ func MarshalCmd(id interface{}, cmd interface{}) ([]byte, error) {
params := makeParams(rt.Elem(), rv.Elem())
// Generate and marshal the final JSON-RPC request.
rawCmd, err := NewRequest(id, method, params)
rawCmd, err := NewRequest(rpcVersion, id, method, params)
if err != nil {
return nil, err
}

View File

@ -434,7 +434,7 @@ func TestMarshalCmdErrors(t *testing.T) {
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
_, err := dcrjson.MarshalCmd(test.id, test.cmd)
_, err := dcrjson.MarshalCmd("1.0", test.id, test.cmd)
if reflect.TypeOf(err) != reflect.TypeOf(test.err) {
t.Errorf("Test #%d (%s) wrong error type - got `%T` (%v), want `%T`",
i, test.name, err, err, test.err)

View File

@ -75,7 +75,7 @@ func TestDcrdCmds(t *testing.T) {
for i, test := range tests {
// Marshal the command as created by the new static command
// creation function.
marshalled, err := dcrjson.MarshalCmd(testID, test.staticCmd())
marshalled, err := dcrjson.MarshalCmd("1.0", testID, test.staticCmd())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -99,7 +99,7 @@ func TestDcrdCmds(t *testing.T) {
// Marshal the command as created by the generic new command
// creation function.
marshalled, err = dcrjson.MarshalCmd(testID, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", testID, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -67,7 +67,7 @@ func TestDcrWalletExtCmds(t *testing.T) {
for i, test := range tests {
// Marshal the command as created by the new static command
// creation function.
marshalled, err := dcrjson.MarshalCmd(testID, test.staticCmd())
marshalled, err := dcrjson.MarshalCmd("1.0", testID, test.staticCmd())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -91,7 +91,7 @@ func TestDcrWalletExtCmds(t *testing.T) {
// Marshal the command as created by the generic new command
// creation function.
marshalled, err = dcrjson.MarshalCmd(testID, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", testID, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -126,7 +126,7 @@ func TestDcrwalletChainSvrWsNtfns(t *testing.T) {
for i, test := range tests {
// Marshal the notification as created by the new static
// creation function. The ID is nil for notifications.
marshalled, err := dcrjson.MarshalCmd(nil, test.staticNtfn())
marshalled, err := dcrjson.MarshalCmd("1.0", nil, test.staticNtfn())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -151,7 +151,7 @@ func TestDcrwalletChainSvrWsNtfns(t *testing.T) {
// Marshal the notification as created by the generic new
// notification creation function. The ID is nil for
// notifications.
marshalled, err = dcrjson.MarshalCmd(nil, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", nil, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -28,7 +28,7 @@ func ExampleMarshalCmd() {
// server. Typically the client would increment the id here which is
// request so the response can be identified.
id := 1
marshalledBytes, err := dcrjson.MarshalCmd(id, gbCmd)
marshalledBytes, err := dcrjson.MarshalCmd("1.0", id, gbCmd)
if err != nil {
fmt.Println(err)
return
@ -98,7 +98,7 @@ func ExampleUnmarshalCmd() {
func ExampleMarshalResponse() {
// Marshal a new JSON-RPC response. For example, this is a response
// to a getblockheight request.
marshalledBytes, err := dcrjson.MarshalResponse(1, 350001, nil)
marshalledBytes, err := dcrjson.MarshalResponse("1.0", 1, 350001, nil)
if err != nil {
fmt.Println(err)
return
@ -110,7 +110,7 @@ func ExampleMarshalResponse() {
fmt.Printf("%s\n", marshalledBytes)
// Output:
// {"result":350001,"error":null,"id":1}
// {"jsonrpc":"1.0","result":350001,"error":null,"id":1}
}
// This example demonstrates how to unmarshal a JSON-RPC response and then

View File

@ -61,12 +61,12 @@ func IsValidIDType(id interface{}) bool {
}
}
// Request is a type for raw JSON-RPC 1.0 requests. The Method field identifies
// the specific command type which in turns leads to different parameters.
// Callers typically will not use this directly since this package provides a
// statically typed command infrastructure which handles creation of these
// requests, however this struct it being exported in case the caller wants to
// construct raw requests for some reason.
// Request represents raw JSON-RPC requests. The Method field identifies the
// specific command type which in turn leads to different parameters. Callers
// typically will not use this directly since this package provides a statically
// typed command infrastructure which handles creation of these requests,
// however this struct is being exported in case the caller wants to construct
// raw requests for some reason.
type Request struct {
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
@ -74,15 +74,65 @@ type Request struct {
ID interface{} `json:"id"`
}
// NewRequest returns a new JSON-RPC 1.0 request object given the provided id,
// method, and parameters. The parameters are marshalled into a json.RawMessage
// for the Params field of the returned request object. This function is only
// provided in case the caller wants to construct raw requests for some reason.
//
// Typically callers will instead want to create a registered concrete command
// type with the NewCmd or New<Foo>Cmd functions and call the MarshalCmd
// function with that command to generate the marshalled JSON-RPC request.
func NewRequest(id interface{}, method string, params []interface{}) (*Request, error) {
// UnmarshalJSON is a custom unmarshal func for the Request struct. The param
// field defaults to an empty json.RawMessage array it is omitted by the request
// or nil if the supplied value is invalid.
func (request *Request) UnmarshalJSON(b []byte) error {
var data map[string]interface{}
err := json.Unmarshal(b, &data)
if err != nil {
return err
}
request.ID = data["id"]
methodValue, hasMethod := data["method"]
if hasMethod {
request.Method = methodValue.(string)
}
jsonrpcValue, hasJsonrpc := data["jsonrpc"]
if hasJsonrpc {
request.Jsonrpc = jsonrpcValue.(string)
}
paramsValue, hasParams := data["params"]
if !hasParams {
// set the request param to an empty array if it is ommited in the request
request.Params = []json.RawMessage{}
}
if hasParams {
// assert the request params is an array of data
params, paramsOk := paramsValue.([]interface{})
if paramsOk {
rawParams := make([]json.RawMessage, 0, len(params))
for _, param := range params {
marshalledParam, err := json.Marshal(param)
if err != nil {
return err
}
rawMessage := json.RawMessage(marshalledParam)
rawParams = append(rawParams, rawMessage)
}
request.Params = rawParams
}
}
return nil
}
// NewRequest returns a new JSON-RPC request object given the provided rpc
// version, id, method, and parameters. The parameters are marshalled into a
// json.RawMessage for the Params field of the returned request object. This
// function is only provided in case the caller wants to construct raw requests
// for some reason. Typically callers will instead want to create a registered
// concrete command type with the NewCmd or New<Foo>Cmd functions and call the
// MarshalCmd function with that command to generate the marshalled JSON-RPC
// request.
func NewRequest(rpcVersion string, id interface{}, method string, params []interface{}) (*Request, error) {
// default to JSON-RPC 1.0 if RPC type is not specified
if rpcVersion != "2.0" && rpcVersion != "1.0" {
rpcVersion = "1.0"
}
if !IsValidIDType(id) {
str := fmt.Sprintf("the id of type '%T' is invalid", id)
return nil, makeError(ErrInvalidType, str)
@ -99,30 +149,34 @@ func NewRequest(id interface{}, method string, params []interface{}) (*Request,
}
return &Request{
Jsonrpc: "1.0",
Jsonrpc: rpcVersion,
ID: id,
Method: method,
Params: rawParams,
}, nil
}
// Response is the general form of a JSON-RPC response. The type of the Result
// field varies from one command to the next, so it is implemented as an
// interface. The ID field has to be a pointer for Go to put a null in it when
// Response is the general form of a JSON-RPC response. The type of the
// Result field varies from one command to the next, so it is implemented as an
// interface. The ID field has to be a pointer to allow for a nil value when
// empty.
type Response struct {
Result json.RawMessage `json:"result"`
Error *RPCError `json:"error"`
ID *interface{} `json:"id"`
Jsonrpc string `json:"jsonrpc"`
Result json.RawMessage `json:"result"`
Error *RPCError `json:"error"`
ID *interface{} `json:"id"`
}
// NewResponse returns a new JSON-RPC response object given the provided id,
// marshalled result, and RPC error. This function is only provided in case the
// caller wants to construct raw responses for some reason.
//
// NewResponse returns a new JSON-RPC response object given the provided rpc
// version, id, marshalled result, and RPC error. This function is only
// provided in case the caller wants to construct raw responses for some reason.
// Typically callers will instead want to create the fully marshalled JSON-RPC
// response to send over the wire with the MarshalResponse function.
func NewResponse(id interface{}, marshalledResult []byte, rpcErr *RPCError) (*Response, error) {
func NewResponse(rpcVersion string, id interface{}, marshalledResult []byte, rpcErr *RPCError) (*Response, error) {
if rpcVersion != "2.0" && rpcVersion != "1.0" {
rpcVersion = "1.0"
}
if !IsValidIDType(id) {
str := fmt.Sprintf("the id of type '%T' is invalid", id)
return nil, makeError(ErrInvalidType, str)
@ -130,20 +184,26 @@ func NewResponse(id interface{}, marshalledResult []byte, rpcErr *RPCError) (*Re
pid := &id
return &Response{
Result: marshalledResult,
Error: rpcErr,
ID: pid,
Jsonrpc: rpcVersion,
Result: marshalledResult,
Error: rpcErr,
ID: pid,
}, nil
}
// MarshalResponse marshals the passed id, result, and RPCError to a JSON-RPC
// response byte slice that is suitable for transmission to a JSON-RPC client.
func MarshalResponse(id interface{}, result interface{}, rpcErr *RPCError) ([]byte, error) {
// MarshalResponse marshals the passed rpc version, id, result, and RPCError to
// a JSON-RPC response byte slice that is suitable for transmission to a
// JSON-RPC client.
func MarshalResponse(rpcVersion string, id interface{}, result interface{}, rpcErr *RPCError) ([]byte, error) {
if rpcVersion != "2.0" && rpcVersion != "1.0" {
rpcVersion = "1.0"
}
marshalledResult, err := json.Marshal(result)
if err != nil {
return nil, err
}
response, err := NewResponse(id, marshalledResult, rpcErr)
response, err := NewResponse(rpcVersion, id, marshalledResult, rpcErr)
if err != nil {
return nil, err
}

View File

@ -16,7 +16,6 @@ import (
// TestIsValidIDType ensures the IsValidIDType function behaves as expected.
func TestIsValidIDType(t *testing.T) {
t.Parallel()
tests := []struct {
name string
id interface{}
@ -57,7 +56,6 @@ func TestIsValidIDType(t *testing.T) {
// TestMarshalResponse ensures the MarshalResponse function works as expected.
func TestMarshalResponse(t *testing.T) {
t.Parallel()
testID := 1
tests := []struct {
name string
@ -69,7 +67,7 @@ func TestMarshalResponse(t *testing.T) {
name: "ordinary bool result with no error",
result: true,
jsonErr: nil,
expected: []byte(`{"result":true,"error":null,"id":1}`),
expected: []byte(`{"jsonrpc":"1.0","result":true,"error":null,"id":1}`),
},
{
name: "result with error",
@ -77,14 +75,14 @@ func TestMarshalResponse(t *testing.T) {
jsonErr: func() *dcrjson.RPCError {
return dcrjson.NewRPCError(dcrjson.ErrRPCBlockNotFound, "123 not found")
}(),
expected: []byte(`{"result":null,"error":{"code":-5,"message":"123 not found"},"id":1}`),
expected: []byte(`{"jsonrpc":"1.0","result":null,"error":{"code":-5,"message":"123 not found"},"id":1}`),
},
}
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
_, _ = i, test
marshalled, err := dcrjson.MarshalResponse(testID, test.result, test.jsonErr)
marshalled, err := dcrjson.MarshalResponse("1.0", testID, test.result, test.jsonErr)
if err != nil {
t.Errorf("Test #%d (%s) unexpected error: %v", i,
test.name, err)
@ -102,10 +100,9 @@ func TestMarshalResponse(t *testing.T) {
// TestMiscErrors tests a few error conditions not covered elsewhere.
func TestMiscErrors(t *testing.T) {
t.Parallel()
// Force an error in NewRequest by giving it a parameter type that is
// not supported.
_, err := dcrjson.NewRequest(nil, "test", []interface{}{make(chan int)})
_, err := dcrjson.NewRequest("1.0", nil, "test", []interface{}{make(chan int)})
if err == nil {
t.Error("NewRequest: did not receive error")
return
@ -114,7 +111,7 @@ func TestMiscErrors(t *testing.T) {
// Force an error in MarshalResponse by giving it an id type that is not
// supported.
wantErr := dcrjson.Error{Code: dcrjson.ErrInvalidType}
_, err = dcrjson.MarshalResponse(make(chan int), nil, nil)
_, err = dcrjson.MarshalResponse("", make(chan int), nil, nil)
if jerr, ok := err.(dcrjson.Error); !ok || jerr.Code != wantErr.Code {
t.Errorf("MarshalResult: did not receive expected error - got "+
"%v (%[1]T), want %v (%[2]T)", err, wantErr)
@ -123,7 +120,7 @@ func TestMiscErrors(t *testing.T) {
// Force an error in MarshalResponse by giving it a result type that
// can't be marshalled.
_, err = dcrjson.MarshalResponse(1, make(chan int), nil)
_, err = dcrjson.MarshalResponse("1.0", 1, make(chan int), nil)
if _, ok := err.(*json.UnsupportedTypeError); !ok {
wantErr := &json.UnsupportedTypeError{}
t.Errorf("MarshalResult: did not receive expected error - got "+
@ -135,7 +132,6 @@ func TestMiscErrors(t *testing.T) {
// TestRPCError tests the error output for the RPCError type.
func TestRPCError(t *testing.T) {
t.Parallel()
tests := []struct {
in *dcrjson.RPCError
want string

View File

@ -1116,7 +1116,7 @@ func TestWalletSvrCmds(t *testing.T) {
for i, test := range tests {
// Marshal the command as created by the new static command
// creation function.
marshalled, err := dcrjson.MarshalCmd(testID, test.staticCmd())
marshalled, err := dcrjson.MarshalCmd("1.0", testID, test.staticCmd())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -1140,7 +1140,7 @@ func TestWalletSvrCmds(t *testing.T) {
// Marshal the command as created by the generic new command
// creation function.
marshalled, err = dcrjson.MarshalCmd(testID, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", testID, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -196,7 +196,7 @@ func TestWalletSvrWsCmds(t *testing.T) {
for i, test := range tests {
// Marshal the command as created by the new static command
// creation function.
marshalled, err := dcrjson.MarshalCmd(testID, test.staticCmd())
marshalled, err := dcrjson.MarshalCmd("1.0", testID, test.staticCmd())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -220,7 +220,7 @@ func TestWalletSvrWsCmds(t *testing.T) {
// Marshal the command as created by the generic new command
// creation function.
marshalled, err = dcrjson.MarshalCmd(testID, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", testID, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -117,7 +117,7 @@ func TestWalletSvrWsNtfns(t *testing.T) {
for i, test := range tests {
// Marshal the notification as created by the new static
// creation function. The ID is nil for notifications.
marshalled, err := dcrjson.MarshalCmd(nil, test.staticNtfn())
marshalled, err := dcrjson.MarshalCmd("1.0", nil, test.staticNtfn())
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)
@ -142,7 +142,7 @@ func TestWalletSvrWsNtfns(t *testing.T) {
// Marshal the notification as created by the generic new
// notification creation function. The ID is nil for
// notifications.
marshalled, err = dcrjson.MarshalCmd(nil, cmd)
marshalled, err = dcrjson.MarshalCmd("1.0", nil, cmd)
if err != nil {
t.Errorf("MarshalCmd #%d (%s) unexpected error: %v", i,
test.name, err)

View File

@ -75,7 +75,7 @@ JSON-RPC API are:
| |HTTP POST Requests|Websockets|
|---|------------------|----------|
|Allows multiple requests across a single connection|No|Yes|
|Allows multiple requests across a single connection|Yes|Yes|
|Supports asynchronous notifications|No|Yes|
|Scales well with large numbers of requests|No|Yes|

View File

@ -881,7 +881,7 @@ func (c *Client) sendCmd(cmd interface{}) chan *response {
// Marshal the command.
id := c.NextID()
marshalledJSON, err := dcrjson.MarshalCmd(id, cmd)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", id, cmd)
if err != nil {
return newFutureError(err)
}

View File

@ -135,6 +135,9 @@ var (
// declared here to avoid the overhead of creating the slice on every
// invocation for constant data.
gbtCapabilities = []string{"proposal"}
// JSON 2.0 batched request prefix
batchedRequestPrefix = []byte("[")
)
// Errors
@ -5861,10 +5864,11 @@ func (s *rpcServer) checkAuth(r *http.Request, require bool) (bool, bool, error)
// a known concrete command along with any error that might have happened while
// parsing it.
type parsedRPCCmd struct {
id interface{}
method string
cmd interface{}
err *dcrjson.RPCError
jsonrpc string
id interface{}
method string
cmd interface{}
err *dcrjson.RPCError
}
// standardCmdResult checks that a parsed command is a standard Bitcoin
@ -5888,7 +5892,6 @@ func (s *rpcServer) standardCmdResult(cmd *parsedRPCCmd, closeChan <-chan struct
}
return nil, dcrjson.ErrRPCMethodNotFound
handled:
return handler(s, cmd.cmd, closeChan)
}
@ -5897,9 +5900,11 @@ handled:
// is suitable for use in replies if the command is invalid in some way such as
// an unregistered command or invalid parameters.
func parseCmd(request *dcrjson.Request) *parsedRPCCmd {
var parsedCmd parsedRPCCmd
parsedCmd.id = request.ID
parsedCmd.method = request.Method
parsedCmd := parsedRPCCmd{
jsonrpc: request.Jsonrpc,
id: request.ID,
method: request.Method,
}
cmd, err := dcrjson.UnmarshalCmd(request)
if err != nil {
@ -5907,14 +5912,13 @@ func parseCmd(request *dcrjson.Request) *parsedRPCCmd {
// produce a method not found RPC error.
if jerr, ok := err.(dcrjson.Error); ok &&
jerr.Code == dcrjson.ErrUnregisteredMethod {
parsedCmd.err = dcrjson.ErrRPCMethodNotFound
return &parsedCmd
}
// Otherwise, some type of invalid parameters is the cause, so
// produce the equivalent RPC error.
parsedCmd.err = rpcInvalidError("Parse error: %v", err)
parsedCmd.err = rpcInvalidError("Failed to parse request: %v", err)
return &parsedCmd
}
@ -5923,9 +5927,9 @@ func parseCmd(request *dcrjson.Request) *parsedRPCCmd {
}
// createMarshalledReply returns a new marshalled JSON-RPC response given the
// passed parameters. It will automatically convert errors that are not of
// the type *dcrjson.RPCError to the appropriate type as needed.
func createMarshalledReply(id, result interface{}, replyErr error) ([]byte, error) {
// passed parameters. It will automatically convert errors that are not of the
// type *dcrjson.RPCError to the appropriate type as needed.
func createMarshalledReply(rpcVersion string, id interface{}, result interface{}, replyErr error) ([]byte, error) {
var jsonErr *dcrjson.RPCError
if replyErr != nil {
if jErr, ok := replyErr.(*dcrjson.RPCError); ok {
@ -5935,7 +5939,60 @@ func createMarshalledReply(id, result interface{}, replyErr error) ([]byte, erro
}
}
return dcrjson.MarshalResponse(id, result, jsonErr)
return dcrjson.MarshalResponse(rpcVersion, id, result, jsonErr)
}
// processRequest determines the incoming request type (single or batched),
// parses it and returns a marshalled response.
func (s *rpcServer) processRequest(request *dcrjson.Request, isAdmin bool, closeChan <-chan struct{}) []byte {
var result interface{}
var jsonErr error
if !isAdmin {
if _, ok := rpcLimited[request.Method]; !ok {
jsonErr = rpcInvalidError("limited user not " +
"authorized for this method")
}
}
if jsonErr == nil {
if request.Method == "" || request.Params == nil {
jsonErr = &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidRequest.Code,
Message: fmt.Sprintf("Invalid request: malformed"),
}
msg, err := createMarshalledReply(request.Jsonrpc, request.ID, result, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply: %v", err)
return nil
}
return msg
}
// Valid requests with no ID (notifications) must not have a response
// per the JSON-RPC spec.
if request.ID == nil {
return nil
}
// Attempt to parse the JSON-RPC request into a known
// concrete command.
parsedCmd := parseCmd(request)
if parsedCmd.err != nil {
jsonErr = parsedCmd.err
} else {
result, jsonErr = s.standardCmdResult(parsedCmd,
closeChan)
}
}
// Marshal the response.
msg, err := createMarshalledReply(request.Jsonrpc, request.ID, result, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply: %v", err)
return nil
}
return msg
}
// jsonRPCRead handles reading and responding to RPC messages.
@ -5970,6 +6027,7 @@ func (s *rpcServer) jsonRPCRead(w http.ResponseWriter, r *http.Request, isAdmin
errCode)
return
}
conn, buf, err := hj.Hijack()
if err != nil {
rpcsLog.Warnf("Failed to hijack HTTP connection: %v", err)
@ -5978,70 +6036,169 @@ func (s *rpcServer) jsonRPCRead(w http.ResponseWriter, r *http.Request, isAdmin
err.Error(), errCode)
return
}
defer conn.Close()
defer buf.Flush()
conn.SetReadDeadline(timeZeroVal)
// Attempt to parse the raw body into a JSON-RPC request.
var responseID interface{}
var jsonErr error
var result interface{}
var request dcrjson.Request
if err := json.Unmarshal(body, &request); err != nil {
jsonErr = &dcrjson.RPCError{
Code: dcrjson.ErrRPCParse.Code,
Message: fmt.Sprintf("Failed to parse request: %v",
err),
// Setup a close notifier. Since the connection is hijacked,
// the CloseNotifer on the ResponseWriter is not available.
closeChan := make(chan struct{}, 1)
go func() {
_, err = conn.Read(make([]byte, 1))
if err != nil {
close(closeChan)
}
}()
var results []json.RawMessage
var batchSize int
var batchedRequest bool
// Determine request type
if bytes.HasPrefix(body, batchedRequestPrefix) {
batchedRequest = true
}
if jsonErr == nil {
// Requests with no ID (notifications) must not have a response
// per the JSON-RPC spec.
if request.ID == nil {
return
}
// The parse was at least successful enough to have an ID so
// set it for the response.
responseID = request.ID
// Setup a close notifier. Since the connection is hijacked,
// the CloseNotifer on the ResponseWriter is not available.
closeChan := make(chan struct{}, 1)
go func() {
_, err := conn.Read(make([]byte, 1))
// Process a single request
if !batchedRequest {
var req dcrjson.Request
var resp json.RawMessage
err = json.Unmarshal(body, &req)
if err != nil {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCParse.Code,
Message: fmt.Sprintf("Failed to parse request: %v",
err),
}
resp, err = dcrjson.MarshalResponse("1.0", nil, nil, jsonErr)
if err != nil {
close(closeChan)
}
}()
// Check if the user is limited and set error if method
// unauthorized
if !isAdmin {
if _, ok := rpcLimited[request.Method]; !ok {
jsonErr = rpcInvalidError("limited user not " +
"authorized for this method")
rpcsLog.Errorf("Failed to create reply: %v", err)
}
}
if jsonErr == nil {
// Attempt to parse the JSON-RPC request into a known
// concrete command.
parsedCmd := parseCmd(&request)
if parsedCmd.err != nil {
jsonErr = parsedCmd.err
} else {
result, jsonErr = s.standardCmdResult(parsedCmd,
closeChan)
if err == nil {
resp = s.processRequest(&req, isAdmin, closeChan)
}
if resp != nil {
results = append(results, resp)
}
}
// Process a batched request
if batchedRequest {
var batchedRequests []interface{}
var resp json.RawMessage
err = json.Unmarshal(body, &batchedRequests)
if err != nil {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCParse.Code,
Message: fmt.Sprintf("Failed to parse request: %v",
err),
}
resp, err = dcrjson.MarshalResponse("2.0", nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to create reply: %v", err)
}
if resp != nil {
results = append(results, resp)
}
}
if err == nil {
// Response with an empty batch error if the batch size is zero
if len(batchedRequests) == 0 {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidRequest.Code,
Message: fmt.Sprint("Invalid request: empty batch"),
}
resp, err = dcrjson.MarshalResponse("2.0", nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply: %v", err)
}
if resp != nil {
results = append(results, resp)
}
}
// Process each batch entry individually
if len(batchedRequests) > 0 {
batchSize = len(batchedRequests)
for _, entry := range batchedRequests {
var reqBytes []byte
reqBytes, err = json.Marshal(entry)
if err != nil {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidRequest.Code,
Message: fmt.Sprintf("Invalid request: %v",
err),
}
resp, err = dcrjson.MarshalResponse("2.0", nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to create reply: %v", err)
}
if resp != nil {
results = append(results, resp)
}
continue
}
var req dcrjson.Request
err := json.Unmarshal(reqBytes, &req)
if err != nil {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidRequest.Code,
Message: fmt.Sprintf("Invalid request: %v",
err),
}
resp, err = dcrjson.MarshalResponse("", nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to create reply: %v", err)
}
if resp != nil {
results = append(results, resp)
}
continue
}
resp = s.processRequest(&req, isAdmin, closeChan)
if resp != nil {
results = append(results, resp)
}
}
}
}
}
// Marshal the response.
msg, err := createMarshalledReply(responseID, result, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply: %v", err)
return
var msg = []byte{}
if batchedRequest && batchSize > 0 {
if len(results) > 0 {
// Form the batched response json
var buffer bytes.Buffer
buffer.WriteByte('[')
for idx, reply := range results {
if idx == len(results)-1 {
buffer.Write(reply)
buffer.WriteByte(']')
break
}
buffer.Write(reply)
buffer.WriteByte(',')
}
msg = buffer.Bytes()
}
}
if !batchedRequest || batchSize == 0 {
// Respond with the first results entry for single requests
if len(results) > 0 {
msg = results[0]
}
}
// Write the response.

View File

@ -752,7 +752,7 @@ func (m *wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*
ntfn.SubscribedTxs = subscribedTxs[quitChan]
// Marshal and queue notification.
marshalledJSON, err := dcrjson.MarshalCmd(nil, &ntfn)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", nil, &ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal block connected "+
"notification: %v", err)
@ -784,7 +784,7 @@ func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]
ntfn := dcrjson.BlockDisconnectedNtfn{
Header: hex.EncodeToString(headerBytes),
}
marshalledJSON, err := dcrjson.MarshalCmd(nil, &ntfn)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", nil, &ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal block disconnected "+
"notification: %v", err)
@ -809,7 +809,7 @@ func (m *wsNotificationManager) notifyReorganization(clients map[chan struct{}]*
int32(rd.OldHeight),
rd.NewHash.String(),
int32(rd.NewHeight))
marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", nil, ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal reorganization "+
"notification: %v", err)
@ -847,7 +847,7 @@ func (*wsNotificationManager) notifyWinningTickets(
ntfn := dcrjson.NewWinningTicketsNtfn(wtnd.BlockHash.String(),
int32(wtnd.BlockHeight), ticketMap)
marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", nil, ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal winning tickets notification: "+
"%v", err)
@ -889,7 +889,7 @@ func (*wsNotificationManager) notifySpentAndMissedTickets(
ntfn := dcrjson.NewSpentAndMissedTicketsNtfn(tnd.Hash.String(),
int32(tnd.Height), tnd.StakeDifficulty, ticketMap)
marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", nil, ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal spent and missed tickets "+
"notification: %v", err)
@ -940,7 +940,7 @@ func (*wsNotificationManager) notifyNewTickets(clients map[chan struct{}]*wsClie
ntfn := dcrjson.NewNewTicketsNtfn(tnd.Hash.String(), int32(tnd.Height),
tnd.StakeDifficulty, tickets)
marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", nil, ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal new tickets notification: "+
"%v", err)
@ -962,7 +962,7 @@ func (*wsNotificationManager) notifyStakeDifficulty(
int32(sdnd.BlockHeight),
sdnd.StakeDifficulty)
marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", nil, ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal stake difficulty notification: "+
"%v", err)
@ -998,7 +998,7 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie
ntfn := dcrjson.NewTxAcceptedNtfn(txHashStr,
dcrutil.Amount(amount).ToCoin())
marshalledJSON, err := dcrjson.MarshalCmd(nil, ntfn)
marshalledJSON, err := dcrjson.MarshalCmd("1.0", nil, ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal tx notification: %s",
err.Error())
@ -1022,7 +1022,7 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie
}
verboseNtfn = dcrjson.NewTxAcceptedVerboseNtfn(*rawTx)
marshalledJSONVerbose, err = dcrjson.MarshalCmd(nil,
marshalledJSONVerbose, err = dcrjson.MarshalCmd("1.0", nil,
verboseNtfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal verbose tx "+
@ -1102,7 +1102,7 @@ func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *dcrutil.Tx,
if len(clientsToNotify) != 0 {
n := dcrjson.NewRelevantTxAcceptedNtfn(txHexString(msgTx))
marshalled, err := dcrjson.MarshalCmd(nil, n)
marshalled, err := dcrjson.MarshalCmd("1.0", nil, n)
if err != nil {
rpcsLog.Errorf("Failed to marshal notification: %v", err)
return
@ -1166,21 +1166,18 @@ type wsResponse struct {
doneChan chan bool
}
// wsClient provides an abstraction for handling a websocket client. The
// overall data flow is split into 3 main goroutines, a possible 4th goroutine
// for long-running operations (only started if request is made), and a
// websocket manager which is used to allow things such as broadcasting
// requested notifications to all connected websocket clients. Inbound
// messages are read via the inHandler goroutine and generally dispatched to
// their own handler. However, certain potentially long-running operations such
// as rescans, are sent to the asyncHander goroutine and are limited to one at a
// time. There are two outbound message types - one for responding to client
// requests and another for async notifications. Responses to client requests
// use SendMessage which employs a buffered channel thereby limiting the number
// of outstanding requests that can be made. Notifications are sent via
// QueueNotification which implements a queue via notificationQueueHandler to
// ensure sending notifications from other subsystems can't block. Ultimately,
// all messages are sent via the outHandler.
// wsClient provides an abstraction for handling a websocket client. The overall
// data flow is split into 3 main goroutines. A websocket manager is used to
// allow things such as broadcasting requested notifications to all connected
// websocket clients. Inbound messages are read via the inHandler goroutine and
// generally dispatched to their own handler. There are two outbound message
// types - one for responding to client requests and another for async
// notifications. Responses to client requests use SendMessage which employs a
// buffered channel thereby limiting the number of outstanding requests that can
// be made. Notifications are sent via QueueNotification which implements a
// queue via notificationQueueHandler to ensure sending notifications from other
// subsystems can't block. Ultimately, all messages are sent via the
// outHandler.
type wsClient struct {
sync.Mutex
@ -1247,137 +1244,436 @@ out:
break out
}
var request dcrjson.Request
err = json.Unmarshal(msg, &request)
if err != nil {
if !c.authenticated {
break out
}
var batchedRequest bool
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCParse.Code,
Message: "Failed to parse request: " + err.Error(),
}
reply, err := createMarshalledReply(nil, nil, jsonErr)
// Determine request type
if bytes.HasPrefix(msg, batchedRequestPrefix) {
batchedRequest = true
}
// Process a single request
if !batchedRequest {
var req dcrjson.Request
var reply json.RawMessage
err = json.Unmarshal(msg, &req)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
continue
}
c.SendMessage(reply, nil)
continue
}
// Requests with no ID (notifications) must not have a response per the
// JSON-RPC spec.
if request.ID == nil {
if !c.authenticated {
break out
}
continue
}
cmd := parseCmd(&request)
if cmd.err != nil {
if !c.authenticated {
break out
}
reply, err := createMarshalledReply(cmd.id, nil, cmd.err)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
continue
}
c.SendMessage(reply, nil)
continue
}
rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr)
// Check auth. The client is immediately disconnected if the
// first request of an unauthentiated websocket client is not
// the authenticate request, an authenticate request is received
// when the client is already authenticated, or incorrect
// authentication credentials are provided in the request.
switch authCmd, ok := cmd.cmd.(*dcrjson.AuthenticateCmd); {
case c.authenticated && ok:
rpcsLog.Warnf("Websocket client %s is already authenticated",
c.addr)
break out
case !c.authenticated && !ok:
rpcsLog.Warnf("Unauthenticated websocket message " +
"received")
break out
case !c.authenticated:
// Check credentials.
login := authCmd.Username + ":" + authCmd.Passphrase
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
authSha := sha256.Sum256([]byte(auth))
cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:])
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:])
if cmp != 1 && limitcmp != 1 {
rpcsLog.Warnf("Auth failure.")
break out
}
c.authenticated = true
c.isAdmin = cmp == 1
// Marshal and send response.
reply, err := createMarshalledReply(cmd.id, nil, nil)
if err != nil {
rpcsLog.Errorf("Failed to marshal authenticate reply: "+
"%v", err.Error())
continue
}
c.SendMessage(reply, nil)
continue
}
// Check if the client is using limited RPC credentials and
// error when not authorized to call this RPC.
if !c.isAdmin {
if _, ok := rpcLimited[request.Method]; !ok {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidParams.Code,
Message: "limited user not authorized for this method",
// only process requests from authenticated clients
if !c.authenticated {
break out
}
// Marshal and send response.
reply, err := createMarshalledReply(request.ID, nil, jsonErr)
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCParse.Code,
Message: "Failed to parse request: " + err.Error(),
}
reply, err = createMarshalledReply("1.0", nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
rpcsLog.Errorf("Failed to marshal reply: %v", err)
continue
}
c.SendMessage(reply, nil)
continue
}
if req.Method == "" || req.Params == nil {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidRequest.Code,
Message: fmt.Sprintf("Invalid request: malformed"),
}
reply, err := createMarshalledReply(req.Jsonrpc, req.ID, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply: %v", err)
continue
}
c.SendMessage(reply, nil)
continue
}
// Valid requests with no ID (notifications) must not have a response
// per the JSON-RPC spec.
if req.ID == nil {
if !c.authenticated {
break out
}
continue
}
cmd := parseCmd(&req)
if cmd.err != nil {
// Only process requests from authenticated clients
if !c.authenticated {
break out
}
reply, err = createMarshalledReply(cmd.jsonrpc, cmd.id, nil, cmd.err)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply: %v", err)
continue
}
c.SendMessage(reply, nil)
continue
}
rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr)
// Check auth. The client is immediately disconnected if the
// first request of an unauthentiated websocket client is not
// the authenticate request, an authenticate request is received
// when the client is already authenticated, or incorrect
// authentication credentials are provided in the request.
switch authCmd, ok := cmd.cmd.(*dcrjson.AuthenticateCmd); {
case c.authenticated && ok:
rpcsLog.Warnf("Websocket client %s is already authenticated",
c.addr)
break out
case !c.authenticated && !ok:
rpcsLog.Warnf("Unauthenticated websocket message " +
"received")
break out
case !c.authenticated:
// Check credentials.
login := authCmd.Username + ":" + authCmd.Passphrase
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
authSha := sha256.Sum256([]byte(auth))
cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:])
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:])
if cmp != 1 && limitcmp != 1 {
rpcsLog.Warnf("Auth failure.")
break out
}
c.authenticated = true
c.isAdmin = cmp == 1
// Marshal and send response.
reply, err = createMarshalledReply(cmd.jsonrpc, cmd.id, nil, nil)
if err != nil {
rpcsLog.Errorf("Failed to marshal authenticate reply: "+
"%v", err.Error())
continue
}
c.SendMessage(reply, nil)
continue
}
// Check if the client is using limited RPC credentials and
// error when not authorized to call the supplied RPC.
if !c.isAdmin {
if _, ok := rpcLimited[req.Method]; !ok {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidParams.Code,
Message: "limited user not authorized for this method",
}
// Marshal and send response.
reply, err = createMarshalledReply("", req.ID, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
continue
}
c.SendMessage(reply, nil)
continue
}
}
// Asynchronously handle the request. A semaphore is used to
// limit the number of concurrent requests currently being
// serviced. If the semaphore can not be acquired, simply wait
// until a request finished before reading the next RPC request
// from the websocket client.
//
// This could be a little fancier by timing out and erroring
// when it takes too long to service the request, but if that is
// done, the read of the next request should not be blocked by
// this semaphore, otherwise the next request will be read and
// will probably sit here for another few seconds before timing
// out as well. This will cause the total timeout duration for
// later requests to be much longer than the check here would
// imply.
//
// If a timeout is added, the semaphore acquiring should be
// moved inside of the new goroutine with a select statement
// that also reads a time.After channel. This will unblock the
// read of the next request from the websocket client and allow
// many requests to be waited on concurrently.
c.serviceRequestSem.acquire()
go func() {
c.serviceRequest(cmd)
c.serviceRequestSem.release()
}()
}
// Asynchronously handle the request. A semaphore is used to
// limit the number of concurrent requests currently being
// serviced. If the semaphore can not be acquired, simply wait
// until a request finished before reading the next RPC request
// from the websocket client.
//
// This could be a little fancier by timing out and erroring
// when it takes too long to service the request, but if that is
// done, the read of the next request should not be blocked by
// this semaphore, otherwise the next request will be read and
// will probably sit here for another few seconds before timing
// out as well. This will cause the total timeout duration for
// later requests to be much longer than the check here would
// imply.
//
// If a timeout is added, the semaphore acquiring should be
// moved inside of the new goroutine with a select statement
// that also reads a time.After channel. This will unblock the
// read of the next request from the websocket client and allow
// many requests to be waited on concurrently.
c.serviceRequestSem.acquire()
go func() {
c.serviceRequest(cmd)
// Process a batched request
if batchedRequest {
var batchedRequests []interface{}
var results []json.RawMessage
var batchSize int
var reply json.RawMessage
c.serviceRequestSem.acquire()
err = json.Unmarshal(msg, &batchedRequests)
if err != nil {
// Only process requests from authenticated clients
if !c.authenticated {
break out
}
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCParse.Code,
Message: fmt.Sprintf("Failed to parse request: %v",
err),
}
reply, err = dcrjson.MarshalResponse("2.0", nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to create reply: %v", err)
}
if reply != nil {
results = append(results, reply)
}
}
if err == nil {
// Response with an empty batch error if the batch size is zero
if len(batchedRequests) == 0 {
if !c.authenticated {
break out
}
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidRequest.Code,
Message: fmt.Sprint("Invalid request: empty batch"),
}
reply, err = dcrjson.MarshalResponse("2.0", nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply: %v", err)
}
if reply != nil {
results = append(results, reply)
}
}
// Process each batch entry individually
if len(batchedRequests) > 0 {
batchSize = len(batchedRequests)
for _, entry := range batchedRequests {
var reqBytes []byte
reqBytes, err = json.Marshal(entry)
if err != nil {
// Only process requests from authenticated clients
if !c.authenticated {
break out
}
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidRequest.Code,
Message: fmt.Sprintf("Invalid request: %v",
err),
}
reply, err = dcrjson.MarshalResponse("2.0", nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to create reply: %v", err)
continue
}
if reply != nil {
results = append(results, reply)
}
continue
}
var req dcrjson.Request
err := json.Unmarshal(reqBytes, &req)
if err != nil {
// Only process requests from authenticated clients
if !c.authenticated {
break out
}
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidRequest.Code,
Message: fmt.Sprintf("Invalid request: %v",
err),
}
reply, err = dcrjson.MarshalResponse("2.0", nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to create reply: %v", err)
continue
}
if reply != nil {
results = append(results, reply)
}
continue
}
if req.Method == "" || req.Params == nil {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidRequest.Code,
Message: fmt.Sprintf("Invalid request: malformed"),
}
reply, err := createMarshalledReply(req.Jsonrpc, req.ID, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply: %v", err)
continue
}
if reply != nil {
results = append(results, reply)
}
continue
}
// Valid requests with no ID (notifications) must not have a response
// per the JSON-RPC spec.
if req.ID == nil {
if !c.authenticated {
break out
}
continue
}
cmd := parseCmd(&req)
if cmd.err != nil {
// Only process requests from authenticated clients
if !c.authenticated {
break out
}
reply, err = createMarshalledReply(cmd.jsonrpc, cmd.id, nil, cmd.err)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply: %v", err)
continue
}
if reply != nil {
results = append(results, reply)
}
continue
}
rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr)
// Check auth. The client is immediately disconnected if the
// first request of an unauthentiated websocket client is not
// the authenticate request, an authenticate request is received
// when the client is already authenticated, or incorrect
// authentication credentials are provided in the request.
switch authCmd, ok := cmd.cmd.(*dcrjson.AuthenticateCmd); {
case c.authenticated && ok:
rpcsLog.Warnf("Websocket client %s is already authenticated",
c.addr)
break out
case !c.authenticated && !ok:
rpcsLog.Warnf("Unauthenticated websocket message " +
"received")
break out
case !c.authenticated:
// Check credentials.
login := authCmd.Username + ":" + authCmd.Passphrase
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
authSha := sha256.Sum256([]byte(auth))
cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:])
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:])
if cmp != 1 && limitcmp != 1 {
rpcsLog.Warnf("Auth failure.")
break out
}
c.authenticated = true
c.isAdmin = cmp == 1
// Marshal and send response.
reply, err = createMarshalledReply(cmd.jsonrpc, cmd.id, nil, nil)
if err != nil {
rpcsLog.Errorf("Failed to marshal authenticate reply: "+
"%v", err.Error())
continue
}
if reply != nil {
results = append(results, reply)
}
continue
}
// Check if the client is using limited RPC credentials and
// error when not authorized to call the supplied RPC.
if !c.isAdmin {
if _, ok := rpcLimited[req.Method]; !ok {
jsonErr := &dcrjson.RPCError{
Code: dcrjson.ErrRPCInvalidParams.Code,
Message: "limited user not authorized for this method",
}
// Marshal and send response.
reply, err = createMarshalledReply(req.Jsonrpc, req.ID, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
continue
}
if reply != nil {
results = append(results, reply)
}
continue
}
}
// Lookup the websocket extension for the command, if it doesn't
// exist fallback to handling the command as a standard command.
var resp interface{}
wsHandler, ok := wsHandlers[cmd.method]
if ok {
resp, err = wsHandler(c, cmd.cmd)
} else {
resp, err = c.server.standardCmdResult(cmd, nil)
}
// Marshal request output.
reply, err := createMarshalledReply(cmd.jsonrpc, cmd.id, resp, err)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply for <%s> "+
"command: %v", cmd.method, err)
return
}
if reply != nil {
results = append(results, reply)
}
}
}
}
// generate reply
var payload = []byte{}
if batchedRequest && batchSize > 0 {
if len(results) > 0 {
// Form the batched response json
var buffer bytes.Buffer
buffer.WriteByte('[')
for idx, marshalledReply := range results {
if idx == len(results)-1 {
buffer.Write(marshalledReply)
buffer.WriteByte(']')
break
}
buffer.Write(marshalledReply)
buffer.WriteByte(',')
}
payload = buffer.Bytes()
}
}
if !batchedRequest || batchSize == 0 {
// Respond with the first results entry for single requests
if len(results) > 0 {
payload = results[0]
}
}
c.SendMessage(payload, nil)
c.serviceRequestSem.release()
}()
}
}
// Ensure the connection is closed.
@ -1403,12 +1699,13 @@ func (c *wsClient) serviceRequest(r *parsedRPCCmd) {
} else {
result, err = c.server.standardCmdResult(r, nil)
}
reply, err := createMarshalledReply(r.id, result, err)
reply, err := createMarshalledReply(r.jsonrpc, r.id, result, err)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply for <%s> "+
"command: %v", r.method, err)
return
}
c.SendMessage(reply, nil)
}