add consumer requests

This commit is contained in:
Don Cote 2019-09-16 12:19:56 -04:00
parent e5a9b7fa61
commit 6113f74d2d
6 changed files with 199 additions and 33 deletions

View File

@ -25,18 +25,3 @@ func NewClient(config Config) (Client, error) {
return c, nil
}
// GetNextRecord returns the topic's next record. Will return nil without an error when there are no more records.
func (c Client) GetNextRecord() (*Record, error) {
return getNextRecord(c)
}
// CompleteRecord allows the record to be marked as completed
func (c Client) CompleteRecord(r Record) error {
return r.updateRecordState(c, "completed")
}
// FailRecord allows the record to be marked as failed
func (c Client) FailRecord(r Record) error {
return r.updateRecordState(c, "failed")
}

View File

@ -1,22 +1,78 @@
package databridge
import (
"fmt"
"os"
"testing"
"github.com/stretchr/testify/require"
)
func TestClient_NewClient(t *testing.T) {
config := Config{APIKey: "api-key", TopicSlug: "my-topic-slug", ConsumerID: "consumer-id"}
client, err := NewClient(config)
func TestClient_GetUnreadCount(t *testing.T) {
client := getClient(t)
c, err := client.GetUnreadCount()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
require.Equal(t, "api-key", client.APIKey)
require.Equal(t, "consumer-id", client.ConsumerID)
require.Equal(t, "my-topic-slug", client.TopicSlug)
require.Equal(t, "https://data-bridge.flipsidecrypto.com/api/v1", client.BaseURL)
if c == nil {
t.Fatal("count is nil")
}
fmt.Fprintln(os.Stdout, "GetUnreadCount")
fmt.Fprintln(os.Stdout, *c)
}
func TestClient_GetNextRecord(t *testing.T) {
client := getClient(t)
r, err := client.GetNextRecord()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if r == nil {
t.Fatal("Result is nil")
}
record := string(*r)
fmt.Fprintln(os.Stdout, "GetNextRecord")
fmt.Fprintln(os.Stdout, record)
}
func TestClient_GetRegisteredConsumers(t *testing.T) {
client := getClient(t)
consumers, err := client.GetRegisteredConsumers()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fmt.Fprintln(os.Stdout, "GetRegisteredConsumers")
fmt.Fprintln(os.Stdout, consumers)
}
func TestClient_GetAvailableConsumers(t *testing.T) {
client := getClient(t)
consumers, err := client.GetAvailableConsumers()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fmt.Fprintln(os.Stdout, "GetConsumers")
fmt.Fprintln(os.Stdout, consumers)
}
func TestClient_RegisterConsumer(t *testing.T) {
client := getClient(t)
consumer, err := client.RegisterConsumer()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fmt.Fprintln(os.Stdout, "RegisterConsumer")
fmt.Fprintln(os.Stdout, consumer)
}
func getClient(t *testing.T) Client {
config := Config{APIKey: "15d4fbbc-d12e-4b08-a4d6-b55b92200649", TopicSlug: "dev-alert-fcas-events", ConsumerID: "84d3cba6-8be8-4848-b759-59093f2e74e6"}
client, err := NewClient(config)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return client
}

106
consumer.go Normal file
View File

@ -0,0 +1,106 @@
package databridge
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"github.com/pkg/errors"
)
// Consumer are processes that you run to iterate over the records stored in a topic. In order to retrieve records to process, you must register a Consumer with the Data Bridge.
// Upon registration, the Data Bridge will generate a "consumer_id". You must include this "consumer_id" when ever requesting to view a topic-level record.
// Topics are a lot like append-only lists. As you traverse a list you need to keep track of your index in the list. The Data Bridge conveniently utilizes your "consumer_id" to keep track of what topic level records you've seen and have not seen.
// It is possible to register multiple consumers, per account, if you would like to process topic records in a parralell fashion.
type Consumer struct {
ID string `json:"id"`
}
// GetRegisteredConsumers get a list of consumers that have registered with the data bridge under this api key
func (c Client) GetRegisteredConsumers() (*[]Consumer, error) {
url := fmt.Sprintf("%s/consumers?api_key=%s", c.BaseURL, c.APIKey)
req, _ := http.NewRequest("GET", url, nil)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if res.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("databridge responded with non-200 for %s", url))
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
var consumers []Consumer
err = json.Unmarshal([]byte(body), &consumers)
if err != nil {
return nil, err
}
return &consumers, nil
}
// GetAvailableConsumers returns the consumers associated with this api key that are not assigned to a record.
func (c Client) GetAvailableConsumers() (*[]Consumer, error) {
url := fmt.Sprintf("%s/consumers/available?api_key=%s", c.BaseURL, c.APIKey)
req, _ := http.NewRequest("GET", url, nil)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if res.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("databridge responded with non-200 for %s", url))
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
var consumers []Consumer
err = json.Unmarshal([]byte(body), &consumers)
if err != nil {
return nil, err
}
return &consumers, nil
}
// RegisterConsumer registers a consumer with the Data Bridge to use when consuming topic-level records.
func (c Client) RegisterConsumer() (*Consumer, error) {
url := fmt.Sprintf("%s/consumers?api_key=%s", c.BaseURL, c.APIKey)
req, _ := http.NewRequest("POST", url, nil)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if res.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("databridge responded with non-200 for %s", url))
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
var consumer Consumer
err = json.Unmarshal([]byte(body), &consumer)
if err != nil {
return nil, err
}
return &consumer, nil
}

1
go.mod
View File

@ -3,6 +3,7 @@ module github.com/FlipsideCrypto/go-data-bridge-client
go 1.12
require (
github.com/pkg/errors v0.8.1
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.4.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect

2
go.sum
View File

@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@ -5,6 +5,8 @@ import (
"fmt"
"io/ioutil"
"net/http"
"github.com/pkg/errors"
)
// Record represents a Data Bridge data record
@ -13,7 +15,9 @@ type Record struct {
Data []map[string]interface{} `json:"data"`
}
func getUnreadCount(c Client) (*int32, error) {
// GetUnreadCount returns the count of unread records in the given topic in the context of the api key
func (c Client) GetUnreadCount() (*int32, error) {
url := fmt.Sprintf("%s/topics/%s?api_key=%s", c.BaseURL, c.TopicSlug, c.APIKey)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Add("content-type", "application/json")
@ -35,7 +39,7 @@ func getUnreadCount(c Client) (*int32, error) {
}
var js struct {
Count int32 `json:"unread_count"`
Count int32 `json:"unread_records"`
}
err = json.Unmarshal([]byte(body), &js)
@ -46,8 +50,9 @@ func getUnreadCount(c Client) (*int32, error) {
return &js.Count, nil
}
func getNextRecord(c Client) (*Record, error) {
count, err := getUnreadCount(c)
// GetNextRecord returns the topic's next record. Will return nil without an error when there are no more records.
func (c Client) GetNextRecord() (*json.RawMessage, error) {
count, err := c.GetUnreadCount()
if err != nil {
return nil, err
} else if *count == 0 {
@ -74,13 +79,24 @@ func getNextRecord(c Client) (*Record, error) {
return nil, err
}
var record Record
err = json.Unmarshal([]byte(body), &record)
var result json.RawMessage
err = json.Unmarshal([]byte(body), &result)
if err != nil {
return nil, err
}
return &record, nil
return &result, nil
}
// CompleteRecord allows the record to be marked as completed
func (c Client) CompleteRecord(r Record) error {
return r.updateRecordState(c, "completed")
}
// FailRecord allows the record to be marked as failed
func (c Client) FailRecord(r Record) error {
return r.updateRecordState(c, "failed")
}
func (r Record) updateRecordState(c Client, state string) error {