mirror of
https://github.com/FlipsideCrypto/convox.git
synced 2026-02-06 10:56:56 +00:00
gcp: use elastic for logs due to draconian rate limiting on stackdriver (#96)
This commit is contained in:
parent
12ded08b3d
commit
a0f93554f3
163
pkg/elastic/elastic.go
Normal file
163
pkg/elastic/elastic.go
Normal file
@ -0,0 +1,163 @@
|
||||
package elastic
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/convox/convox/pkg/common"
|
||||
"github.com/convox/convox/pkg/structs"
|
||||
"github.com/elastic/go-elasticsearch/v6"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
client *elasticsearch.Client
|
||||
}
|
||||
|
||||
type result struct {
|
||||
Hits struct {
|
||||
Hits []struct {
|
||||
Index string `json:"_index"`
|
||||
Source struct {
|
||||
Log string
|
||||
Stream string
|
||||
Timestamp time.Time `json:"@timestamp"`
|
||||
} `json:"_source"`
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func New(url string) (*Client, error) {
|
||||
ec, err := elasticsearch.NewClient(elasticsearch.Config{
|
||||
Addresses: []string{url},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &Client{
|
||||
client: ec,
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Client) Stream(ctx context.Context, w io.WriteCloser, index string, opts structs.LogsOptions) {
|
||||
defer w.Close()
|
||||
|
||||
follow := common.DefaultBool(opts.Follow, true)
|
||||
now := time.Now().UTC()
|
||||
since := time.Time{}
|
||||
|
||||
if opts.Since != nil {
|
||||
since = time.Now().UTC().Add(*opts.Since * -1)
|
||||
}
|
||||
|
||||
for {
|
||||
// check for closed writer
|
||||
if _, err := w.Write([]byte{}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
timestamp := map[string]interface{}{
|
||||
"gt": since.UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
if !follow {
|
||||
timestamp["lt"] = now.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
body := map[string]interface{}{
|
||||
"query": map[string]interface{}{
|
||||
"range": map[string]interface{}{
|
||||
"@timestamp": timestamp,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
data, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
res, err := c.client.Search(
|
||||
c.client.Search.WithIndex(index),
|
||||
c.client.Search.WithSize(5000),
|
||||
c.client.Search.WithBody(bytes.NewReader(data)),
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Fprintf(w, "error: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
data, err = ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
fmt.Fprintf(w, "error: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
var sres result
|
||||
|
||||
if err := json.Unmarshal(data, &sres); err != nil {
|
||||
fmt.Fprintf(w, "error: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
sort.Slice(sres.Hits.Hits, func(i, j int) bool {
|
||||
return sres.Hits.Hits[i].Source.Timestamp.Before(sres.Hits.Hits[j].Source.Timestamp)
|
||||
})
|
||||
|
||||
if len(sres.Hits.Hits) == 0 && !follow {
|
||||
return
|
||||
}
|
||||
|
||||
for _, log := range sres.Hits.Hits {
|
||||
prefix := ""
|
||||
|
||||
if common.DefaultBool(opts.Prefix, false) {
|
||||
prefix = fmt.Sprintf("%s %s ", log.Source.Timestamp.Format(time.RFC3339), strings.ReplaceAll(log.Source.Stream, ".", "/"))
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, "%s%s", prefix, log.Source.Log)
|
||||
|
||||
since = log.Source.Timestamp
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Write(index string, ts time.Time, message string, tags map[string]string) error {
|
||||
body := map[string]interface{}{
|
||||
"log": fmt.Sprintf("%s\n", message),
|
||||
"@timestamp": ts.Format(time.RFC3339Nano),
|
||||
}
|
||||
|
||||
for k, v := range tags {
|
||||
body[k] = v
|
||||
}
|
||||
|
||||
data, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := c.client.Index(index, bytes.NewReader(data)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -11,9 +11,9 @@ import (
|
||||
"github.com/Azure/azure-storage-file-go/azfile"
|
||||
"github.com/Azure/go-autorest/autorest"
|
||||
"github.com/Azure/go-autorest/autorest/azure/auth"
|
||||
"github.com/convox/convox/pkg/elastic"
|
||||
"github.com/convox/convox/pkg/structs"
|
||||
"github.com/convox/convox/provider/k8s"
|
||||
"github.com/elastic/go-elasticsearch/v6"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
)
|
||||
|
||||
@ -30,7 +30,7 @@ type Provider struct {
|
||||
Subscription string
|
||||
Workspace string
|
||||
|
||||
elastic *elasticsearch.Client
|
||||
elastic *elastic.Client
|
||||
insightLogs *operationalinsights.QueryClient
|
||||
storageDirectory *azfile.DirectoryURL
|
||||
}
|
||||
@ -80,12 +80,12 @@ func (p *Provider) WithContext(ctx context.Context) structs.Provider {
|
||||
}
|
||||
|
||||
func (p *Provider) initializeAzureServices() error {
|
||||
es, err := elasticsearch.NewDefaultClient()
|
||||
ec, err := elastic.New(os.Getenv("ELASTIC_URL"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.elastic = es
|
||||
p.elastic = ec
|
||||
|
||||
il, err := p.azureInsightLogs()
|
||||
if err != nil {
|
||||
|
||||
@ -1,51 +1,22 @@
|
||||
package azure
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/convox/convox/pkg/common"
|
||||
"github.com/convox/convox/pkg/structs"
|
||||
)
|
||||
|
||||
var sequenceTokens sync.Map
|
||||
|
||||
type elasticSearchResult struct {
|
||||
Hits struct {
|
||||
Hits []struct {
|
||||
Index string `json:"_index"`
|
||||
Source struct {
|
||||
Log string
|
||||
Stream string
|
||||
Timestamp time.Time `json:"@timestamp"`
|
||||
} `json:"_source"`
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) Log(app, stream string, ts time.Time, message string) error {
|
||||
index := fmt.Sprintf("convox.%s.%s", p.Name, app)
|
||||
|
||||
body := map[string]interface{}{
|
||||
"log": fmt.Sprintf("%s\n", message),
|
||||
"stream": stream,
|
||||
"@timestamp": ts.Format(time.RFC3339Nano),
|
||||
tags := map[string]string{
|
||||
"app": app,
|
||||
"stream": stream,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := p.elastic.Index(index, bytes.NewReader(data)); err != nil {
|
||||
if err := p.elastic.Write(index, ts, message, tags); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -55,7 +26,7 @@ func (p *Provider) Log(app, stream string, ts time.Time, message string) error {
|
||||
func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser, error) {
|
||||
r, w := io.Pipe()
|
||||
|
||||
go p.streamLogs(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts)
|
||||
go p.elastic.Stream(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts)
|
||||
|
||||
return r, nil
|
||||
}
|
||||
@ -63,215 +34,3 @@ func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser
|
||||
func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) {
|
||||
return p.AppLogs("system", opts)
|
||||
}
|
||||
|
||||
func (p *Provider) streamLogs(ctx context.Context, w io.WriteCloser, index string, opts structs.LogsOptions) {
|
||||
defer w.Close()
|
||||
|
||||
follow := common.DefaultBool(opts.Follow, true)
|
||||
now := time.Now().UTC()
|
||||
since := time.Time{}
|
||||
|
||||
if opts.Since != nil {
|
||||
since = time.Now().UTC().Add(*opts.Since * -1)
|
||||
}
|
||||
|
||||
for {
|
||||
// check for closed writer
|
||||
if _, err := w.Write([]byte{}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
timestamp := map[string]interface{}{
|
||||
"gt": since.UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
if !follow {
|
||||
timestamp["lt"] = now.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
body := map[string]interface{}{
|
||||
"query": map[string]interface{}{
|
||||
"range": map[string]interface{}{
|
||||
"@timestamp": timestamp,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
data, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
res, err := p.elastic.Search(
|
||||
p.elastic.Search.WithIndex(index),
|
||||
p.elastic.Search.WithSize(5000),
|
||||
p.elastic.Search.WithBody(bytes.NewReader(data)),
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
return
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
data, err = ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
var sres elasticSearchResult
|
||||
|
||||
if err := json.Unmarshal(data, &sres); err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
sort.Slice(sres.Hits.Hits, func(i, j int) bool {
|
||||
return sres.Hits.Hits[i].Source.Timestamp.Before(sres.Hits.Hits[j].Source.Timestamp)
|
||||
})
|
||||
|
||||
if len(sres.Hits.Hits) == 0 && !follow {
|
||||
return
|
||||
}
|
||||
|
||||
for _, log := range sres.Hits.Hits {
|
||||
prefix := ""
|
||||
|
||||
if common.DefaultBool(opts.Prefix, false) {
|
||||
prefix = fmt.Sprintf("%s %s ", log.Source.Timestamp.Format(time.RFC3339), strings.ReplaceAll(log.Source.Stream, ".", "/"))
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, "%s%s", prefix, log.Source.Log)
|
||||
|
||||
since = log.Source.Timestamp
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// import (
|
||||
// "context"
|
||||
// "encoding/json"
|
||||
// "fmt"
|
||||
// "io"
|
||||
// "strings"
|
||||
// "time"
|
||||
|
||||
// "github.com/Azure/azure-sdk-for-go/services/operationalinsights/v1/operationalinsights"
|
||||
// "github.com/convox/convox/pkg/common"
|
||||
// "github.com/convox/convox/pkg/options"
|
||||
// "github.com/convox/convox/pkg/structs"
|
||||
// )
|
||||
|
||||
// // var sequenceTokens sync.Map
|
||||
|
||||
// func (p *Provider) Log(app, stream string, ts time.Time, message string) error {
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser, error) {
|
||||
// r, w := io.Pipe()
|
||||
|
||||
// go p.insightContainerLogs(p.Context(), w, p.AppNamespace(name), opts)
|
||||
|
||||
// return r, nil
|
||||
// }
|
||||
|
||||
// func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) {
|
||||
// return p.AppLogs("system", opts)
|
||||
// }
|
||||
|
||||
// func (p *Provider) insightContainerLogs(ctx context.Context, w io.WriteCloser, namespace string, opts structs.LogsOptions) {
|
||||
// defer w.Close()
|
||||
|
||||
// since := common.DefaultDuration(opts.Since, 0)
|
||||
// start := time.Now().Add(-1 * since)
|
||||
|
||||
// for {
|
||||
// select {
|
||||
// case <-ctx.Done():
|
||||
// return
|
||||
// default:
|
||||
// // check for closed writer
|
||||
// if _, err := w.Write([]byte{}); err != nil {
|
||||
// return
|
||||
// }
|
||||
|
||||
// query := operationalinsights.QueryBody{
|
||||
// Query: options.String(fmt.Sprintf("KubePodInventory | join kind=innerunique ContainerLog on ContainerID | project Timestamp=TimeGenerated1,Message=LogEntry,Namespace,Pod=Name,Labels=PodLabel | where Namespace==%q and Timestamp > datetime(%s) | order by Timestamp asc | limit 100", namespace, start.Format("2006-01-02 15:04:05.000"))),
|
||||
// Timespan: options.String("P7D"),
|
||||
// }
|
||||
|
||||
// res, err := p.insightLogs.Execute(context.Background(), p.Workspace, query)
|
||||
// if err != nil {
|
||||
// fmt.Printf("err: %+v\n", err)
|
||||
// return
|
||||
// }
|
||||
// if len(*res.Tables) < 1 {
|
||||
// fmt.Println("no tables")
|
||||
// return
|
||||
// }
|
||||
|
||||
// t := (*res.Tables)[0]
|
||||
|
||||
// if len(*t.Rows) == 0 && !common.DefaultBool(opts.Follow, true) {
|
||||
// return
|
||||
// }
|
||||
|
||||
// for _, row := range *t.Rows {
|
||||
// attrs := parseRow(row, *t.Columns)
|
||||
|
||||
// ts, err := time.Parse("2006-01-02T15:04:05.999Z", attrs["Timestamp"])
|
||||
// if err != nil {
|
||||
// fmt.Printf("err: %+v\n", err)
|
||||
// continue
|
||||
// }
|
||||
|
||||
// if ts.After(start) {
|
||||
// start = ts
|
||||
// }
|
||||
|
||||
// var labels map[string]string
|
||||
|
||||
// if err := json.Unmarshal([]byte(strings.Trim(attrs["Labels"], "[]")), &labels); err != nil {
|
||||
// fmt.Printf("err: %+v\n", err)
|
||||
// continue
|
||||
// }
|
||||
|
||||
// service := labels["service"]
|
||||
// pod := attrs["Pod"]
|
||||
|
||||
// prefix := ""
|
||||
|
||||
// if common.DefaultBool(opts.Prefix, false) {
|
||||
// prefix = fmt.Sprintf("%s service/%s/%s ", ts.Format(time.RFC3339), service, pod)
|
||||
// }
|
||||
|
||||
// if _, err := w.Write([]byte(fmt.Sprintf("%s%s\n", prefix, attrs["Message"]))); err != nil {
|
||||
// fmt.Printf("err: %+v\n", err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// time.Sleep(5 * time.Second)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// func parseRow(row []interface{}, cols []operationalinsights.Column) map[string]string {
|
||||
// attrs := map[string]string{}
|
||||
|
||||
// for i, c := range cols {
|
||||
// if v, ok := row[i].(string); ok && c.Name != nil {
|
||||
// attrs[*c.Name] = v
|
||||
// }
|
||||
// }
|
||||
|
||||
// return attrs
|
||||
// }
|
||||
|
||||
@ -9,9 +9,9 @@ import (
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3iface"
|
||||
"github.com/convox/convox/pkg/elastic"
|
||||
"github.com/convox/convox/pkg/structs"
|
||||
"github.com/convox/convox/provider/k8s"
|
||||
"github.com/elastic/go-elasticsearch/v6"
|
||||
)
|
||||
|
||||
type Provider struct {
|
||||
@ -25,7 +25,7 @@ type Provider struct {
|
||||
SpacesEndpoint string
|
||||
SpacesSecret string
|
||||
|
||||
elastic *elasticsearch.Client
|
||||
elastic *elastic.Client
|
||||
s3 s3iface.S3API
|
||||
}
|
||||
|
||||
@ -70,12 +70,12 @@ func (p *Provider) WithContext(ctx context.Context) structs.Provider {
|
||||
}
|
||||
|
||||
func (p *Provider) initializeDOServices() error {
|
||||
es, err := elasticsearch.NewDefaultClient()
|
||||
ec, err := elastic.New(os.Getenv("ELASTIC_URL"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.elastic = es
|
||||
p.elastic = ec
|
||||
|
||||
s, err := session.NewSession(&aws.Config{
|
||||
Region: aws.String(p.Region),
|
||||
|
||||
@ -1,51 +1,22 @@
|
||||
package do
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/convox/convox/pkg/common"
|
||||
"github.com/convox/convox/pkg/structs"
|
||||
)
|
||||
|
||||
var sequenceTokens sync.Map
|
||||
|
||||
type elasticSearchResult struct {
|
||||
Hits struct {
|
||||
Hits []struct {
|
||||
Index string `json:"_index"`
|
||||
Source struct {
|
||||
Log string
|
||||
Stream string
|
||||
Timestamp time.Time `json:"@timestamp"`
|
||||
} `json:"_source"`
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) Log(app, stream string, ts time.Time, message string) error {
|
||||
index := fmt.Sprintf("convox.%s.%s", p.Name, app)
|
||||
|
||||
body := map[string]interface{}{
|
||||
"log": fmt.Sprintf("%s\n", message),
|
||||
"stream": stream,
|
||||
"@timestamp": ts.Format(time.RFC3339Nano),
|
||||
tags := map[string]string{
|
||||
"app": app,
|
||||
"stream": stream,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := p.elastic.Index(index, bytes.NewReader(data)); err != nil {
|
||||
if err := p.elastic.Write(index, ts, message, tags); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -55,7 +26,7 @@ func (p *Provider) Log(app, stream string, ts time.Time, message string) error {
|
||||
func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser, error) {
|
||||
r, w := io.Pipe()
|
||||
|
||||
go p.streamLogs(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts)
|
||||
go p.elastic.Stream(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts)
|
||||
|
||||
return r, nil
|
||||
}
|
||||
@ -63,95 +34,3 @@ func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser
|
||||
func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) {
|
||||
return p.AppLogs("system", opts)
|
||||
}
|
||||
|
||||
func (p *Provider) streamLogs(ctx context.Context, w io.WriteCloser, index string, opts structs.LogsOptions) {
|
||||
defer w.Close()
|
||||
|
||||
follow := common.DefaultBool(opts.Follow, true)
|
||||
now := time.Now().UTC()
|
||||
since := time.Time{}
|
||||
|
||||
if opts.Since != nil {
|
||||
since = time.Now().UTC().Add(*opts.Since * -1)
|
||||
}
|
||||
|
||||
for {
|
||||
// check for closed writer
|
||||
if _, err := w.Write([]byte{}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
timestamp := map[string]interface{}{
|
||||
"gt": since.UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
if !follow {
|
||||
timestamp["lt"] = now.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
body := map[string]interface{}{
|
||||
"query": map[string]interface{}{
|
||||
"range": map[string]interface{}{
|
||||
"@timestamp": timestamp,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
data, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
res, err := p.elastic.Search(
|
||||
p.elastic.Search.WithIndex(index),
|
||||
p.elastic.Search.WithSize(5000),
|
||||
p.elastic.Search.WithBody(bytes.NewReader(data)),
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
return
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
data, err = ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
var sres elasticSearchResult
|
||||
|
||||
if err := json.Unmarshal(data, &sres); err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
sort.Slice(sres.Hits.Hits, func(i, j int) bool {
|
||||
return sres.Hits.Hits[i].Source.Timestamp.Before(sres.Hits.Hits[j].Source.Timestamp)
|
||||
})
|
||||
|
||||
if len(sres.Hits.Hits) == 0 && !follow {
|
||||
return
|
||||
}
|
||||
|
||||
for _, log := range sres.Hits.Hits {
|
||||
prefix := ""
|
||||
|
||||
if common.DefaultBool(opts.Prefix, false) {
|
||||
prefix = fmt.Sprintf("%s %s ", log.Source.Timestamp.Format(time.RFC3339), strings.ReplaceAll(log.Source.Stream, ".", "/"))
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, "%s%s", prefix, log.Source.Log)
|
||||
|
||||
since = log.Source.Timestamp
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,9 +5,8 @@ import (
|
||||
"encoding/base64"
|
||||
"os"
|
||||
|
||||
"cloud.google.com/go/logging"
|
||||
"cloud.google.com/go/logging/logadmin"
|
||||
"cloud.google.com/go/storage"
|
||||
"github.com/convox/convox/pkg/elastic"
|
||||
"github.com/convox/convox/pkg/structs"
|
||||
"github.com/convox/convox/pkg/templater"
|
||||
"github.com/convox/convox/provider/k8s"
|
||||
@ -23,10 +22,8 @@ type Provider struct {
|
||||
Region string
|
||||
Registry string
|
||||
|
||||
LogAdmin *logadmin.Client
|
||||
Logging *logging.Client
|
||||
Storage *storage.Client
|
||||
|
||||
elastic *elastic.Client
|
||||
storage *storage.Client
|
||||
templater *templater.Templater
|
||||
}
|
||||
|
||||
@ -77,28 +74,21 @@ func (p *Provider) WithContext(ctx context.Context) structs.Provider {
|
||||
}
|
||||
|
||||
func (p *Provider) initializeGcpServices() error {
|
||||
ec, err := elastic.New(os.Getenv("ELASTIC_URL"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.elastic = ec
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
l, err := logging.NewClient(ctx, p.Project)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.Logging = l
|
||||
|
||||
la, err := logadmin.NewClient(ctx, p.Project)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.LogAdmin = la
|
||||
|
||||
s, err := storage.NewClient(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.Storage = s
|
||||
p.storage = s
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1,40 +1,21 @@
|
||||
package gcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"cloud.google.com/go/logging"
|
||||
"cloud.google.com/go/logging/logadmin"
|
||||
"github.com/convox/convox/pkg/common"
|
||||
"github.com/convox/convox/pkg/structs"
|
||||
"google.golang.org/api/iterator"
|
||||
)
|
||||
|
||||
const (
|
||||
logTimeFormat = "2006-01-02T15:04:05.999999999Z"
|
||||
)
|
||||
|
||||
var sequenceTokens sync.Map
|
||||
|
||||
func (p *Provider) Log(app, stream string, ts time.Time, message string) error {
|
||||
logger := p.Logging.Logger("system")
|
||||
index := fmt.Sprintf("convox.%s.%s", p.Name, app)
|
||||
|
||||
logger.Log(logging.Entry{
|
||||
Labels: map[string]string{
|
||||
"container.googleapis.com/namespace_name": p.AppNamespace(app),
|
||||
"stream": stream,
|
||||
},
|
||||
Payload: message,
|
||||
Severity: logging.Info,
|
||||
})
|
||||
tags := map[string]string{
|
||||
"stream": stream,
|
||||
}
|
||||
|
||||
if err := logger.Flush(); err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
if err := p.elastic.Write(index, ts, message, tags); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -44,7 +25,7 @@ func (p *Provider) Log(app, stream string, ts time.Time, message string) error {
|
||||
func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser, error) {
|
||||
r, w := io.Pipe()
|
||||
|
||||
go p.logFilter(p.Context(), w, p.logFilters(name), opts)
|
||||
go p.elastic.Stream(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts)
|
||||
|
||||
return r, nil
|
||||
}
|
||||
@ -52,85 +33,3 @@ func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser
|
||||
func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) {
|
||||
return p.AppLogs("system", opts)
|
||||
}
|
||||
|
||||
func (p *Provider) logFilter(ctx context.Context, w io.WriteCloser, filter string, opts structs.LogsOptions) {
|
||||
defer w.Close()
|
||||
|
||||
var since time.Time
|
||||
|
||||
if opts.Since != nil {
|
||||
since = time.Now().UTC().Add(-1 * *opts.Since)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
follow := common.DefaultBool(opts.Follow, true)
|
||||
|
||||
Iteration:
|
||||
|
||||
for {
|
||||
where := fmt.Sprintf("%s AND timestamp > %q", filter, since.Format(logTimeFormat))
|
||||
|
||||
if !follow {
|
||||
where = fmt.Sprintf("%s AND timestamp < %q", where, now.Format(logTimeFormat))
|
||||
}
|
||||
|
||||
it := p.LogAdmin.Entries(ctx, logadmin.Filter(where))
|
||||
|
||||
for {
|
||||
// check for closed writer
|
||||
if _, err := w.Write([]byte{}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
entry, err := it.Next()
|
||||
if err == iterator.Done {
|
||||
if !follow {
|
||||
return
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
continue Iteration
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Fprintf(w, "ERROR: %s\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
prefix := ""
|
||||
|
||||
labels := entry.Resource.GetLabels()
|
||||
|
||||
typ := common.CoalesceString(entry.Labels["k8s-pod/type"], "unknown")
|
||||
name := common.CoalesceString(entry.Labels["k8s-pod/name"], "unknown")
|
||||
|
||||
if common.DefaultBool(opts.Prefix, false) {
|
||||
prefix = fmt.Sprintf("%s %s/%s/%s ", entry.Timestamp.Format(time.RFC3339), typ, name, labels["pod_name"])
|
||||
}
|
||||
|
||||
switch t := entry.Payload.(type) {
|
||||
case string:
|
||||
if _, err := w.Write([]byte(fmt.Sprintf("%s%s\n", prefix, strings.TrimSuffix(t, "\n")))); err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
if entry.Timestamp.After(since) {
|
||||
since = entry.Timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) logFilters(app string) string {
|
||||
filters := []string{
|
||||
`resource.type="k8s_container"`,
|
||||
fmt.Sprintf(`resource.labels.cluster_name=%q`, p.Name),
|
||||
fmt.Sprintf(`resource.labels.namespace_name=%q`, p.AppNamespace(app)),
|
||||
}
|
||||
|
||||
return strings.Join(filters, " AND ")
|
||||
}
|
||||
|
||||
@ -23,7 +23,7 @@ func (p *Provider) ObjectDelete(app, key string) error {
|
||||
return fmt.Errorf("object not found: %s", key)
|
||||
}
|
||||
|
||||
if err := p.Storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).Delete(p.Context()); err != nil {
|
||||
if err := p.storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).Delete(p.Context()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -31,7 +31,7 @@ func (p *Provider) ObjectDelete(app, key string) error {
|
||||
}
|
||||
|
||||
func (p *Provider) ObjectExists(app, key string) (bool, error) {
|
||||
_, err := p.Storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).Attrs(p.Context())
|
||||
_, err := p.storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).Attrs(p.Context())
|
||||
if err == storage.ErrObjectNotExist {
|
||||
return false, nil
|
||||
}
|
||||
@ -44,7 +44,7 @@ func (p *Provider) ObjectExists(app, key string) (bool, error) {
|
||||
|
||||
// ObjectFetch fetches an Object
|
||||
func (p *Provider) ObjectFetch(app, key string) (io.ReadCloser, error) {
|
||||
r, err := p.Storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).NewReader(p.Context())
|
||||
r, err := p.storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).NewReader(p.Context())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -55,7 +55,7 @@ func (p *Provider) ObjectFetch(app, key string) (io.ReadCloser, error) {
|
||||
func (p *Provider) ObjectList(app, prefix string) ([]string, error) {
|
||||
os := []string{}
|
||||
|
||||
it := p.Storage.Bucket(p.Bucket).Objects(p.Context(), nil)
|
||||
it := p.storage.Bucket(p.Bucket).Objects(p.Context(), nil)
|
||||
|
||||
for {
|
||||
attrs, err := it.Next()
|
||||
@ -82,7 +82,7 @@ func (p *Provider) ObjectStore(app, key string, r io.Reader, opts structs.Object
|
||||
key = k
|
||||
}
|
||||
|
||||
o := p.Storage.Bucket(p.Bucket).Object(p.objectKey(app, key))
|
||||
o := p.storage.Bucket(p.Bucket).Object(p.objectKey(app, key))
|
||||
|
||||
w := o.NewWriter(p.Context())
|
||||
|
||||
|
||||
@ -85,7 +85,7 @@ module "k8s" {
|
||||
AZURE_CLIENT_SECRET = azuread_service_principal_password.api.value
|
||||
AZURE_SUBSCRIPTION_ID = data.azurerm_subscription.current.subscription_id
|
||||
AZURE_TENANT_ID = data.azurerm_client_config.current.tenant_id
|
||||
ELASTICSEARCH_URL = module.elasticsearch.url
|
||||
ELASTIC_URL = module.elasticsearch.url
|
||||
PROVIDER = "azure"
|
||||
REGION = var.region
|
||||
REGISTRY = azurerm_container_registry.registry.login_server
|
||||
|
||||
@ -54,16 +54,16 @@ module "k8s" {
|
||||
annotations = {}
|
||||
|
||||
env = {
|
||||
BUCKET = digitalocean_spaces_bucket.storage.name
|
||||
ELASTICSEARCH_URL = module.elasticsearch.url
|
||||
PROVIDER = "do"
|
||||
REGION = var.region
|
||||
REGISTRY = "registry.${var.domain}"
|
||||
RESOLVER = var.resolver
|
||||
ROUTER = var.router
|
||||
SECRET = var.secret
|
||||
SPACES_ACCESS = var.access_id
|
||||
SPACES_ENDPOINT = "https://${var.region}.digitaloceanspaces.com"
|
||||
SPACES_SECRET = var.secret_key
|
||||
BUCKET = digitalocean_spaces_bucket.storage.name
|
||||
ELASTIC_URL = module.elasticsearch.url
|
||||
PROVIDER = "do"
|
||||
REGION = var.region
|
||||
REGISTRY = "registry.${var.domain}"
|
||||
RESOLVER = var.resolver
|
||||
ROUTER = var.router
|
||||
SECRET = var.secret
|
||||
SPACES_ACCESS = var.access_id
|
||||
SPACES_ENDPOINT = "https://${var.region}.digitaloceanspaces.com"
|
||||
SPACES_SECRET = var.secret_key
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,28 @@ locals {
|
||||
}
|
||||
}
|
||||
|
||||
module "elasticsearch" {
|
||||
source = "../../elasticsearch/k8s"
|
||||
|
||||
providers = {
|
||||
kubernetes = kubernetes
|
||||
}
|
||||
|
||||
namespace = var.namespace
|
||||
}
|
||||
|
||||
module "fluentd" {
|
||||
source = "../../fluentd/elasticsearch"
|
||||
|
||||
providers = {
|
||||
kubernetes = kubernetes
|
||||
}
|
||||
|
||||
elasticsearch = module.elasticsearch.host
|
||||
namespace = var.namespace
|
||||
name = var.name
|
||||
}
|
||||
|
||||
module "k8s" {
|
||||
source = "../k8s"
|
||||
|
||||
@ -37,14 +59,15 @@ module "k8s" {
|
||||
}
|
||||
|
||||
env = {
|
||||
BUCKET = google_storage_bucket.storage.name
|
||||
KEY = google_service_account_key.api.private_key
|
||||
PROJECT = data.google_client_config.current.project,
|
||||
PROVIDER = "gcp"
|
||||
REGION = data.google_client_config.current.region
|
||||
REGISTRY = data.google_container_registry_repository.registry.repository_url
|
||||
RESOLVER = var.resolver
|
||||
ROUTER = var.router
|
||||
SOCKET = "/var/run/docker.sock"
|
||||
BUCKET = google_storage_bucket.storage.name
|
||||
ELASTIC_URL = module.elasticsearch.url
|
||||
KEY = google_service_account_key.api.private_key
|
||||
PROJECT = data.google_client_config.current.project,
|
||||
PROVIDER = "gcp"
|
||||
REGION = data.google_client_config.current.region
|
||||
REGISTRY = data.google_container_registry_repository.registry.repository_url
|
||||
RESOLVER = var.resolver
|
||||
ROUTER = var.router
|
||||
SOCKET = "/var/run/docker.sock"
|
||||
}
|
||||
}
|
||||
|
||||
@ -4,7 +4,7 @@
|
||||
enable_ruby true
|
||||
<record>
|
||||
index convox.$${record["kubernetes"]["namespace_labels"]["rack"]}.$${record["kubernetes"]["namespace_labels"]["app"]}
|
||||
stream service.$${record["kubernetes"]["labels"]["service"]}.$${record["kubernetes"]["pod_name"]}
|
||||
stream $${record["kubernetes"]["labels"]["type"]}.$${record["kubernetes"]["labels"]["name"]}.$${record["kubernetes"]["pod_name"]}
|
||||
</record>
|
||||
</filter>
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user