chore: remove unused dev/sg/internal/loki (#61923)

I don't believe this is used anymore, and we no longer use Loki in Grafana Cloud either, so this removes the `dev/sg/internal/loki` package along with its remaining wiring: the Loki output mode of `sg ci logs`, the `loki` command in sg.config.yaml (`sg run loki`), and the Loki datasources in the local Grafana configs.

## Test plan

Unused internal-only code; the only user-visible change is that `sg ci logs --out` no longer accepts a Loki URL and the `sg run loki` command is gone.
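A quick way to sanity-check the remaining stdout-only behaviour (a sketch; the literal string values behind the `ciLogsOutTerminal|ciLogsOutSimple|ciLogsOutJSON` format constants are an assumption):

```sh
# Pull logs of failed jobs to stdout, as before
sg ci logs

# Get the logs for a specific build number
sg ci logs --build 123456

# '--out' now only selects a format; "json" assumes the value of ciLogsOutJSON
sg ci logs --build 123456 --out json
```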
Noah S-C 2024-04-16 13:19:32 +01:00 committed by GitHub
parent bfc9598ed3
commit 9d530f9752
10 changed files with 6 additions and 558 deletions

View File

@@ -12,7 +12,3 @@ datasources:
     type: jaeger
     access: proxy
     url: http://host.docker.internal:16686/-/debug/jaeger
-  - name: Loki
-    type: loki
-    access: proxy
-    url: http://host.docker.internal:3100

View File

@@ -14,7 +14,3 @@ datasources:
     type: jaeger
     access: proxy
     url: http://127.0.0.1:16686/-/debug/jaeger
-  - name: Loki
-    type: loki
-    access: proxy
-    url: http://127.0.0.1:3100

View File

@@ -13,7 +13,6 @@ go_library(
         "//dev/ci/runtype",
         "//dev/sg/internal/bk",
         "//dev/sg/internal/category",
-        "//dev/sg/internal/loki",
         "//dev/sg/internal/open",
         "//dev/sg/internal/repo",
         "//dev/sg/internal/run",

View File

@@ -1,8 +1,9 @@
 package ci
 
 import (
-	"github.com/sourcegraph/sourcegraph/dev/sg/internal/category"
 	"github.com/urfave/cli/v2"
+
+	"github.com/sourcegraph/sourcegraph/dev/sg/internal/category"
 )
 
 var (
@@ -58,9 +59,6 @@ sg ci status --build 123456
 # Pull logs of failed jobs to stdout
 sg ci logs
 
-# Push logs of most recent main failure to local Loki for analysis
-# You can spin up a Loki instance with 'sg run loki grafana'
-sg ci logs --branch main --out http://127.0.0.1:3100
 
 # Get the logs for a specific build number, useful when debugging
 sg ci logs --build 123456

View File

@@ -3,8 +3,6 @@ package ci
 import (
 	"encoding/json"
 	"fmt"
-	"net/url"
-	"os"
 	"os/exec"
 	"strconv"
 	"strings"
@@ -18,7 +16,6 @@ import (
 	"github.com/sourcegraph/sourcegraph/dev/ci/runtype"
 	"github.com/sourcegraph/sourcegraph/dev/sg/internal/bk"
-	"github.com/sourcegraph/sourcegraph/dev/sg/internal/loki"
 	"github.com/sourcegraph/sourcegraph/dev/sg/internal/open"
 	"github.com/sourcegraph/sourcegraph/dev/sg/internal/repo"
 	"github.com/sourcegraph/sourcegraph/dev/sg/internal/run"
@@ -457,12 +454,9 @@ sg ci build docker-images-candidates-notest
 var logsCommand = &cli.Command{
 	Name:  "logs",
 	Usage: "Get logs from CI builds (e.g. to grep locally)",
-	Description: `Get logs from CI builds, and output them in stdout or push them to Loki. By default only gets failed jobs - to change this, use the '--state' flag.
+	Description: `Get logs from CI builds, and output them in stdout. By default only gets failed jobs - to change this, use the '--state' flag.
 
 The '--job' flag can be used to narrow down the logs returned - you can provide either the ID, or part of the name of the job you want to see logs for.
-
-To send logs to a Loki instance, you can provide --out=http://127.0.0.1:3100 after spinning up an instance with 'sg run loki grafana'.
-From there, you can start exploring logs with the Grafana explore panel.
 `,
 	Flags: append(ciTargetFlags,
 		&cli.StringFlag{
@@ -479,8 +473,8 @@
 		&cli.StringFlag{
 			Name:    "out",
 			Aliases: []string{"o"},
-			Usage: fmt.Sprintf("Output `format`: one of [%s], or a URL pointing to a Loki instance, such as %s",
-				strings.Join([]string{ciLogsOutTerminal, ciLogsOutSimple, ciLogsOutJSON}, "|"), loki.DefaultLokiURL),
+			Usage: fmt.Sprintf("Output `format`: one of [%s]",
+				strings.Join([]string{ciLogsOutTerminal, ciLogsOutSimple, ciLogsOutJSON}, "|")),
 			Value: ciLogsOutTerminal,
 		},
 		&cli.StringFlag{
@@ -544,11 +538,7 @@ From there, you can start exploring logs with the Grafana explore panel.
 				failed := logsOut
 				log.JobMeta.State = &failed
 			}
-			stream, err := loki.NewStreamFromJobLogs(log)
-			if err != nil {
-				return errors.Newf("build %d job %s: NewStreamFromJobLogs: %s", log.JobMeta.Build, log.JobMeta.Job, err)
-			}
-			b, err := json.MarshalIndent(stream, "", "\t")
+			b, err := json.MarshalIndent(log, "", "\t")
 			if err != nil {
 				return errors.Newf("build %d job %s: Marshal: %s", log.JobMeta.Build, log.JobMeta.Job, err)
 			}
@@ -556,73 +546,6 @@ From there, you can start exploring logs with the Grafana explore panel.
 			}
-		default:
-			lokiURL, err := url.Parse(logsOut)
-			if err != nil {
-				return errors.Newf("invalid Loki target: %w", err)
-			}
-			lokiClient := loki.NewLokiClient(lokiURL)
-			std.Out.WriteLine(output.Styledf(output.StylePending, "Pushing to Loki instance at %q", lokiURL.Host))
-
-			var (
-				pushedEntries int
-				pushedStreams int
-				pushErrs      []string
-				pending       = std.Out.Pending(output.Styled(output.StylePending, "Processing logs..."))
-			)
-			for i, log := range logs {
-				job := log.JobMeta.Job
-				if log.JobMeta.Label != nil {
-					job = fmt.Sprintf("%q (%s)", *log.JobMeta.Label, log.JobMeta.Job)
-				}
-				overwriteState := cmd.String("overwrite-state")
-				if overwriteState != "" {
-					failed := overwriteState
-					log.JobMeta.State = &failed
-				}
-				pending.Updatef("Processing build %d job %s (%d/%d)...",
-					log.JobMeta.Build, job, i, len(logs))
-				stream, err := loki.NewStreamFromJobLogs(log)
-				if err != nil {
-					pushErrs = append(pushErrs, fmt.Sprintf("build %d job %s: %s",
-						log.JobMeta.Build, job, err))
-					continue
-				}
-
-				// Set buildkite metadata if available
-				if ciBranch := os.Getenv("BUILDKITE_BRANCH"); ciBranch != "" {
-					stream.Stream.Branch = ciBranch
-				}
-				if ciQueue := os.Getenv("BUILDKITE_AGENT_META_DATA_QUEUE"); ciQueue != "" {
-					stream.Stream.Queue = ciQueue
-				}
-
-				err = lokiClient.PushStreams(ctx, []*loki.Stream{stream})
-				if err != nil {
-					pushErrs = append(pushErrs, fmt.Sprintf("build %d job %q: %s",
-						log.JobMeta.Build, job, err))
-					continue
-				}
-
-				pushedEntries += len(stream.Values)
-				pushedStreams += 1
-			}
-
-			if pushedEntries > 0 {
-				pending.Complete(output.Linef(output.EmojiSuccess, output.StyleSuccess,
-					"Pushed %d entries from %d streams to Loki", pushedEntries, pushedStreams))
-			} else {
-				pending.Destroy()
-			}
-			if pushErrs != nil {
-				failedStreams := len(logs) - pushedStreams
-				std.Out.WriteLine(output.Linef(output.EmojiFailure, output.StyleWarning,
-					"Failed to push %d streams: \n - %s", failedStreams, strings.Join(pushErrs, "\n - ")))
-				if failedStreams == len(logs) {
-					return errors.New("failed to push all logs")
-				}
-			}
 		}
 
 		return nil

View File

@@ -1,26 +0,0 @@
-load("//dev:go_defs.bzl", "go_test")
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
-
-go_library(
-    name = "loki",
-    srcs = ["loki.go"],
-    importpath = "github.com/sourcegraph/sourcegraph/dev/sg/internal/loki",
-    visibility = ["//dev/sg:__subpackages__"],
-    deps = [
-        "//dev/sg/internal/bk",
-        "//lib/errors",
-        "@com_github_grafana_regexp//:regexp",
-    ],
-)
-
-go_test(
-    name = "loki_test",
-    timeout = "short",
-    srcs = ["loki_test.go"],
-    embed = [":loki"],
-    deps = [
-        "//dev/sg/internal/bk",
-        "//internal/randstring",
-        "@com_github_google_go_cmp//cmp",
-    ],
-)

View File

@@ -1,227 +0,0 @@
-package loki
-
-import (
-	"bytes"
-	"context"
-	"encoding/json"
-	"fmt"
-	"io"
-	"math"
-	"net/http"
-	"net/url"
-	"strconv"
-	"strings"
-
-	"github.com/grafana/regexp"
-
-	"github.com/sourcegraph/sourcegraph/dev/sg/internal/bk"
-	"github.com/sourcegraph/sourcegraph/lib/errors"
-)
-
-const pushEndpoint = "/loki/api/v1/push"
-
-// On Grafana Cloud, Loki rejects log entries that are longer than 65536 bytes.
-const maxEntrySize = math.MaxUint16
-
-// To point at a custom instance, e.g. one on Grafana Cloud, refer to:
-// https://grafana.com/orgs/sourcegraph/hosted-logs/85581#sending-logs
-// The URL should have the format https://85581:$TOKEN@logs-prod-us-central1.grafana.net
-const DefaultLokiURL = "http://127.0.0.1:3100"
-
-// Stream is the Loki logs equivalent of a metric series.
-type Stream struct {
-	// Labels map identifying a stream
-	Stream StreamLabels `json:"stream"`
-	// ["<unix epoch in nanoseconds>", "<log line>"] value pairs
-	Values [][2]string `json:"values"`
-}
-
-// StreamLabels is an identifier for a Loki log stream, denoted by a set of labels.
-//
-// NOTE: bk.JobMeta is very high-cardinality, since we create a new stream for each job.
-// Similarly to Prometheus, Loki is not designed to handle very high-cardinality log streams.
-// However, it is important that each job gets a separate stream, because Loki does not
-// permit non-chronologically uploaded logs, so simultaneous jobs' logs would collide.
-// NewStreamFromJobLogs handles this within a job by merging entries with the same timestamp.
-// Possible routes for investigation:
-// - https://grafana.com/docs/loki/latest/operations/storage/retention/
-// - https://grafana.com/docs/loki/latest/operations/storage/table-manager/
-type StreamLabels struct {
-	bk.JobMeta
-
-	// Distinguish from other log streams
-	App       string `json:"app"`
-	Component string `json:"component"`
-
-	// Additional metadata for CI when pushing
-	Branch string `json:"branch"`
-	Queue  string `json:"queue"`
-}
-
-// NewStreamFromJobLogs cleans the given log data, splits it into log entries, merges
-// entries with the same timestamp, and returns a Stream that can be pushed to Loki.
-func NewStreamFromJobLogs(log *bk.JobLogs) (*Stream, error) {
-	stream := StreamLabels{
-		JobMeta:   log.JobMeta,
-		App:       "buildkite",
-		Component: "build-logs",
-	}
-	cleanedContent := bk.CleanANSI(*log.Content)
-	// seems to be some kind of buildkite line separator, followed by a timestamp
-	const bkTimestampSeparator = "_bk;"
-	if len(cleanedContent) == 0 {
-		return &Stream{
-			Stream: stream,
-			Values: make([][2]string, 0),
-		}, nil
-	}
-	if !strings.Contains(cleanedContent, bkTimestampSeparator) {
-		return nil, errors.Newf("log content does not contain Buildkite timestamps, denoted by %q", bkTimestampSeparator)
-	}
-	lines := strings.Split(cleanedContent, bkTimestampSeparator)
-
-	// parse lines into loki log entries
-	values := make([][2]string, 0, len(lines))
-	var previousTimestamp string
-	timestamp := regexp.MustCompile(`t=(?P<ts>\d{13})`) // 13 digits for a unix epoch in milliseconds
-	for _, line := range lines {
-		line = strings.TrimSpace(line)
-		if len(line) < 3 {
-			continue // ignore irrelevant lines
-		}
-
-		tsMatches := timestamp.FindStringSubmatch(line)
-		if len(tsMatches) == 0 {
-			return nil, errors.Newf("no timestamp on line %q", line)
-		}
-		line = strings.TrimSpace(strings.Replace(line, tsMatches[0], "", 1))
-		if len(line) < 3 {
-			continue // ignore irrelevant lines
-		}
-
-		ts := strings.Replace(tsMatches[0], "t=", "", 1)
-		if ts == previousTimestamp {
-			value := values[len(values)-1]
-			value[1] = value[1] + fmt.Sprintf("\n%s", line)
-			// Check that the current entry is not larger than maxEntrySize (65536 bytes).
-			// If it is, split the entry into chunks of at most maxEntrySize bytes.
-			//
-			// To ensure that each chunked entry doesn't clash with a previous entry in Loki,
-			// the nanosecond timestamp is incremented by 1 for each chunk.
-			chunkedEntries, err := chunkEntry(value, maxEntrySize)
-			if err != nil {
-				return nil, errors.Wrapf(err, "failed to split value entry into chunks")
-			}
-			// replace the value we split into chunks with chunk 0, then add the rest
-			values[len(values)-1] = chunkedEntries[0]
-			if len(chunkedEntries) > 1 {
-				values = append(values, chunkedEntries[1:]...)
-			}
-		} else {
-			// buildkite timestamps are in ms, so convert to ns with a lot of zeros
-			value := [2]string{ts + "000000", line}
-			chunkedEntries, err := chunkEntry(value, maxEntrySize)
-			if err != nil {
-				return nil, errors.Wrapf(err, "failed to split value entry into chunks")
-			}
-			values = append(values, chunkedEntries...)
-			previousTimestamp = ts
-		}
-	}
-
-	return &Stream{
-		Stream: stream,
-		Values: values,
-	}, nil
-}
-
-func chunkEntry(entry [2]string, chunkSize int) ([][2]string, error) {
-	if len(entry[1]) < chunkSize {
-		return [][2]string{entry}, nil
-	}
-
-	// the first item in an entry is the timestamp
-	epoch, err := strconv.ParseInt(entry[0], 10, 64)
-	if err != nil {
-		return nil, err
-	}
-
-	// TODO(burmudar): Use runes instead since with bytes we might split on a UTF-8 char
-	chunks := splitIntoChunks([]byte(entry[1]), chunkSize)
-
-	results := make([][2]string, len(chunks))
-	for i, c := range chunks {
-		ts := fmt.Sprintf("%d", epoch+int64(i))
-		results[i] = [2]string{ts, string(c)}
-	}
-
-	return results, nil
-}
-
-func splitIntoChunks(data []byte, chunkSize int) [][]byte {
-	count := math.Ceil(float64(len(data)) / float64(chunkSize))
-	if count <= 1 {
-		return [][]byte{data}
-	}
-
-	chunks := make([][]byte, int(count))
-	for i := range int(count) {
-		start := i * chunkSize
-		end := start + chunkSize
-
-		if end <= len(data) {
-			chunks[i] = data[start:end]
-		} else {
-			chunks[i] = data[start:]
-		}
-	}
-
-	return chunks
-}
-
-// https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push
-type jsonPushBody struct {
-	Streams []*Stream `json:"streams"`
-}
-
-type Client struct {
-	lokiURL *url.URL
-}
-
-func NewLokiClient(lokiURL *url.URL) *Client {
-	return &Client{lokiURL}
-}
-
-func (c *Client) PushStreams(ctx context.Context, streams []*Stream) error {
-	body, err := json.Marshal(&jsonPushBody{Streams: streams})
-	if err != nil {
-		return err
-	}
-
-	req, err := http.NewRequest(http.MethodPost, c.lokiURL.String()+pushEndpoint, bytes.NewBuffer(body))
-	if err != nil {
-		return err
-	}
-	req.Header.Set("Content-Type", "application/json")
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return err
-	}
-	if resp.StatusCode < 200 || resp.StatusCode >= 400 {
-		b, _ := io.ReadAll(resp.Body)
-		defer resp.Body.Close()
-		// Stream already published
-		if strings.Contains(string(b), "entry out of order") {
-			return nil
-		}
-		return errors.Newf("unexpected status code %d: %s", resp.StatusCode, string(b))
-	}
-	return nil
-}

View File

@@ -1,180 +0,0 @@
-package loki
-
-import (
-	"bytes"
-	"fmt"
-	"testing"
-	"time"
-
-	"github.com/google/go-cmp/cmp"
-
-	"github.com/sourcegraph/sourcegraph/dev/sg/internal/bk"
-	"github.com/sourcegraph/sourcegraph/internal/randstring"
-)
-
-func TestChunkEntry(t *testing.T) {
-	ts := time.Now().UnixNano()
-	line := "0123456789"
-	entry := [2]string{fmt.Sprintf("%d", ts), line}
-
-	results, err := chunkEntry(entry, 2)
-	if err != nil {
-		t.Error(err)
-	}
-	if len(results) != len(line)/2 {
-		t.Errorf("Got %d chunks wanted %d", len(results), len(line)/2)
-	}
-
-	allLines := bytes.NewBuffer(nil)
-	for i := range len(results) {
-		expectedTs := fmt.Sprintf("%d", ts+int64(i))
-		if results[i][0] != expectedTs {
-			t.Errorf("wrong timestamp at %d. Got %s wanted %s", i, results[i][0], expectedTs)
-		}
-		allLines.WriteString(results[i][1])
-	}
-
-	if allLines.String() != line {
-		t.Errorf("reconstructed chunked line differs from original line. Got %q wanted %q", allLines.String(), line)
-	}
-}
-
-func TestSplitIntoChunks(t *testing.T) {
-	t.Run("general split into chunks", func(t *testing.T) {
-		line := randstring.NewLen(100)
-		result := splitIntoChunks([]byte(line), 10)
-		if len(result) != 10 {
-			t.Errorf("expected string of size 100 to be split into 10 chunks. Got %d wanted %d", len(result), 10)
-		}
-	})
-	t.Run("chunk size larger than string", func(t *testing.T) {
-		line := randstring.NewLen(100)
-		result := splitIntoChunks([]byte(line), len(line)+1)
-		if len(result) != 1 {
-			t.Errorf("expected a chunk size larger than the string to yield 1 chunk. Got %d wanted %d", len(result), 1)
-		}
-	})
-	t.Run("line size larger by 1 than chunk size", func(t *testing.T) {
-		line := randstring.NewLen(100)
-		result := splitIntoChunks([]byte(line), 99)
-		if len(result) != 2 {
-			t.Errorf("expected a string 1 byte longer than the chunk size to be split into 2 chunks. Got %d wanted %d", len(result), 2)
-		}
-	})
-	t.Run("check chunk content", func(t *testing.T) {
-		line := "123456789"
-		result := splitIntoChunks([]byte(line), 5)
-		if !bytes.Equal(result[0], []byte("12345")) {
-			t.Errorf("incorrect chunk content at idx 0. Got %s wanted %s", string(result[0]), "12345")
-		}
-		if !bytes.Equal(result[1], []byte("6789")) {
-			t.Errorf("incorrect chunk content at idx 1. Got %s wanted %s", string(result[1]), "6789")
-		}
-	})
-	t.Run("chunk sizes", func(t *testing.T) {
-		line := randstring.NewLen(1337)
-		results := splitIntoChunks([]byte(line), 1024)
-		for i, r := range results {
-			if len(r) > 1024 {
-				t.Errorf("incorrect sized chunk found at %d with size %d", i, len(r))
-			}
-		}
-	})
-}
-
-func TestNewStreamFromJobLogs(t *testing.T) {
-	type args struct {
-		log string
-	}
-	tests := []struct {
-		name    string
-		args    args
-		want    [][2]string
-		wantErr bool
-	}{
-		{
-			name: "parse empty content",
-			args: args{
-				log: ``,
-			},
-			want: [][2]string{},
-		},
-		{
-			name: "parse invalid line",
-			args: args{
-				log: `~~~ Preparing working directory`,
-			},
-			wantErr: true,
-		},
-		{
-			name: "parse line",
-			args: args{
-				log: `_bk;t=1633575941106~~~ Preparing working directory`,
-			},
-			want: [][2]string{
-				{"1633575941106000000", "~~~ Preparing working directory"},
-			},
-		},
-		{
-			name: "merge timestamps",
-			args: args{
-				log: `_bk;t=1633575941106~~~ Preparing working directory
-_bk;t=1633575941106$ cd /buildkite/builds/buildkite-agent-77bfc969fc-4zfqc-1/sourcegraph/sourcegraph
-_bk;t=1633575941112$ git remote set-url origin git@github.com:sourcegraph/sourcegraph.git
-_bk;t=1633575946276remote: Enumerating objects: 25, done._bk;t=1633575947202
-_bk;t=1633575947202remote: Counting objects: 4% (1/25)_bk;t=1633575947202
-remote: Counting objects: 8% (2/25)_bk;t=1633575947202`,
-			},
-			want: [][2]string{
-				{"1633575941106000000", "~~~ Preparing working directory\n$ cd /buildkite/builds/buildkite-agent-77bfc969fc-4zfqc-1/sourcegraph/sourcegraph"},
-				{"1633575941112000000", "$ git remote set-url origin git@github.com:sourcegraph/sourcegraph.git"},
-				{"1633575946276000000", "remote: Enumerating objects: 25, done."},
-				{"1633575947202000000", "remote: Counting objects: 4% (1/25)\nremote: Counting objects: 8% (2/25)"},
-			},
-		},
-		{
-			name: "weird ansi things",
-			args: args{
-				log: `_bk;t=16335759518222021-10-07 03:05:51 INFO  Updating BUILDKITE_COMMIT to "d4b6e13eab2216ea2a934607df5c97a25e920207"
-_bk;t=16335759518382021-10-07 03:05:54 INFO  Successfully uploaded and parsed pipeline config`,
-			},
-			want: [][2]string{
-				{"1633575951822000000", "2021-10-07 03:05:51 INFO Updating BUILDKITE_COMMIT to \"d4b6e13eab2216ea2a934607df5c97a25e920207\""},
-				{"1633575951838000000", "2021-10-07 03:05:54 INFO Successfully uploaded and parsed pipeline config"},
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			got, err := NewStreamFromJobLogs(&bk.JobLogs{
-				JobMeta: bk.JobMeta{Job: tt.name},
-				Content: &tt.args.log,
-			})
-			if (err != nil) != tt.wantErr {
-				t.Errorf("NewStreamFromJobLogs() error = %v, wantErr %v", err, tt.wantErr)
-				return
-			}
-			if tt.wantErr {
-				return
-			}
-			if diff := cmp.Diff(tt.want, got.Values); diff != "" {
-				t.Fatalf("(-want +got):\n%s", diff)
-			}
-		})
-	}
-}

View File

@@ -14,7 +14,3 @@ datasources:
     type: jaeger
     access: proxy
    url: http://127.0.0.1:16686/-/debug/jaeger
-  - name: Loki
-    type: loki
-    access: proxy
-    url: http://127.0.0.1:3100

View File

@@ -936,22 +936,6 @@ commands:
         cmd: echo "monitoring-generator is deprecated, please run 'sg generate go' or 'bazel run //dev:write_all_generated' instead"
         env:
 
-    loki:
-        cmd: |
-            echo "Loki: serving on http://localhost:3100"
-            echo "Loki: note that logs are piped to ${LOKI_LOG_FILE}"
-            docker run --rm --name=loki \
-                -p 3100:3100 -v $LOKI_DISK:/loki \
-                index.docker.io/grafana/loki:$LOKI_VERSION >"${LOKI_LOG_FILE}" 2>&1
-        install: |
-            mkdir -p "${LOKI_DISK}"
-            mkdir -p "$(dirname ${LOKI_LOG_FILE})"
-            docker pull index.docker.io/grafana/loki:$LOKI_VERSION
-        env:
-            LOKI_DISK: $HOME/.sourcegraph-dev/data/loki
-            LOKI_VERSION: '2.3.0'
-            LOKI_LOG_FILE: $HOME/.sourcegraph-dev/logs/loki/loki.log
 
     otel-collector:
         install: |
             bazel build //docker-images/opentelemetry-collector:image_tarball
@@ -1158,17 +1142,6 @@ dockerCommands:
             # docker containers must access things via docker host on non-linux platforms
             CACHE: false
 
-    loki:
-        logfile: $HOME/.sourcegraph-dev/logs/loki/loki.log
-        docker:
-            image: index.docker.io/grafana/loki:2.3.0
-            pull: true
-            ports:
-                - 3100
-            volumes:
-                - from: $HOME/.sourcegraph-dev/data/loki
-                  to: /loki
 
     otel-collector:
         target: //docker-images/opentelemetry-collector:image_tarball
         description: OpenTelemetry collector