diff --git a/pkg/common/aws.go b/pkg/common/aws.go index eebe4fc..3fb0ad6 100644 --- a/pkg/common/aws.go +++ b/pkg/common/aws.go @@ -536,7 +536,7 @@ func writeLogEvents(w io.Writer, events []*cloudwatchlogs.FilteredLogEvent, opts prefix = fmt.Sprintf("%s %s ", t.Format(time.RFC3339), *e.LogStreamName) } - line := fmt.Sprintf("%s%s\n", prefix, *e.Message) + line := fmt.Sprintf("%s%s\n", prefix, strings.TrimSuffix(*e.Message, "\n")) if _, err := w.Write([]byte(line)); err != nil { return 0, err diff --git a/provider/aws/log.go b/provider/aws/log.go index 4dae20d..e70c412 100644 --- a/provider/aws/log.go +++ b/provider/aws/log.go @@ -117,7 +117,7 @@ func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) { } func (p *Provider) appLogGroup(app string) string { - return fmt.Sprintf("%s-%s", p.Name, app) + return fmt.Sprintf("/convox/%s/%s", p.Name, app) } func (p *Provider) createLogGroup(app string) error { diff --git a/provider/k8s/controller_pod.go b/provider/k8s/controller_pod.go index 6c2ec4f..5d096ca 100644 --- a/provider/k8s/controller_pod.go +++ b/provider/k8s/controller_pod.go @@ -1,12 +1,8 @@ package k8s import ( - "bufio" - "context" "fmt" "reflect" - "strings" - "sync" "time" "github.com/convox/convox/pkg/kctl" @@ -21,15 +17,15 @@ type PodController struct { Controller *kctl.Controller Provider *Provider - logger *podLogger - start time.Time + // logger *podLogger + start time.Time } func NewPodController(p *Provider) (*PodController, error) { pc := &PodController{ Provider: p, - logger: NewPodLogger(p), - start: time.Now().UTC(), + // logger: NewPodLogger(p), + start: time.Now().UTC(), } c, err := kctl.NewController(p.Namespace, "convox-k8s-pod", pc) @@ -84,8 +80,8 @@ func (c *PodController) Add(obj interface{}) error { switch p.Status.Phase { case "Succeeded", "Failed": go c.cleanupPod(p) - case "Pending", "Running": - c.logger.Start(p.ObjectMeta.Namespace, p.ObjectMeta.Name, c.start) + // case "Pending", "Running": + // c.logger.Start(p.ObjectMeta.Namespace, p.ObjectMeta.Name, c.start) } return nil @@ -151,121 +147,121 @@ func assertPod(v interface{}) (*ac.Pod, error) { // return nil // } -type podLogger struct { - provider *Provider - streams sync.Map -} +// type podLogger struct { +// provider *Provider +// streams sync.Map +// } -func NewPodLogger(p *Provider) *podLogger { - return &podLogger{provider: p} -} +// func NewPodLogger(p *Provider) *podLogger { +// return &podLogger{provider: p} +// } -func (l *podLogger) Start(namespace, pod string, start time.Time) { - key := fmt.Sprintf("%s:%s", namespace, pod) +// func (l *podLogger) Start(namespace, pod string, start time.Time) { +// key := fmt.Sprintf("%s:%s", namespace, pod) - ctx, cancel := context.WithCancel(context.Background()) +// ctx, cancel := context.WithCancel(context.Background()) - if _, exists := l.streams.LoadOrStore(key, cancel); !exists { - go l.watch(ctx, namespace, pod, start) - } -} +// if _, exists := l.streams.LoadOrStore(key, cancel); !exists { +// go l.watch(ctx, namespace, pod, start) +// } +// } -func (l *podLogger) Stop(namespace, pod string) { - key := fmt.Sprintf("%s:%s", namespace, pod) +// func (l *podLogger) Stop(namespace, pod string) { +// key := fmt.Sprintf("%s:%s", namespace, pod) - if cv, ok := l.streams.Load(key); ok { - if cfn, ok := cv.(context.CancelFunc); ok { - cfn() - } - l.streams.Delete(key) - } -} +// if cv, ok := l.streams.Load(key); ok { +// if cfn, ok := cv.(context.CancelFunc); ok { +// cfn() +// } +// l.streams.Delete(key) +// } +// } -func (l *podLogger) stream(ch chan string, namespace, pod string, start time.Time) { - defer close(ch) +// func (l *podLogger) stream(ch chan string, namespace, pod string, start time.Time) { +// defer close(ch) - since := am.NewTime(start) +// since := am.NewTime(start) - for { - lopts := &ac.PodLogOptions{ - Follow: true, - SinceTime: &since, - Timestamps: true, - } - r, err := l.provider.Cluster.CoreV1().Pods(namespace).GetLogs(pod, lopts).Stream() - if err != nil { - fmt.Printf("err = %+v\n", err) - break - } +// for { +// lopts := &ac.PodLogOptions{ +// Follow: true, +// SinceTime: &since, +// Timestamps: true, +// } +// r, err := l.provider.Cluster.CoreV1().Pods(namespace).GetLogs(pod, lopts).Stream() +// if err != nil { +// fmt.Printf("err = %+v\n", err) +// break +// } - s := bufio.NewScanner(r) +// s := bufio.NewScanner(r) - s.Buffer(make([]byte, ScannerStartSize), ScannerMaxSize) +// s.Buffer(make([]byte, ScannerStartSize), ScannerMaxSize) - for s.Scan() { - line := s.Text() +// for s.Scan() { +// line := s.Text() - if ts, err := time.Parse(time.RFC3339Nano, strings.Split(line, " ")[0]); err == nil { - since = am.NewTime(ts) - } +// if ts, err := time.Parse(time.RFC3339Nano, strings.Split(line, " ")[0]); err == nil { +// since = am.NewTime(ts) +// } - ch <- line - } +// ch <- line +// } - if err := s.Err(); err != nil { - fmt.Printf("err = %+v\n", err) - continue - } +// if err := s.Err(); err != nil { +// fmt.Printf("err = %+v\n", err) +// continue +// } - break - } -} +// break +// } +// } -func (l *podLogger) watch(ctx context.Context, namespace, pod string, start time.Time) { - defer l.Stop(namespace, pod) +// func (l *podLogger) watch(ctx context.Context, namespace, pod string, start time.Time) { +// defer l.Stop(namespace, pod) - ch := make(chan string) +// ch := make(chan string) - var p *ac.Pod - var err error +// var p *ac.Pod +// var err error - for { - p, err = l.provider.Cluster.CoreV1().Pods(namespace).Get(pod, am.GetOptions{}) - if err != nil { - fmt.Printf("err = %+v\n", err) - return - } +// for { +// p, err = l.provider.Cluster.CoreV1().Pods(namespace).Get(pod, am.GetOptions{}) +// if err != nil { +// fmt.Printf("err = %+v\n", err) +// return +// } - if p.Status.Phase != "Pending" { - break - } +// if p.Status.Phase != "Pending" { +// break +// } - time.Sleep(1 * time.Second) - } +// time.Sleep(1 * time.Second) +// } - app := p.ObjectMeta.Labels["app"] - typ := p.ObjectMeta.Labels["type"] - name := p.ObjectMeta.Labels["name"] +// app := p.ObjectMeta.Labels["app"] +// typ := p.ObjectMeta.Labels["type"] +// name := p.ObjectMeta.Labels["name"] - if typ == "process" { - typ = "service" - } +// if typ == "process" { +// typ = "service" +// } - go l.stream(ch, namespace, pod, start) +// go l.stream(ch, namespace, pod, start) - for { - select { - case <-ctx.Done(): - return - case log, ok := <-ch: - if !ok { - return - } - if parts := strings.SplitN(log, " ", 2); len(parts) == 2 { - if ts, err := time.Parse(time.RFC3339Nano, parts[0]); err == nil { - l.provider.Engine.Log(app, fmt.Sprintf("%s/%s/%s", typ, name, pod), ts, parts[1]) - } - } - } - } -} +// for { +// select { +// case <-ctx.Done(): +// return +// case log, ok := <-ch: +// if !ok { +// return +// } +// if parts := strings.SplitN(log, " ", 2); len(parts) == 2 { +// if ts, err := time.Parse(time.RFC3339Nano, parts[0]); err == nil { +// l.provider.Engine.Log(app, fmt.Sprintf("%s/%s/%s", typ, name, pod), ts, parts[1]) +// } +// } +// } +// } +// }