move logs to fluentd

This commit is contained in:
David Dollar 2019-08-23 09:36:12 -04:00
parent c5a611a053
commit a9546baa3b
No known key found for this signature in database
GPG Key ID: AFAF263FB45B2124
3 changed files with 100 additions and 104 deletions

View File

@ -536,7 +536,7 @@ func writeLogEvents(w io.Writer, events []*cloudwatchlogs.FilteredLogEvent, opts
prefix = fmt.Sprintf("%s %s ", t.Format(time.RFC3339), *e.LogStreamName)
}
line := fmt.Sprintf("%s%s\n", prefix, *e.Message)
line := fmt.Sprintf("%s%s\n", prefix, strings.TrimSuffix(*e.Message, "\n"))
if _, err := w.Write([]byte(line)); err != nil {
return 0, err

View File

@ -117,7 +117,7 @@ func (p *Provider) SystemLogs(opts structs.LogsOptions) (io.ReadCloser, error) {
}
func (p *Provider) appLogGroup(app string) string {
return fmt.Sprintf("%s-%s", p.Name, app)
return fmt.Sprintf("/convox/%s/%s", p.Name, app)
}
func (p *Provider) createLogGroup(app string) error {

View File

@ -1,12 +1,8 @@
package k8s
import (
"bufio"
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
"github.com/convox/convox/pkg/kctl"
@ -21,15 +17,15 @@ type PodController struct {
Controller *kctl.Controller
Provider *Provider
logger *podLogger
start time.Time
// logger *podLogger
start time.Time
}
func NewPodController(p *Provider) (*PodController, error) {
pc := &PodController{
Provider: p,
logger: NewPodLogger(p),
start: time.Now().UTC(),
// logger: NewPodLogger(p),
start: time.Now().UTC(),
}
c, err := kctl.NewController(p.Namespace, "convox-k8s-pod", pc)
@ -84,8 +80,8 @@ func (c *PodController) Add(obj interface{}) error {
switch p.Status.Phase {
case "Succeeded", "Failed":
go c.cleanupPod(p)
case "Pending", "Running":
c.logger.Start(p.ObjectMeta.Namespace, p.ObjectMeta.Name, c.start)
// case "Pending", "Running":
// c.logger.Start(p.ObjectMeta.Namespace, p.ObjectMeta.Name, c.start)
}
return nil
@ -151,121 +147,121 @@ func assertPod(v interface{}) (*ac.Pod, error) {
// return nil
// }
type podLogger struct {
provider *Provider
streams sync.Map
}
// type podLogger struct {
// provider *Provider
// streams sync.Map
// }
func NewPodLogger(p *Provider) *podLogger {
return &podLogger{provider: p}
}
// func NewPodLogger(p *Provider) *podLogger {
// return &podLogger{provider: p}
// }
func (l *podLogger) Start(namespace, pod string, start time.Time) {
key := fmt.Sprintf("%s:%s", namespace, pod)
// func (l *podLogger) Start(namespace, pod string, start time.Time) {
// key := fmt.Sprintf("%s:%s", namespace, pod)
ctx, cancel := context.WithCancel(context.Background())
// ctx, cancel := context.WithCancel(context.Background())
if _, exists := l.streams.LoadOrStore(key, cancel); !exists {
go l.watch(ctx, namespace, pod, start)
}
}
// if _, exists := l.streams.LoadOrStore(key, cancel); !exists {
// go l.watch(ctx, namespace, pod, start)
// }
// }
func (l *podLogger) Stop(namespace, pod string) {
key := fmt.Sprintf("%s:%s", namespace, pod)
// func (l *podLogger) Stop(namespace, pod string) {
// key := fmt.Sprintf("%s:%s", namespace, pod)
if cv, ok := l.streams.Load(key); ok {
if cfn, ok := cv.(context.CancelFunc); ok {
cfn()
}
l.streams.Delete(key)
}
}
// if cv, ok := l.streams.Load(key); ok {
// if cfn, ok := cv.(context.CancelFunc); ok {
// cfn()
// }
// l.streams.Delete(key)
// }
// }
func (l *podLogger) stream(ch chan string, namespace, pod string, start time.Time) {
defer close(ch)
// func (l *podLogger) stream(ch chan string, namespace, pod string, start time.Time) {
// defer close(ch)
since := am.NewTime(start)
// since := am.NewTime(start)
for {
lopts := &ac.PodLogOptions{
Follow: true,
SinceTime: &since,
Timestamps: true,
}
r, err := l.provider.Cluster.CoreV1().Pods(namespace).GetLogs(pod, lopts).Stream()
if err != nil {
fmt.Printf("err = %+v\n", err)
break
}
// for {
// lopts := &ac.PodLogOptions{
// Follow: true,
// SinceTime: &since,
// Timestamps: true,
// }
// r, err := l.provider.Cluster.CoreV1().Pods(namespace).GetLogs(pod, lopts).Stream()
// if err != nil {
// fmt.Printf("err = %+v\n", err)
// break
// }
s := bufio.NewScanner(r)
// s := bufio.NewScanner(r)
s.Buffer(make([]byte, ScannerStartSize), ScannerMaxSize)
// s.Buffer(make([]byte, ScannerStartSize), ScannerMaxSize)
for s.Scan() {
line := s.Text()
// for s.Scan() {
// line := s.Text()
if ts, err := time.Parse(time.RFC3339Nano, strings.Split(line, " ")[0]); err == nil {
since = am.NewTime(ts)
}
// if ts, err := time.Parse(time.RFC3339Nano, strings.Split(line, " ")[0]); err == nil {
// since = am.NewTime(ts)
// }
ch <- line
}
// ch <- line
// }
if err := s.Err(); err != nil {
fmt.Printf("err = %+v\n", err)
continue
}
// if err := s.Err(); err != nil {
// fmt.Printf("err = %+v\n", err)
// continue
// }
break
}
}
// break
// }
// }
func (l *podLogger) watch(ctx context.Context, namespace, pod string, start time.Time) {
defer l.Stop(namespace, pod)
// func (l *podLogger) watch(ctx context.Context, namespace, pod string, start time.Time) {
// defer l.Stop(namespace, pod)
ch := make(chan string)
// ch := make(chan string)
var p *ac.Pod
var err error
// var p *ac.Pod
// var err error
for {
p, err = l.provider.Cluster.CoreV1().Pods(namespace).Get(pod, am.GetOptions{})
if err != nil {
fmt.Printf("err = %+v\n", err)
return
}
// for {
// p, err = l.provider.Cluster.CoreV1().Pods(namespace).Get(pod, am.GetOptions{})
// if err != nil {
// fmt.Printf("err = %+v\n", err)
// return
// }
if p.Status.Phase != "Pending" {
break
}
// if p.Status.Phase != "Pending" {
// break
// }
time.Sleep(1 * time.Second)
}
// time.Sleep(1 * time.Second)
// }
app := p.ObjectMeta.Labels["app"]
typ := p.ObjectMeta.Labels["type"]
name := p.ObjectMeta.Labels["name"]
// app := p.ObjectMeta.Labels["app"]
// typ := p.ObjectMeta.Labels["type"]
// name := p.ObjectMeta.Labels["name"]
if typ == "process" {
typ = "service"
}
// if typ == "process" {
// typ = "service"
// }
go l.stream(ch, namespace, pod, start)
// go l.stream(ch, namespace, pod, start)
for {
select {
case <-ctx.Done():
return
case log, ok := <-ch:
if !ok {
return
}
if parts := strings.SplitN(log, " ", 2); len(parts) == 2 {
if ts, err := time.Parse(time.RFC3339Nano, parts[0]); err == nil {
l.provider.Engine.Log(app, fmt.Sprintf("%s/%s/%s", typ, name, pod), ts, parts[1])
}
}
}
}
}
// for {
// select {
// case <-ctx.Done():
// return
// case log, ok := <-ch:
// if !ok {
// return
// }
// if parts := strings.SplitN(log, " ", 2); len(parts) == 2 {
// if ts, err := time.Parse(time.RFC3339Nano, parts[0]); err == nil {
// l.provider.Engine.Log(app, fmt.Sprintf("%s/%s/%s", typ, name, pod), ts, parts[1])
// }
// }
// }
// }
// }