port log streaming changes

This commit is contained in:
David Dollar 2019-10-14 13:58:37 -04:00
parent b9edebbc19
commit 469856e130
No known key found for this signature in database
GPG Key ID: AFAF263FB45B2124

View File

@ -15,6 +15,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudformation"
"github.com/aws/aws-sdk-go/service/cloudformation/cloudformationiface"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
@ -346,20 +347,12 @@ func CloudWatchLogsSubscribe(ctx context.Context, cw cloudwatchlogsiface.CloudWa
func CloudWatchLogsStream(ctx context.Context, cw cloudwatchlogsiface.CloudWatchLogsAPI, w io.WriteCloser, group, stream string, opts structs.LogsOptions) error {
defer w.Close()
streams := map[string]bool{}
req := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: aws.String(group),
}
if stream != "" {
req.LogStreamNames = []*string{aws.String(stream)}
} else {
req.Interleaved = aws.Bool(true)
}
if opts.Filter != nil {
req.FilterPattern = aws.String(*opts.Filter)
}
var start int64
if opts.Since != nil {
@ -367,6 +360,26 @@ func CloudWatchLogsStream(ctx context.Context, cw cloudwatchlogsiface.CloudWatch
req.StartTime = aws.Int64(start)
}
if stream != "" {
req.LogStreamNames = []*string{aws.String(stream)}
streams[stream] = true
} else {
req.Interleaved = aws.Bool(true)
ss, err := logStreamsSince(ctx, cw, group, start)
if err != nil {
return err
}
req.LogStreamNames = ss
}
if opts.Filter != nil {
req.FilterPattern = aws.String(*opts.Filter)
}
follow := DefaultBool(opts.Follow, true)
var seen = map[string]bool{}
sleep := time.Duration(100 * time.Millisecond)
@ -423,18 +436,32 @@ func CloudWatchLogsStream(ctx context.Context, cw cloudwatchlogsiface.CloudWatch
return err
}
if res.NextToken != nil {
done := len(res.SearchedLogStreams) > 0
for _, s := range res.SearchedLogStreams {
if s.SearchedCompletely == nil || !*s.SearchedCompletely {
done = false
}
}
if !done && res.NextToken != nil {
req.NextToken = res.NextToken
continue
}
req.NextToken = nil
if !DefaultBool(opts.Follow, true) {
if !follow {
return nil
}
req.NextToken = nil
req.StartTime = aws.Int64(start)
ss, err := logStreamsSince(ctx, cw, group, start)
if err != nil {
return err
}
req.LogStreamNames = ss
}
}
}
@ -443,6 +470,53 @@ func awscli(args ...string) ([]byte, error) {
return exec.Command("aws", args...).CombinedOutput()
}
func logStreamsSince(ctx context.Context, cw cloudwatchlogsiface.CloudWatchLogsAPI, group string, since int64) ([]*string, error) {
params := &cloudwatchlogs.DescribeLogStreamsInput{
Descending: aws.Bool(true),
LogGroupName: aws.String(group),
OrderBy: aws.String("LastEventTime"),
}
p := request.Pagination{
NewRequest: func() (*request.Request, error) {
req, _ := cw.DescribeLogStreamsRequest(params)
req.SetContext(ctx)
return req, nil
},
}
streams := []*string{}
done := false
for p.Next() {
if p.Err() != nil {
return nil, p.Err()
}
page := p.Page().(*cloudwatchlogs.DescribeLogStreamsOutput)
for _, stream := range page.LogStreams {
if stream.LastEventTimestamp == nil || stream.LogStreamName == nil {
continue
}
// last event timestamp takes a long time to update
if since > 0 && *stream.LastEventTimestamp < (since-86400000) {
done = true
break
}
streams = append(streams, aws.String(*stream.LogStreamName))
}
if done {
break
}
}
return streams, nil
}
func setupCredentialsStatic() (map[string]string, error) {
rb, err := awscli("configure", "get", "region")
if err != nil {