chore/msp-example: refactor to align with service structure best practices (#62954)

In monorepo:

```
cmd/my-service/main.go
-> cmd/my-service/service
-> cmd/my-service/internal/...
```

Outside monorepo a similar unnested structure aligns with this as well:

```
cmd/my-service <- command
service/...    <- entrypoint
internal/...   <- internal implementation
```


## Test plan

Basic example builds and runs: `sg run msp-example`
This commit is contained in:
Robert Lin 2024-05-29 09:58:43 -07:00 committed by GitHub
parent 2ecc169999
commit 17a5fdb1d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 441 additions and 314 deletions

View File

@ -11,7 +11,7 @@ go_library(
tags = [TAG_INFRA_CORESERVICES],
visibility = ["//visibility:private"],
deps = [
"//cmd/msp-example/internal/example",
"//cmd/msp-example/service",
"//lib/managedservicesplatform/runtime",
],
)

View File

@ -0,0 +1,14 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "bigquery",
srcs = ["bigquery.go"],
importpath = "github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/bigquery",
visibility = ["//cmd/msp-example:__subpackages__"],
deps = [
"//lib/errors",
"//lib/managedservicesplatform/bigquerywriter",
"//lib/managedservicesplatform/runtime",
"@com_google_cloud_go_bigquery//:bigquery",
],
)

View File

@ -1,4 +1,4 @@
package example
package bigquery
import (
"context"
@ -8,17 +8,26 @@ import (
"cloud.google.com/go/bigquery"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/bigquerywriter"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
func writeBigQueryEvent(ctx context.Context, contract runtime.Contract, eventName string) error {
type Client struct {
w *bigquerywriter.Writer
}
func NewClient(ctx context.Context, contract runtime.Contract) (*Client, error) {
bq, err := contract.BigQuery.GetTableWriter(ctx, "example")
if err != nil {
return errors.Wrap(err, "BigQuery.GetTableWriter")
return nil, errors.Wrap(err, "BigQuery.GetTableWriter")
}
defer func() { _ = bq.Close() }()
return &Client{bq}, nil
}
return bq.Write(ctx, bigQueryEntry{
func (c *Client) Close() error { return c.w.Close() }
func (c *Client) Write(ctx context.Context, eventName string) error {
return c.w.Write(ctx, bigQueryEntry{
Name: eventName,
CreatedAt: time.Now(),
})

View File

@ -1,31 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "example",
srcs = [
"bigquery.go",
"diagnostics.go",
"example.go",
"metrics.go",
"postgresql.go",
"redis.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/example",
tags = [TAG_INFRA_CORESERVICES],
visibility = [
"//cmd/msp-example:__pkg__",
"//lib/managedservicesplatform/example:__subpackages__",
],
deps = [
"//internal/version",
"//lib/background",
"//lib/errors",
"//lib/managedservicesplatform/runtime",
"@com_github_go_redis_redis_v8//:redis",
"@com_github_sourcegraph_log//:log",
"@com_google_cloud_go_bigquery//:bigquery",
"@io_opentelemetry_go_contrib_instrumentation_net_http_otelhttp//:otelhttp",
"@io_opentelemetry_go_otel//:otel",
"@io_opentelemetry_go_otel_metric//:metric",
],
)

View File

@ -1,32 +0,0 @@
package example
import (
"context"
"net/url"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
type serviceState struct {
statelessMode bool
contract runtime.Contract
}
func (s serviceState) Healthy(ctx context.Context, _ url.Values) error {
if s.statelessMode {
return nil
}
// Write a single test event
if err := writeBigQueryEvent(ctx, s.contract, "service.healthy"); err != nil {
return errors.Wrap(err, "writeBigQueryEvent")
}
// Check redis connection
if err := testRedisConnection(ctx, s.contract); err != nil {
return errors.Wrap(err, "newRedisConnection")
}
return nil
}

View File

@ -1,186 +0,0 @@
package example
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"time"
"github.com/sourcegraph/log"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"github.com/sourcegraph/sourcegraph/internal/version"
"github.com/sourcegraph/sourcegraph/lib/background"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
type Config struct {
StatelessMode bool
Variable string
}
func (c *Config) Load(env *runtime.Env) {
c.StatelessMode = env.GetBool("STATELESS_MODE", "false", "if true, disable dependencies")
c.Variable = env.Get("VARIABLE", "13", "variable value")
}
type Service struct{}
var _ runtime.Service[Config] = Service{}
func (s Service) Name() string { return "example" }
func (s Service) Version() string { return version.Version() }
func (s Service) Initialize(
ctx context.Context,
logger log.Logger,
contract runtime.Contract,
config Config,
) (background.Routine, error) {
logger.Info("starting service")
if !config.StatelessMode {
if err := initPostgreSQL(ctx, contract); err != nil {
return nil, errors.Wrap(err, "initPostgreSQL")
}
logger.Info("postgresql connection success")
if err := writeBigQueryEvent(ctx, contract, "service.initialized"); err != nil {
return nil, errors.Wrap(err, "writeBigQueryEvent")
}
logger.Info("bigquery connection success")
if err := testRedisConnection(ctx, contract); err != nil {
return nil, errors.Wrap(err, "newRedisConnection")
}
logger.Info("redis connection success")
}
requestCounter, err := getRequestCounter()
if err != nil {
return nil, err
}
h := http.NewServeMux()
h.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCounter.Add(r.Context(), 1)
_, _ = w.Write([]byte(fmt.Sprintf("Variable: %s", config.Variable)))
}))
// Test endpoint for making CURL requests to arbitrary targets from this
// service, for testing networking. Requires diagnostic auth.
h.Handle("/proxy", contract.Diagnostics.DiagnosticsAuthMiddleware(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
host := r.URL.Query().Get("host")
if host == "" {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("query parameter 'host' is required"))
return
}
hostURL, err := url.Parse(host)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
path := r.URL.Query().Get("path")
if path == "" {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("query parameter 'path' is required"))
return
}
insecure, _ := strconv.ParseBool(r.URL.Query().Get("insecure"))
// Copy the request body and build the request
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
proxiedRequest, err := http.NewRequest(r.Method, "/"+path, bytes.NewReader(body))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
// Copy relevant request headers after stripping their prefixes
for k, vs := range r.Header {
if strings.HasPrefix(k, "X-Proxy-") {
for _, v := range vs {
proxiedRequest.Header.Add(strings.TrimPrefix(k, "X-Proxy-"), v)
}
}
}
// Send to target
proxy := httputil.NewSingleHostReverseProxy(hostURL)
if insecure {
customTransport := http.DefaultTransport.(*http.Transport).Clone()
customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
proxy.Transport = customTransport
}
proxy.ServeHTTP(w, proxiedRequest)
}),
))
contract.Diagnostics.RegisterDiagnosticsHandlers(h, serviceState{
statelessMode: config.StatelessMode,
contract: contract,
})
return background.CombinedRoutine{
&httpRoutine{
log: logger,
Server: &http.Server{
Addr: fmt.Sprintf(":%d", contract.Port),
Handler: otelhttp.NewHandler(h, "http",
otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string {
// If incoming, just include the path since our own host is not
// very interesting. If outgoing, include the host as well.
target := r.URL.Path
if r.RemoteAddr == "" { // no RemoteAddr indicates this is an outgoing request
target = r.Host + target
}
if operation != "" {
return fmt.Sprintf("%s.%s %s", operation, r.Method, target)
}
return fmt.Sprintf("%s %s", r.Method, target)
})),
},
},
}, nil
}
type httpRoutine struct {
log log.Logger
*http.Server
}
func (s *httpRoutine) Name() string { return "http" }
func (s *httpRoutine) Start() {
if err := s.Server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.log.Error("error stopping server", log.Error(err))
}
}
func (s *httpRoutine) Stop(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := s.Server.Shutdown(ctx); err != nil {
return errors.Wrap(err, "shutdown")
}
s.log.Info("server stopped")
return nil
}

View File

@ -1,28 +0,0 @@
package example
import (
"context"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
// initPostgreSQL connects to a database 'primary' based on a DSN provided by
// contract, and attempts to ping it.
func initPostgreSQL(ctx context.Context, contract runtime.Contract) error {
sqlDB, err := contract.PostgreSQL.OpenDatabase(ctx, "primary")
if err != nil {
return errors.Wrap(err, "contract.GetPostgreSQLDB")
}
defer sqlDB.Close()
if err := sqlDB.PingContext(ctx); err != nil {
return errors.Wrap(err, "sqlDB.PingContext")
}
if _, err := sqlDB.ExecContext(ctx, "SELECT current_user;"); err != nil {
return errors.Wrap(err, "sqlDB.ExecContext SELECT current_user")
}
return nil
}

View File

@ -1,28 +0,0 @@
package example
import (
"context"
goredis "github.com/go-redis/redis/v8"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
// testRedisConnection creates a new Redis client from the MSP contract and issues
// a PING to check the connection.
func testRedisConnection(ctx context.Context, c runtime.Contract) error {
if c.RedisEndpoint == nil {
return errors.New("no Redis endpoint provided")
}
redisOpts, err := goredis.ParseURL(*c.RedisEndpoint)
if err != nil {
return errors.Wrap(err, "invalid Redis DSN")
}
client := goredis.NewClient(redisOpts)
defer client.Close()
pong := client.Ping(ctx)
return pong.Err()
}

View File

@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "httpapi",
srcs = [
"httpapi.go",
"metrics.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/httpapi",
visibility = ["//cmd/msp-example:__subpackages__"],
deps = [
"//lib/managedservicesplatform/runtime",
"@io_opentelemetry_go_otel//:otel",
"@io_opentelemetry_go_otel_metric//:metric",
],
)

View File

@ -0,0 +1,95 @@
package httpapi
import (
"bytes"
"crypto/tls"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
// TODO: Demonstrate a connectrpc+gRPC example here instead.
type Config struct {
Variable string
}
func Register(h *http.ServeMux, contract runtime.Contract, config Config) error {
requestCounter, err := getRequestCounter()
if err != nil {
return err
}
h.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCounter.Add(r.Context(), 1)
_, _ = w.Write([]byte(fmt.Sprintf("Variable: %s", config.Variable)))
}))
// Test endpoint for making CURL requests to arbitrary targets from this
// service, for testing networking. Requires diagnostic auth.
h.Handle("/proxy", contract.Diagnostics.DiagnosticsAuthMiddleware(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
host := r.URL.Query().Get("host")
if host == "" {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("query parameter 'host' is required"))
return
}
hostURL, err := url.Parse(host)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
path := r.URL.Query().Get("path")
if path == "" {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("query parameter 'path' is required"))
return
}
insecure, _ := strconv.ParseBool(r.URL.Query().Get("insecure"))
// Copy the request body and build the request
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
proxiedRequest, err := http.NewRequest(r.Method, "/"+path, bytes.NewReader(body))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
// Copy relevant request headers after stripping their prefixes
for k, vs := range r.Header {
if strings.HasPrefix(k, "X-Proxy-") {
for _, v := range vs {
proxiedRequest.Header.Add(strings.TrimPrefix(k, "X-Proxy-"), v)
}
}
}
// Send to target
proxy := httputil.NewSingleHostReverseProxy(hostURL)
if insecure {
customTransport := http.DefaultTransport.(*http.Transport).Clone()
customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
proxy.Transport = customTransport
}
proxy.ServeHTTP(w, proxiedRequest)
}),
))
return nil
}

View File

@ -1,4 +1,4 @@
package example
package httpapi
import (
"go.opentelemetry.io/otel"

View File

@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "postgresql",
srcs = ["postgresql.go"],
importpath = "github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/postgresql",
visibility = ["//cmd/msp-example:__subpackages__"],
deps = [
"//lib/errors",
"//lib/managedservicesplatform/runtime",
],
)

View File

@ -0,0 +1,35 @@
package postgresql
import (
"context"
"database/sql"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
type Client struct {
db *sql.DB
}
func NewClient(ctx context.Context, contract runtime.Contract) (*Client, error) {
sqlDB, err := contract.PostgreSQL.OpenDatabase(ctx, "primary")
if err != nil {
return nil, errors.Wrap(err, "contract.GetPostgreSQLDB")
}
return &Client{sqlDB}, nil
}
func (c *Client) Ping(ctx context.Context) error {
if err := c.db.PingContext(ctx); err != nil {
return errors.Wrap(err, "sqlDB.PingContext")
}
if _, err := c.db.ExecContext(ctx, "SELECT current_user;"); err != nil {
return errors.Wrap(err, "sqlDB.ExecContext SELECT current_user")
}
return nil
}
func (c *Client) Close() error { return c.db.Close() }

View File

@ -0,0 +1,13 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "redis",
srcs = ["redis.go"],
importpath = "github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/redis",
visibility = ["//cmd/msp-example:__subpackages__"],
deps = [
"//lib/errors",
"//lib/managedservicesplatform/runtime",
"@com_github_go_redis_redis_v8//:redis",
],
)

View File

@ -0,0 +1,27 @@
package redis
import (
"context"
goredis "github.com/go-redis/redis/v8"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
type Client struct {
c *goredis.Client
}
func NewClient(ctx context.Context, contract runtime.Contract) (*Client, error) {
redisOpts, err := goredis.ParseURL(*contract.RedisEndpoint)
if err != nil {
return nil, errors.Wrap(err, "invalid Redis DSN")
}
return &Client{goredis.NewClient(redisOpts)}, nil
}
func (c *Client) Ping(ctx context.Context) error {
pong := c.c.Ping(ctx)
return pong.Err()
}

View File

@ -3,9 +3,9 @@ package main
import (
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
"github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/example"
"github.com/sourcegraph/sourcegraph/cmd/msp-example/service"
)
func main() {
runtime.Start(example.Service{})
runtime.Start(service.Service{})
}

View File

@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "service",
srcs = [
"config.go",
"diagnostics.go",
"service.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/msp-example/service",
visibility = ["//visibility:public"],
deps = [
"//cmd/msp-example/internal/bigquery",
"//cmd/msp-example/internal/httpapi",
"//cmd/msp-example/internal/postgresql",
"//cmd/msp-example/internal/redis",
"//internal/version",
"//lib/background",
"//lib/errors",
"//lib/managedservicesplatform/runtime",
"@com_github_sourcegraph_log//:log",
"@io_opentelemetry_go_contrib_instrumentation_net_http_otelhttp//:otelhttp",
],
)

View File

@ -0,0 +1,16 @@
package service
import (
"github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/httpapi"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
type Config struct {
StatelessMode bool
HTTPAPI httpapi.Config
}
func (c *Config) Load(env *runtime.Env) {
c.StatelessMode = env.GetBool("STATELESS_MODE", "false", "if true, disable dependencies")
c.HTTPAPI.Variable = env.Get("VARIABLE", "13", "variable value")
}

View File

@ -0,0 +1,43 @@
package service
import (
"context"
"net/url"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/bigquery"
"github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/postgresql"
"github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/redis"
)
type serviceState struct {
statelessMode bool
bq *bigquery.Client
rd *redis.Client
pg *postgresql.Client
}
func (s serviceState) Healthy(ctx context.Context, _ url.Values) error {
if s.statelessMode {
return nil
}
// Write a single test event
if err := s.bq.Write(ctx, "service.healthy"); err != nil {
return errors.Wrap(err, "bigquery")
}
// Check redis connection
if err := s.rd.Ping(ctx); err != nil {
return errors.Wrap(err, "redis")
}
// Check database
if err := s.pg.Ping(ctx); err != nil {
return errors.Wrap(err, "postgresql")
}
return nil
}

View File

@ -0,0 +1,128 @@
package service
import (
"context"
"fmt"
"net/http"
"time"
"github.com/sourcegraph/log"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/bigquery"
"github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/httpapi"
"github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/postgresql"
"github.com/sourcegraph/sourcegraph/cmd/msp-example/internal/redis"
"github.com/sourcegraph/sourcegraph/internal/version"
"github.com/sourcegraph/sourcegraph/lib/background"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/managedservicesplatform/runtime"
)
type Service struct{}
var _ runtime.Service[Config] = Service{}
func (s Service) Name() string { return "msp-example" }
func (s Service) Version() string { return version.Version() }
func (s Service) Initialize(
ctx context.Context,
logger log.Logger,
contract runtime.Contract,
config Config,
) (background.Routine, error) {
logger.Info("starting service")
var (
bq *bigquery.Client
rd *redis.Client
pg *postgresql.Client
)
if !config.StatelessMode {
var err error
if bq, err = bigquery.NewClient(ctx, contract); err != nil {
return nil, errors.Wrap(err, "bigquery")
}
if err := bq.Write(ctx, "service.initialized"); err != nil {
return nil, errors.Wrap(err, "bigquery.Write")
}
logger.Info("bigquery connection success")
if rd, err = redis.NewClient(ctx, contract); err != nil {
return nil, errors.Wrap(err, "redis")
}
if err := rd.Ping(ctx); err != nil {
return nil, errors.Wrap(err, "redis.Ping")
}
logger.Info("redis connection success")
if pg, err = postgresql.NewClient(ctx, contract); err != nil {
return nil, errors.Wrap(err, "postgresl")
}
if err := pg.Ping(ctx); err != nil {
return nil, errors.Wrap(err, "postgresql.Ping")
}
logger.Info("postgresql connection success")
}
h := http.NewServeMux()
if err := httpapi.Register(h, contract, config.HTTPAPI); err != nil {
return nil, errors.Wrap(err, "httpapi.Register")
}
contract.Diagnostics.RegisterDiagnosticsHandlers(h, serviceState{
statelessMode: config.StatelessMode,
bq: bq,
rd: rd,
pg: pg,
})
return background.CombinedRoutine{
&httpRoutine{
log: logger,
Server: &http.Server{
Addr: fmt.Sprintf(":%d", contract.Port),
Handler: otelhttp.NewHandler(h, "http",
otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string {
// If incoming, just include the path since our own host is not
// very interesting. If outgoing, include the host as well.
target := r.URL.Path
if r.RemoteAddr == "" { // no RemoteAddr indicates this is an outgoing request
target = r.Host + target
}
if operation != "" {
return fmt.Sprintf("%s.%s %s", operation, r.Method, target)
}
return fmt.Sprintf("%s %s", r.Method, target)
})),
},
},
}, nil
}
type httpRoutine struct {
log log.Logger
*http.Server
}
func (s *httpRoutine) Name() string { return "http" }
func (s *httpRoutine) Start() {
if err := s.Server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.log.Error("error stopping server", log.Error(err))
}
}
func (s *httpRoutine) Stop(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := s.Server.Shutdown(ctx); err != nil {
return errors.Wrap(err, "shutdown")
}
s.log.Info("server stopped")
return nil
}