From a0f93554f311959c27ece092e20cc39e2ac475dd Mon Sep 17 00:00:00 2001 From: David Dollar Date: Thu, 30 Jan 2020 09:29:01 -0500 Subject: [PATCH] gcp: use elastic for logs due to draconian rate limiting on stackdriver (#96) --- pkg/elastic/elastic.go | 163 ++++++++++++ provider/azure/azure.go | 8 +- provider/azure/log.go | 251 +----------------- provider/do/do.go | 8 +- provider/do/log.go | 131 +-------- provider/gcp/gcp.go | 32 +-- provider/gcp/log.go | 113 +------- provider/gcp/object.go | 10 +- terraform/api/azure/main.tf | 2 +- terraform/api/do/main.tf | 22 +- terraform/api/gcp/main.tf | 41 ++- .../fluentd/elasticsearch/target.conf.tpl | 2 +- 12 files changed, 248 insertions(+), 535 deletions(-) create mode 100644 pkg/elastic/elastic.go diff --git a/pkg/elastic/elastic.go b/pkg/elastic/elastic.go new file mode 100644 index 0000000..b69c3a0 --- /dev/null +++ b/pkg/elastic/elastic.go @@ -0,0 +1,163 @@ +package elastic + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "sort" + "strings" + "time" + + "github.com/convox/convox/pkg/common" + "github.com/convox/convox/pkg/structs" + "github.com/elastic/go-elasticsearch/v6" +) + +type Client struct { + client *elasticsearch.Client +} + +type result struct { + Hits struct { + Hits []struct { + Index string `json:"_index"` + Source struct { + Log string + Stream string + Timestamp time.Time `json:"@timestamp"` + } `json:"_source"` + } + } +} + +func New(url string) (*Client, error) { + ec, err := elasticsearch.NewClient(elasticsearch.Config{ + Addresses: []string{url}, + }) + if err != nil { + return nil, err + } + + c := &Client{ + client: ec, + } + + return c, nil +} + +func (c *Client) Stream(ctx context.Context, w io.WriteCloser, index string, opts structs.LogsOptions) { + defer w.Close() + + follow := common.DefaultBool(opts.Follow, true) + now := time.Now().UTC() + since := time.Time{} + + if opts.Since != nil { + since = time.Now().UTC().Add(*opts.Since * -1) + } + + for { + // check for closed writer + if _, err := w.Write([]byte{}); err != nil { + return + } + + select { + case <-ctx.Done(): + return + default: + timestamp := map[string]interface{}{ + "gt": since.UTC().Format(time.RFC3339), + } + + if !follow { + timestamp["lt"] = now.Format(time.RFC3339) + } + + body := map[string]interface{}{ + "query": map[string]interface{}{ + "range": map[string]interface{}{ + "@timestamp": timestamp, + }, + }, + } + + data, err := json.Marshal(body) + if err != nil { + fmt.Printf("err: %+v\n", err) + return + } + + res, err := c.client.Search( + c.client.Search.WithIndex(index), + c.client.Search.WithSize(5000), + c.client.Search.WithBody(bytes.NewReader(data)), + ) + if err != nil { + fmt.Fprintf(w, "error: %v\n", err) + return + } + defer res.Body.Close() + + data, err = ioutil.ReadAll(res.Body) + if err != nil { + fmt.Fprintf(w, "error: %v\n", err) + return + } + + var sres result + + if err := json.Unmarshal(data, &sres); err != nil { + fmt.Fprintf(w, "error: %v\n", err) + return + } + + sort.Slice(sres.Hits.Hits, func(i, j int) bool { + return sres.Hits.Hits[i].Source.Timestamp.Before(sres.Hits.Hits[j].Source.Timestamp) + }) + + if len(sres.Hits.Hits) == 0 && !follow { + return + } + + for _, log := range sres.Hits.Hits { + prefix := "" + + if common.DefaultBool(opts.Prefix, false) { + prefix = fmt.Sprintf("%s %s ", log.Source.Timestamp.Format(time.RFC3339), strings.ReplaceAll(log.Source.Stream, ".", "/")) + } + + fmt.Fprintf(w, "%s%s", prefix, log.Source.Log) + + since = log.Source.Timestamp + } + + time.Sleep(1 * time.Second) + } + } +} + +func (c *Client) Write(index string, ts time.Time, message string, tags map[string]string) error { + body := map[string]interface{}{ + "log": fmt.Sprintf("%s\n", message), + "@timestamp": ts.Format(time.RFC3339Nano), + } + + for k, v := range tags { + body[k] = v + } + + data, err := json.Marshal(body) + if err != nil { + return err + } + + if _, err := c.client.Index(index, bytes.NewReader(data)); err != nil { + return err + } + + return nil +} diff --git a/provider/azure/azure.go b/provider/azure/azure.go index 355b08c..2b87ee8 100644 --- a/provider/azure/azure.go +++ b/provider/azure/azure.go @@ -11,9 +11,9 @@ import ( "github.com/Azure/azure-storage-file-go/azfile" "github.com/Azure/go-autorest/autorest" "github.com/Azure/go-autorest/autorest/azure/auth" + "github.com/convox/convox/pkg/elastic" "github.com/convox/convox/pkg/structs" "github.com/convox/convox/provider/k8s" - "github.com/elastic/go-elasticsearch/v6" "k8s.io/apimachinery/pkg/util/runtime" ) @@ -30,7 +30,7 @@ type Provider struct { Subscription string Workspace string - elastic *elasticsearch.Client + elastic *elastic.Client insightLogs *operationalinsights.QueryClient storageDirectory *azfile.DirectoryURL } @@ -80,12 +80,12 @@ func (p *Provider) WithContext(ctx context.Context) structs.Provider { } func (p *Provider) initializeAzureServices() error { - es, err := elasticsearch.NewDefaultClient() + ec, err := elastic.New(os.Getenv("ELASTIC_URL")) if err != nil { return err } - p.elastic = es + p.elastic = ec il, err := p.azureInsightLogs() if err != nil { diff --git a/provider/azure/log.go b/provider/azure/log.go index 893457b..620e9a4 100644 --- a/provider/azure/log.go +++ b/provider/azure/log.go @@ -1,51 +1,22 @@ package azure import ( - "bytes" - "context" - "encoding/json" "fmt" "io" - "io/ioutil" - "sort" - "strings" - "sync" "time" - "github.com/convox/convox/pkg/common" "github.com/convox/convox/pkg/structs" ) -var sequenceTokens sync.Map - -type elasticSearchResult struct { - Hits struct { - Hits []struct { - Index string `json:"_index"` - Source struct { - Log string - Stream string - Timestamp time.Time `json:"@timestamp"` - } `json:"_source"` - } - } -} - func (p *Provider) Log(app, stream string, ts time.Time, message string) error { index := fmt.Sprintf("convox.%s.%s", p.Name, app) - body := map[string]interface{}{ - "log": fmt.Sprintf("%s\n", message), - "stream": stream, - "@timestamp": ts.Format(time.RFC3339Nano), + tags := map[string]string{ + "app": app, + "stream": stream, } - data, err := json.Marshal(body) - if err != nil { - return err - } - - if _, err := p.elastic.Index(index, bytes.NewReader(data)); err != nil { + if err := p.elastic.Write(index, ts, message, tags); err != nil { return err } @@ -55,7 +26,7 @@ func (p *Provider) Log(app, stream string, ts time.Time, message string) error { func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser, error) { r, w := io.Pipe() - go p.streamLogs(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts) + go p.elastic.Stream(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts) return r, nil } @@ -63,215 +34,3 @@ func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) { return p.AppLogs("system", opts) } - -func (p *Provider) streamLogs(ctx context.Context, w io.WriteCloser, index string, opts structs.LogsOptions) { - defer w.Close() - - follow := common.DefaultBool(opts.Follow, true) - now := time.Now().UTC() - since := time.Time{} - - if opts.Since != nil { - since = time.Now().UTC().Add(*opts.Since * -1) - } - - for { - // check for closed writer - if _, err := w.Write([]byte{}); err != nil { - return - } - - select { - case <-ctx.Done(): - return - default: - timestamp := map[string]interface{}{ - "gt": since.UTC().Format(time.RFC3339), - } - - if !follow { - timestamp["lt"] = now.Format(time.RFC3339) - } - - body := map[string]interface{}{ - "query": map[string]interface{}{ - "range": map[string]interface{}{ - "@timestamp": timestamp, - }, - }, - } - - data, err := json.Marshal(body) - if err != nil { - fmt.Printf("err: %+v\n", err) - return - } - - res, err := p.elastic.Search( - p.elastic.Search.WithIndex(index), - p.elastic.Search.WithSize(5000), - p.elastic.Search.WithBody(bytes.NewReader(data)), - ) - if err != nil { - fmt.Printf("err: %+v\n", err) - return - } - defer res.Body.Close() - - data, err = ioutil.ReadAll(res.Body) - if err != nil { - fmt.Printf("err: %+v\n", err) - return - } - - var sres elasticSearchResult - - if err := json.Unmarshal(data, &sres); err != nil { - fmt.Printf("err: %+v\n", err) - return - } - - sort.Slice(sres.Hits.Hits, func(i, j int) bool { - return sres.Hits.Hits[i].Source.Timestamp.Before(sres.Hits.Hits[j].Source.Timestamp) - }) - - if len(sres.Hits.Hits) == 0 && !follow { - return - } - - for _, log := range sres.Hits.Hits { - prefix := "" - - if common.DefaultBool(opts.Prefix, false) { - prefix = fmt.Sprintf("%s %s ", log.Source.Timestamp.Format(time.RFC3339), strings.ReplaceAll(log.Source.Stream, ".", "/")) - } - - fmt.Fprintf(w, "%s%s", prefix, log.Source.Log) - - since = log.Source.Timestamp - } - - time.Sleep(1 * time.Second) - } - } -} - -// import ( -// "context" -// "encoding/json" -// "fmt" -// "io" -// "strings" -// "time" - -// "github.com/Azure/azure-sdk-for-go/services/operationalinsights/v1/operationalinsights" -// "github.com/convox/convox/pkg/common" -// "github.com/convox/convox/pkg/options" -// "github.com/convox/convox/pkg/structs" -// ) - -// // var sequenceTokens sync.Map - -// func (p *Provider) Log(app, stream string, ts time.Time, message string) error { -// return nil -// } - -// func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser, error) { -// r, w := io.Pipe() - -// go p.insightContainerLogs(p.Context(), w, p.AppNamespace(name), opts) - -// return r, nil -// } - -// func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) { -// return p.AppLogs("system", opts) -// } - -// func (p *Provider) insightContainerLogs(ctx context.Context, w io.WriteCloser, namespace string, opts structs.LogsOptions) { -// defer w.Close() - -// since := common.DefaultDuration(opts.Since, 0) -// start := time.Now().Add(-1 * since) - -// for { -// select { -// case <-ctx.Done(): -// return -// default: -// // check for closed writer -// if _, err := w.Write([]byte{}); err != nil { -// return -// } - -// query := operationalinsights.QueryBody{ -// Query: options.String(fmt.Sprintf("KubePodInventory | join kind=innerunique ContainerLog on ContainerID | project Timestamp=TimeGenerated1,Message=LogEntry,Namespace,Pod=Name,Labels=PodLabel | where Namespace==%q and Timestamp > datetime(%s) | order by Timestamp asc | limit 100", namespace, start.Format("2006-01-02 15:04:05.000"))), -// Timespan: options.String("P7D"), -// } - -// res, err := p.insightLogs.Execute(context.Background(), p.Workspace, query) -// if err != nil { -// fmt.Printf("err: %+v\n", err) -// return -// } -// if len(*res.Tables) < 1 { -// fmt.Println("no tables") -// return -// } - -// t := (*res.Tables)[0] - -// if len(*t.Rows) == 0 && !common.DefaultBool(opts.Follow, true) { -// return -// } - -// for _, row := range *t.Rows { -// attrs := parseRow(row, *t.Columns) - -// ts, err := time.Parse("2006-01-02T15:04:05.999Z", attrs["Timestamp"]) -// if err != nil { -// fmt.Printf("err: %+v\n", err) -// continue -// } - -// if ts.After(start) { -// start = ts -// } - -// var labels map[string]string - -// if err := json.Unmarshal([]byte(strings.Trim(attrs["Labels"], "[]")), &labels); err != nil { -// fmt.Printf("err: %+v\n", err) -// continue -// } - -// service := labels["service"] -// pod := attrs["Pod"] - -// prefix := "" - -// if common.DefaultBool(opts.Prefix, false) { -// prefix = fmt.Sprintf("%s service/%s/%s ", ts.Format(time.RFC3339), service, pod) -// } - -// if _, err := w.Write([]byte(fmt.Sprintf("%s%s\n", prefix, attrs["Message"]))); err != nil { -// fmt.Printf("err: %+v\n", err) -// } -// } - -// time.Sleep(5 * time.Second) -// } -// } -// } - -// func parseRow(row []interface{}, cols []operationalinsights.Column) map[string]string { -// attrs := map[string]string{} - -// for i, c := range cols { -// if v, ok := row[i].(string); ok && c.Name != nil { -// attrs[*c.Name] = v -// } -// } - -// return attrs -// } diff --git a/provider/do/do.go b/provider/do/do.go index cbc9395..3bf4aa1 100644 --- a/provider/do/do.go +++ b/provider/do/do.go @@ -9,9 +9,9 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/convox/convox/pkg/elastic" "github.com/convox/convox/pkg/structs" "github.com/convox/convox/provider/k8s" - "github.com/elastic/go-elasticsearch/v6" ) type Provider struct { @@ -25,7 +25,7 @@ type Provider struct { SpacesEndpoint string SpacesSecret string - elastic *elasticsearch.Client + elastic *elastic.Client s3 s3iface.S3API } @@ -70,12 +70,12 @@ func (p *Provider) WithContext(ctx context.Context) structs.Provider { } func (p *Provider) initializeDOServices() error { - es, err := elasticsearch.NewDefaultClient() + ec, err := elastic.New(os.Getenv("ELASTIC_URL")) if err != nil { return err } - p.elastic = es + p.elastic = ec s, err := session.NewSession(&aws.Config{ Region: aws.String(p.Region), diff --git a/provider/do/log.go b/provider/do/log.go index c6234c0..bef8526 100644 --- a/provider/do/log.go +++ b/provider/do/log.go @@ -1,51 +1,22 @@ package do import ( - "bytes" - "context" - "encoding/json" "fmt" "io" - "io/ioutil" - "sort" - "strings" - "sync" "time" - "github.com/convox/convox/pkg/common" "github.com/convox/convox/pkg/structs" ) -var sequenceTokens sync.Map - -type elasticSearchResult struct { - Hits struct { - Hits []struct { - Index string `json:"_index"` - Source struct { - Log string - Stream string - Timestamp time.Time `json:"@timestamp"` - } `json:"_source"` - } - } -} - func (p *Provider) Log(app, stream string, ts time.Time, message string) error { index := fmt.Sprintf("convox.%s.%s", p.Name, app) - body := map[string]interface{}{ - "log": fmt.Sprintf("%s\n", message), - "stream": stream, - "@timestamp": ts.Format(time.RFC3339Nano), + tags := map[string]string{ + "app": app, + "stream": stream, } - data, err := json.Marshal(body) - if err != nil { - return err - } - - if _, err := p.elastic.Index(index, bytes.NewReader(data)); err != nil { + if err := p.elastic.Write(index, ts, message, tags); err != nil { return err } @@ -55,7 +26,7 @@ func (p *Provider) Log(app, stream string, ts time.Time, message string) error { func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser, error) { r, w := io.Pipe() - go p.streamLogs(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts) + go p.elastic.Stream(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts) return r, nil } @@ -63,95 +34,3 @@ func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) { return p.AppLogs("system", opts) } - -func (p *Provider) streamLogs(ctx context.Context, w io.WriteCloser, index string, opts structs.LogsOptions) { - defer w.Close() - - follow := common.DefaultBool(opts.Follow, true) - now := time.Now().UTC() - since := time.Time{} - - if opts.Since != nil { - since = time.Now().UTC().Add(*opts.Since * -1) - } - - for { - // check for closed writer - if _, err := w.Write([]byte{}); err != nil { - return - } - - select { - case <-ctx.Done(): - return - default: - timestamp := map[string]interface{}{ - "gt": since.UTC().Format(time.RFC3339), - } - - if !follow { - timestamp["lt"] = now.Format(time.RFC3339) - } - - body := map[string]interface{}{ - "query": map[string]interface{}{ - "range": map[string]interface{}{ - "@timestamp": timestamp, - }, - }, - } - - data, err := json.Marshal(body) - if err != nil { - fmt.Printf("err: %+v\n", err) - return - } - - res, err := p.elastic.Search( - p.elastic.Search.WithIndex(index), - p.elastic.Search.WithSize(5000), - p.elastic.Search.WithBody(bytes.NewReader(data)), - ) - if err != nil { - fmt.Printf("err: %+v\n", err) - return - } - defer res.Body.Close() - - data, err = ioutil.ReadAll(res.Body) - if err != nil { - fmt.Printf("err: %+v\n", err) - return - } - - var sres elasticSearchResult - - if err := json.Unmarshal(data, &sres); err != nil { - fmt.Printf("err: %+v\n", err) - return - } - - sort.Slice(sres.Hits.Hits, func(i, j int) bool { - return sres.Hits.Hits[i].Source.Timestamp.Before(sres.Hits.Hits[j].Source.Timestamp) - }) - - if len(sres.Hits.Hits) == 0 && !follow { - return - } - - for _, log := range sres.Hits.Hits { - prefix := "" - - if common.DefaultBool(opts.Prefix, false) { - prefix = fmt.Sprintf("%s %s ", log.Source.Timestamp.Format(time.RFC3339), strings.ReplaceAll(log.Source.Stream, ".", "/")) - } - - fmt.Fprintf(w, "%s%s", prefix, log.Source.Log) - - since = log.Source.Timestamp - } - - time.Sleep(1 * time.Second) - } - } -} diff --git a/provider/gcp/gcp.go b/provider/gcp/gcp.go index d329400..3b8b5d7 100644 --- a/provider/gcp/gcp.go +++ b/provider/gcp/gcp.go @@ -5,9 +5,8 @@ import ( "encoding/base64" "os" - "cloud.google.com/go/logging" - "cloud.google.com/go/logging/logadmin" "cloud.google.com/go/storage" + "github.com/convox/convox/pkg/elastic" "github.com/convox/convox/pkg/structs" "github.com/convox/convox/pkg/templater" "github.com/convox/convox/provider/k8s" @@ -23,10 +22,8 @@ type Provider struct { Region string Registry string - LogAdmin *logadmin.Client - Logging *logging.Client - Storage *storage.Client - + elastic *elastic.Client + storage *storage.Client templater *templater.Templater } @@ -77,28 +74,21 @@ func (p *Provider) WithContext(ctx context.Context) structs.Provider { } func (p *Provider) initializeGcpServices() error { + ec, err := elastic.New(os.Getenv("ELASTIC_URL")) + if err != nil { + return err + } + + p.elastic = ec + ctx := context.Background() - l, err := logging.NewClient(ctx, p.Project) - if err != nil { - return err - } - - p.Logging = l - - la, err := logadmin.NewClient(ctx, p.Project) - if err != nil { - return err - } - - p.LogAdmin = la - s, err := storage.NewClient(ctx) if err != nil { return err } - p.Storage = s + p.storage = s return nil } diff --git a/provider/gcp/log.go b/provider/gcp/log.go index 621a5be..8080767 100644 --- a/provider/gcp/log.go +++ b/provider/gcp/log.go @@ -1,40 +1,21 @@ package gcp import ( - "context" "fmt" "io" - "strings" - "sync" "time" - "cloud.google.com/go/logging" - "cloud.google.com/go/logging/logadmin" - "github.com/convox/convox/pkg/common" "github.com/convox/convox/pkg/structs" - "google.golang.org/api/iterator" ) -const ( - logTimeFormat = "2006-01-02T15:04:05.999999999Z" -) - -var sequenceTokens sync.Map - func (p *Provider) Log(app, stream string, ts time.Time, message string) error { - logger := p.Logging.Logger("system") + index := fmt.Sprintf("convox.%s.%s", p.Name, app) - logger.Log(logging.Entry{ - Labels: map[string]string{ - "container.googleapis.com/namespace_name": p.AppNamespace(app), - "stream": stream, - }, - Payload: message, - Severity: logging.Info, - }) + tags := map[string]string{ + "stream": stream, + } - if err := logger.Flush(); err != nil { - fmt.Printf("err: %+v\n", err) + if err := p.elastic.Write(index, ts, message, tags); err != nil { return err } @@ -44,7 +25,7 @@ func (p *Provider) Log(app, stream string, ts time.Time, message string) error { func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser, error) { r, w := io.Pipe() - go p.logFilter(p.Context(), w, p.logFilters(name), opts) + go p.elastic.Stream(p.Context(), w, fmt.Sprintf("convox.%s.%s", p.Name, name), opts) return r, nil } @@ -52,85 +33,3 @@ func (p *Provider) AppLogs(name string, opts structs.LogsOptions) (io.ReadCloser func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) { return p.AppLogs("system", opts) } - -func (p *Provider) logFilter(ctx context.Context, w io.WriteCloser, filter string, opts structs.LogsOptions) { - defer w.Close() - - var since time.Time - - if opts.Since != nil { - since = time.Now().UTC().Add(-1 * *opts.Since) - } - - now := time.Now().UTC() - follow := common.DefaultBool(opts.Follow, true) - -Iteration: - - for { - where := fmt.Sprintf("%s AND timestamp > %q", filter, since.Format(logTimeFormat)) - - if !follow { - where = fmt.Sprintf("%s AND timestamp < %q", where, now.Format(logTimeFormat)) - } - - it := p.LogAdmin.Entries(ctx, logadmin.Filter(where)) - - for { - // check for closed writer - if _, err := w.Write([]byte{}); err != nil { - return - } - - select { - case <-ctx.Done(): - return - default: - entry, err := it.Next() - if err == iterator.Done { - if !follow { - return - } - time.Sleep(2 * time.Second) - continue Iteration - } - if err != nil { - fmt.Fprintf(w, "ERROR: %s\n", err) - return - } - - prefix := "" - - labels := entry.Resource.GetLabels() - - typ := common.CoalesceString(entry.Labels["k8s-pod/type"], "unknown") - name := common.CoalesceString(entry.Labels["k8s-pod/name"], "unknown") - - if common.DefaultBool(opts.Prefix, false) { - prefix = fmt.Sprintf("%s %s/%s/%s ", entry.Timestamp.Format(time.RFC3339), typ, name, labels["pod_name"]) - } - - switch t := entry.Payload.(type) { - case string: - if _, err := w.Write([]byte(fmt.Sprintf("%s%s\n", prefix, strings.TrimSuffix(t, "\n")))); err != nil { - fmt.Printf("err: %+v\n", err) - } - } - - if entry.Timestamp.After(since) { - since = entry.Timestamp - } - } - } - } -} - -func (p *Provider) logFilters(app string) string { - filters := []string{ - `resource.type="k8s_container"`, - fmt.Sprintf(`resource.labels.cluster_name=%q`, p.Name), - fmt.Sprintf(`resource.labels.namespace_name=%q`, p.AppNamespace(app)), - } - - return strings.Join(filters, " AND ") -} diff --git a/provider/gcp/object.go b/provider/gcp/object.go index e5449f3..d9e56a6 100644 --- a/provider/gcp/object.go +++ b/provider/gcp/object.go @@ -23,7 +23,7 @@ func (p *Provider) ObjectDelete(app, key string) error { return fmt.Errorf("object not found: %s", key) } - if err := p.Storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).Delete(p.Context()); err != nil { + if err := p.storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).Delete(p.Context()); err != nil { return err } @@ -31,7 +31,7 @@ func (p *Provider) ObjectDelete(app, key string) error { } func (p *Provider) ObjectExists(app, key string) (bool, error) { - _, err := p.Storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).Attrs(p.Context()) + _, err := p.storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).Attrs(p.Context()) if err == storage.ErrObjectNotExist { return false, nil } @@ -44,7 +44,7 @@ func (p *Provider) ObjectExists(app, key string) (bool, error) { // ObjectFetch fetches an Object func (p *Provider) ObjectFetch(app, key string) (io.ReadCloser, error) { - r, err := p.Storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).NewReader(p.Context()) + r, err := p.storage.Bucket(p.Bucket).Object(p.objectKey(app, key)).NewReader(p.Context()) if err != nil { return nil, err } @@ -55,7 +55,7 @@ func (p *Provider) ObjectFetch(app, key string) (io.ReadCloser, error) { func (p *Provider) ObjectList(app, prefix string) ([]string, error) { os := []string{} - it := p.Storage.Bucket(p.Bucket).Objects(p.Context(), nil) + it := p.storage.Bucket(p.Bucket).Objects(p.Context(), nil) for { attrs, err := it.Next() @@ -82,7 +82,7 @@ func (p *Provider) ObjectStore(app, key string, r io.Reader, opts structs.Object key = k } - o := p.Storage.Bucket(p.Bucket).Object(p.objectKey(app, key)) + o := p.storage.Bucket(p.Bucket).Object(p.objectKey(app, key)) w := o.NewWriter(p.Context()) diff --git a/terraform/api/azure/main.tf b/terraform/api/azure/main.tf index 3b54ab0..e8bb19b 100644 --- a/terraform/api/azure/main.tf +++ b/terraform/api/azure/main.tf @@ -85,7 +85,7 @@ module "k8s" { AZURE_CLIENT_SECRET = azuread_service_principal_password.api.value AZURE_SUBSCRIPTION_ID = data.azurerm_subscription.current.subscription_id AZURE_TENANT_ID = data.azurerm_client_config.current.tenant_id - ELASTICSEARCH_URL = module.elasticsearch.url + ELASTIC_URL = module.elasticsearch.url PROVIDER = "azure" REGION = var.region REGISTRY = azurerm_container_registry.registry.login_server diff --git a/terraform/api/do/main.tf b/terraform/api/do/main.tf index 84b9a42..3fbc789 100644 --- a/terraform/api/do/main.tf +++ b/terraform/api/do/main.tf @@ -54,16 +54,16 @@ module "k8s" { annotations = {} env = { - BUCKET = digitalocean_spaces_bucket.storage.name - ELASTICSEARCH_URL = module.elasticsearch.url - PROVIDER = "do" - REGION = var.region - REGISTRY = "registry.${var.domain}" - RESOLVER = var.resolver - ROUTER = var.router - SECRET = var.secret - SPACES_ACCESS = var.access_id - SPACES_ENDPOINT = "https://${var.region}.digitaloceanspaces.com" - SPACES_SECRET = var.secret_key + BUCKET = digitalocean_spaces_bucket.storage.name + ELASTIC_URL = module.elasticsearch.url + PROVIDER = "do" + REGION = var.region + REGISTRY = "registry.${var.domain}" + RESOLVER = var.resolver + ROUTER = var.router + SECRET = var.secret + SPACES_ACCESS = var.access_id + SPACES_ENDPOINT = "https://${var.region}.digitaloceanspaces.com" + SPACES_SECRET = var.secret_key } } diff --git a/terraform/api/gcp/main.tf b/terraform/api/gcp/main.tf index 04bcc46..d144d8b 100644 --- a/terraform/api/gcp/main.tf +++ b/terraform/api/gcp/main.tf @@ -19,6 +19,28 @@ locals { } } +module "elasticsearch" { + source = "../../elasticsearch/k8s" + + providers = { + kubernetes = kubernetes + } + + namespace = var.namespace +} + +module "fluentd" { + source = "../../fluentd/elasticsearch" + + providers = { + kubernetes = kubernetes + } + + elasticsearch = module.elasticsearch.host + namespace = var.namespace + name = var.name +} + module "k8s" { source = "../k8s" @@ -37,14 +59,15 @@ module "k8s" { } env = { - BUCKET = google_storage_bucket.storage.name - KEY = google_service_account_key.api.private_key - PROJECT = data.google_client_config.current.project, - PROVIDER = "gcp" - REGION = data.google_client_config.current.region - REGISTRY = data.google_container_registry_repository.registry.repository_url - RESOLVER = var.resolver - ROUTER = var.router - SOCKET = "/var/run/docker.sock" + BUCKET = google_storage_bucket.storage.name + ELASTIC_URL = module.elasticsearch.url + KEY = google_service_account_key.api.private_key + PROJECT = data.google_client_config.current.project, + PROVIDER = "gcp" + REGION = data.google_client_config.current.region + REGISTRY = data.google_container_registry_repository.registry.repository_url + RESOLVER = var.resolver + ROUTER = var.router + SOCKET = "/var/run/docker.sock" } } diff --git a/terraform/fluentd/elasticsearch/target.conf.tpl b/terraform/fluentd/elasticsearch/target.conf.tpl index 8d00207..272482f 100644 --- a/terraform/fluentd/elasticsearch/target.conf.tpl +++ b/terraform/fluentd/elasticsearch/target.conf.tpl @@ -4,7 +4,7 @@ enable_ruby true index convox.$${record["kubernetes"]["namespace_labels"]["rack"]}.$${record["kubernetes"]["namespace_labels"]["app"]} - stream service.$${record["kubernetes"]["labels"]["service"]}.$${record["kubernetes"]["pod_name"]} + stream $${record["kubernetes"]["labels"]["type"]}.$${record["kubernetes"]["labels"]["name"]}.$${record["kubernetes"]["pod_name"]}