mirror of
https://github.com/FlipsideCrypto/go-data-bridge-client.git
synced 2026-02-06 10:48:12 +00:00
Merge pull request #5 from FlipsideCrypto/check-for-unread-count
add consumer requests
This commit is contained in:
commit
1aeecc0799
15
client.go
15
client.go
@ -25,18 +25,3 @@ func NewClient(config Config) (Client, error) {
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// GetNextRecord returns the topic's next record. Will return nil without an error when there are no more records.
|
||||
func (c Client) GetNextRecord() (*Record, error) {
|
||||
return getNextRecord(c)
|
||||
}
|
||||
|
||||
// CompleteRecord allows the record to be marked as completed
|
||||
func (c Client) CompleteRecord(r Record) error {
|
||||
return r.updateRecordState(c, "completed")
|
||||
}
|
||||
|
||||
// FailRecord allows the record to be marked as failed
|
||||
func (c Client) FailRecord(r Record) error {
|
||||
return r.updateRecordState(c, "failed")
|
||||
}
|
||||
|
||||
@ -1,22 +1,78 @@
|
||||
package databridge
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestClient_NewClient(t *testing.T) {
|
||||
|
||||
config := Config{APIKey: "api-key", TopicSlug: "my-topic-slug", ConsumerID: "consumer-id"}
|
||||
client, err := NewClient(config)
|
||||
|
||||
func TestClient_GetUnreadCount(t *testing.T) {
|
||||
client := getClient(t)
|
||||
c, err := client.GetUnreadCount()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if c == nil {
|
||||
t.Fatal("count is nil")
|
||||
}
|
||||
|
||||
require.Equal(t, "api-key", client.APIKey)
|
||||
require.Equal(t, "consumer-id", client.ConsumerID)
|
||||
require.Equal(t, "my-topic-slug", client.TopicSlug)
|
||||
require.Equal(t, "https://data-bridge.flipsidecrypto.com/api/v1", client.BaseURL)
|
||||
fmt.Fprintln(os.Stdout, "GetUnreadCount")
|
||||
fmt.Fprintln(os.Stdout, *c)
|
||||
}
|
||||
|
||||
func TestClient_GetNextRecord(t *testing.T) {
|
||||
client := getClient(t)
|
||||
r, err := client.GetNextRecord()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if r == nil {
|
||||
t.Fatal("Result is nil")
|
||||
}
|
||||
|
||||
record := string(*r)
|
||||
fmt.Fprintln(os.Stdout, "GetNextRecord")
|
||||
fmt.Fprintln(os.Stdout, record)
|
||||
}
|
||||
|
||||
func TestClient_GetRegisteredConsumers(t *testing.T) {
|
||||
client := getClient(t)
|
||||
|
||||
consumers, err := client.GetRegisteredConsumers()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fmt.Fprintln(os.Stdout, "GetRegisteredConsumers")
|
||||
fmt.Fprintln(os.Stdout, consumers)
|
||||
}
|
||||
|
||||
func TestClient_GetAvailableConsumers(t *testing.T) {
|
||||
client := getClient(t)
|
||||
|
||||
consumers, err := client.GetAvailableConsumers()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fmt.Fprintln(os.Stdout, "GetConsumers")
|
||||
fmt.Fprintln(os.Stdout, consumers)
|
||||
}
|
||||
|
||||
func TestClient_RegisterConsumer(t *testing.T) {
|
||||
client := getClient(t)
|
||||
|
||||
consumer, err := client.RegisterConsumer()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fmt.Fprintln(os.Stdout, "RegisterConsumer")
|
||||
fmt.Fprintln(os.Stdout, consumer)
|
||||
}
|
||||
|
||||
func getClient(t *testing.T) Client {
|
||||
config := Config{APIKey: "15d4fbbc-d12e-4b08-a4d6-b55b92200649", TopicSlug: "dev-alert-fcas-events", ConsumerID: "84d3cba6-8be8-4848-b759-59093f2e74e6"}
|
||||
client, err := NewClient(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
106
consumer.go
Normal file
106
consumer.go
Normal file
@ -0,0 +1,106 @@
|
||||
package databridge
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Consumer are processes that you run to iterate over the records stored in a topic. In order to retrieve records to process, you must register a Consumer with the Data Bridge.
|
||||
// Upon registration, the Data Bridge will generate a "consumer_id". You must include this "consumer_id" when ever requesting to view a topic-level record.
|
||||
// Topics are a lot like append-only lists. As you traverse a list you need to keep track of your index in the list. The Data Bridge conveniently utilizes your "consumer_id" to keep track of what topic level records you've seen and have not seen.
|
||||
// It is possible to register multiple consumers, per account, if you would like to process topic records in a parralell fashion.
|
||||
type Consumer struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
// GetRegisteredConsumers get a list of consumers that have registered with the data bridge under this api key
|
||||
func (c Client) GetRegisteredConsumers() (*[]Consumer, error) {
|
||||
url := fmt.Sprintf("%s/consumers?api_key=%s", c.BaseURL, c.APIKey)
|
||||
req, _ := http.NewRequest("GET", url, nil)
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if res.StatusCode != 200 {
|
||||
return nil, errors.New(fmt.Sprintf("databridge responded with non-200 for %s", url))
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
body, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var consumers []Consumer
|
||||
|
||||
err = json.Unmarshal([]byte(body), &consumers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &consumers, nil
|
||||
}
|
||||
|
||||
// GetAvailableConsumers returns the consumers associated with this api key that are not assigned to a record.
|
||||
func (c Client) GetAvailableConsumers() (*[]Consumer, error) {
|
||||
url := fmt.Sprintf("%s/consumers/available?api_key=%s", c.BaseURL, c.APIKey)
|
||||
req, _ := http.NewRequest("GET", url, nil)
|
||||
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if res.StatusCode != 200 {
|
||||
return nil, errors.New(fmt.Sprintf("databridge responded with non-200 for %s", url))
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
body, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var consumers []Consumer
|
||||
|
||||
err = json.Unmarshal([]byte(body), &consumers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &consumers, nil
|
||||
}
|
||||
|
||||
// RegisterConsumer registers a consumer with the Data Bridge to use when consuming topic-level records.
|
||||
func (c Client) RegisterConsumer() (*Consumer, error) {
|
||||
url := fmt.Sprintf("%s/consumers?api_key=%s", c.BaseURL, c.APIKey)
|
||||
req, _ := http.NewRequest("POST", url, nil)
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if res.StatusCode != 200 {
|
||||
return nil, errors.New(fmt.Sprintf("databridge responded with non-200 for %s", url))
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
body, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var consumer Consumer
|
||||
|
||||
err = json.Unmarshal([]byte(body), &consumer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &consumer, nil
|
||||
}
|
||||
28
record.go
28
record.go
@ -15,7 +15,9 @@ type Record struct {
|
||||
Data []map[string]interface{} `json:"data"`
|
||||
}
|
||||
|
||||
func getUnreadCount(c Client) (*int32, error) {
|
||||
// GetUnreadCount returns the count of unread records in the given topic in the context of the api key
|
||||
func (c Client) GetUnreadCount() (*int32, error) {
|
||||
|
||||
url := fmt.Sprintf("%s/topics/%s?api_key=%s", c.BaseURL, c.TopicSlug, c.APIKey)
|
||||
req, _ := http.NewRequest("GET", url, nil)
|
||||
req.Header.Add("content-type", "application/json")
|
||||
@ -37,7 +39,7 @@ func getUnreadCount(c Client) (*int32, error) {
|
||||
}
|
||||
|
||||
var js struct {
|
||||
Count int32 `json:"unread_count"`
|
||||
Count int32 `json:"unread_records"`
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(body), &js)
|
||||
@ -48,8 +50,9 @@ func getUnreadCount(c Client) (*int32, error) {
|
||||
return &js.Count, nil
|
||||
}
|
||||
|
||||
func getNextRecord(c Client) (*Record, error) {
|
||||
count, err := getUnreadCount(c)
|
||||
// GetNextRecord returns the topic's next record. Will return nil without an error when there are no more records.
|
||||
func (c Client) GetNextRecord() (*json.RawMessage, error) {
|
||||
count, err := c.GetUnreadCount()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if *count == 0 {
|
||||
@ -76,13 +79,24 @@ func getNextRecord(c Client) (*Record, error) {
|
||||
return nil, errors.Wrap(err, "error trying to read databridge response body")
|
||||
}
|
||||
|
||||
var record Record
|
||||
err = json.Unmarshal([]byte(body), &record)
|
||||
var result json.RawMessage
|
||||
|
||||
err = json.Unmarshal([]byte(body), &result)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error trying to unmarshal response body to json")
|
||||
}
|
||||
|
||||
return &record, nil
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// CompleteRecord allows the record to be marked as completed
|
||||
func (c Client) CompleteRecord(r Record) error {
|
||||
return r.updateRecordState(c, "completed")
|
||||
}
|
||||
|
||||
// FailRecord allows the record to be marked as failed
|
||||
func (c Client) FailRecord(r Record) error {
|
||||
return r.updateRecordState(c, "failed")
|
||||
}
|
||||
|
||||
func (r Record) updateRecordState(c Client, state string) error {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user