mirror of
https://github.com/FlipsideCrypto/convox.git
synced 2026-02-06 10:56:56 +00:00
router: add redis storage backend (#91)
* router: add redis storage backend * add newline to error print
This commit is contained in:
parent
2985a8d66c
commit
e0223e3138
@ -41,7 +41,7 @@ func NewCacheRedis(addr, password string, secure bool) (*CacheRedis, error) {
|
||||
func (c *CacheRedis) Delete(ctx context.Context, key string) error {
|
||||
fmt.Printf("ns=cache.redis at=delete key=%s\n", key)
|
||||
|
||||
if _, err := c.redis.Del(fmt.Sprintf("cache.%s", key)).Result(); err != nil {
|
||||
if _, err := c.redis.Del(fmt.Sprintf("router/cache/%s", key)).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -51,7 +51,7 @@ func (c *CacheRedis) Delete(ctx context.Context, key string) error {
|
||||
func (c *CacheRedis) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
fmt.Printf("ns=cache.redis at=get key=%s\n", key)
|
||||
|
||||
v, err := c.redis.Get(fmt.Sprintf("cache.%s", key)).Result()
|
||||
v, err := c.redis.Get(fmt.Sprintf("router/cache/%s", key)).Bytes()
|
||||
if err == redis.Nil {
|
||||
return nil, autocert.ErrCacheMiss
|
||||
}
|
||||
@ -59,13 +59,13 @@ func (c *CacheRedis) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return []byte(v), nil
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func (c *CacheRedis) Put(ctx context.Context, key string, data []byte) error {
|
||||
fmt.Printf("ns=cache.redis at=put key=%s\n", key)
|
||||
|
||||
if _, err := c.redis.Set(fmt.Sprintf("cache.%s", key), data, 0).Result(); err != nil {
|
||||
if _, err := c.redis.Set(fmt.Sprintf("router/cache/%s", key), data, 0).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
healthTick = 10 * time.Second
|
||||
idleTick = 1 * time.Minute
|
||||
idleTimeout = 60 * time.Minute
|
||||
)
|
||||
@ -95,8 +96,12 @@ func New() (*Router, error) {
|
||||
|
||||
r.storage = s
|
||||
case "redis":
|
||||
addr := os.Getenv("REDIS_ADDR")
|
||||
fmt.Printf("addr: %+v\n", addr)
|
||||
s, err := NewStorageRedis(os.Getenv("REDIS_ADDR"), os.Getenv("REDIS_AUTH"), os.Getenv("REDIS_SECURE") == "true")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.storage = s
|
||||
default:
|
||||
r.storage = NewStorageMemory()
|
||||
}
|
||||
@ -140,6 +145,7 @@ func (r *Router) Serve() error {
|
||||
go serve(ch, r.HTTP)
|
||||
go serve(ch, r.HTTPS)
|
||||
|
||||
go r.healthTicker()
|
||||
go r.idleTicker()
|
||||
|
||||
return <-ch
|
||||
@ -243,9 +249,6 @@ func (r *Router) autocertHostPolicy(ctx context.Context, host string) error {
|
||||
return fmt.Errorf("unknown host")
|
||||
}
|
||||
|
||||
// work around chrome's agressive CT caching
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -290,6 +293,35 @@ func (r *Router) generateCertificateCA(hello *tls.ClientHelloInfo) (*tls.Certifi
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// try to request every known host on a timer to trigger things like
|
||||
// certificate generation before the user gets to them
|
||||
func (r *Router) healthTicker() {
|
||||
for range time.Tick(healthTick) {
|
||||
if err := r.healthTick(); err != nil {
|
||||
fmt.Printf("ns=router at=health.ticker error=%v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Router) healthTick() error {
|
||||
hs, err := r.storage.HostList()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, h := range hs {
|
||||
if strings.HasSuffix(h, ".local") {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err = http.Get(fmt.Sprintf("https://%s/convox/health", h)); err != nil {
|
||||
fmt.Printf("ns=router at=health.tick host=%q error=%v\n", h, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Router) idleTicker() {
|
||||
for range time.Tick(idleTick) {
|
||||
if err := r.idleTick(); err != nil {
|
||||
|
||||
@ -3,6 +3,7 @@ package router
|
||||
import "time"
|
||||
|
||||
type Storage interface {
|
||||
HostList() ([]string, error)
|
||||
RequestBegin(target string) error
|
||||
RequestEnd(target string) error
|
||||
Stale(cutoff time.Time) ([]string, error)
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||
"github.com/convox/convox/pkg/common"
|
||||
@ -33,6 +35,43 @@ func NewStorageDynamo(hosts, targets string) (*StorageDynamo, error) {
|
||||
return d, nil
|
||||
}
|
||||
|
||||
func (s *StorageDynamo) HostList() ([]string, error) {
|
||||
// fmt.Printf("ns=storage.dynamo at=host.list\n")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
req := &dynamodb.ScanInput{
|
||||
ProjectionExpression: aws.String("host"),
|
||||
TableName: aws.String(s.hosts),
|
||||
}
|
||||
|
||||
p := request.Pagination{
|
||||
NewRequest: func() (*request.Request, error) {
|
||||
r, _ := s.ddb.ScanRequest(req)
|
||||
r.SetContext(ctx)
|
||||
return r, nil
|
||||
},
|
||||
}
|
||||
|
||||
hosts := []string{}
|
||||
|
||||
for p.Next() {
|
||||
page := p.Page().(*dynamodb.ScanOutput)
|
||||
|
||||
for _, item := range page.Items {
|
||||
if host := item["host"].S; host != nil {
|
||||
hosts = append(hosts, *host)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := p.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return hosts, nil
|
||||
}
|
||||
|
||||
func (s *StorageDynamo) IdleGet(target string) (bool, error) {
|
||||
fmt.Printf("ns=storage.dynamo at=idle.get target=%q\n", target)
|
||||
|
||||
@ -128,7 +167,7 @@ func (s *StorageDynamo) TargetAdd(host, target string, idles bool) error {
|
||||
}
|
||||
|
||||
func (s *StorageDynamo) TargetList(host string) ([]string, error) {
|
||||
fmt.Printf("ns=storage.dynamo at=target.list\n")
|
||||
// fmt.Printf("ns=storage.dynamo at=target.list\n")
|
||||
|
||||
res, err := s.ddb.GetItem(&dynamodb.GetItemInput{
|
||||
Key: map[string]*dynamodb.AttributeValue{"host": {S: aws.String(host)}},
|
||||
|
||||
@ -24,6 +24,21 @@ func NewStorageMemory() *StorageMemory {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StorageMemory) HostList() ([]string, error) {
|
||||
// fmt.Printf("ns=storage.memory at=host.list\n")
|
||||
|
||||
hosts := []string{}
|
||||
|
||||
s.routes.Range(func(key, value interface{}) bool {
|
||||
if host, ok := key.(string); ok {
|
||||
hosts = append(hosts, host)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
return hosts, nil
|
||||
}
|
||||
|
||||
func (s *StorageMemory) IdleGet(target string) (bool, error) {
|
||||
fmt.Printf("ns=storage.memory at=idle.get target=%q\n", target)
|
||||
|
||||
@ -125,6 +140,8 @@ func (s *StorageMemory) TargetAdd(host, target string, idles bool) error {
|
||||
}
|
||||
|
||||
func (s *StorageMemory) TargetList(host string) ([]string, error) {
|
||||
// fmt.Printf("ns=storage.memory at=target.list\n")
|
||||
|
||||
s.targetLock.Lock()
|
||||
defer s.targetLock.Unlock()
|
||||
|
||||
|
||||
144
pkg/router/storage_redis.go
Normal file
144
pkg/router/storage_redis.go
Normal file
@ -0,0 +1,144 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
)
|
||||
|
||||
const ()
|
||||
|
||||
type StorageRedis struct {
|
||||
redis *redis.Client
|
||||
}
|
||||
|
||||
func NewStorageRedis(addr, password string, secure bool) (*StorageRedis, error) {
|
||||
fmt.Printf("ns=storage.redis at=new addr=%s\n", addr)
|
||||
|
||||
opts := &redis.Options{
|
||||
Addr: addr,
|
||||
Password: password,
|
||||
}
|
||||
|
||||
if secure {
|
||||
opts.TLSConfig = &tls.Config{}
|
||||
}
|
||||
|
||||
rc := redis.NewClient(opts)
|
||||
|
||||
if _, err := rc.Ping().Result(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := &StorageRedis{
|
||||
redis: rc,
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (s *StorageRedis) HostList() ([]string, error) {
|
||||
// fmt.Printf("ns=storage.redis at=host.list\n")
|
||||
|
||||
hs, err := s.redis.SMembers("router/hosts").Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return hs, nil
|
||||
}
|
||||
|
||||
func (s *StorageRedis) IdleGet(target string) (bool, error) {
|
||||
fmt.Printf("ns=storage.redis at=idle.get target=%q\n", target)
|
||||
|
||||
idle, err := s.redis.Get(fmt.Sprintf("router/idle/%s", target)).Result()
|
||||
if err == redis.Nil {
|
||||
return false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return idle == "true", nil
|
||||
}
|
||||
|
||||
func (s *StorageRedis) IdleSet(target string, idle bool) error {
|
||||
fmt.Printf("ns=storage.redis at=idle.get target=%q idle=%t\n", target, idle)
|
||||
|
||||
if _, err := s.redis.Set(fmt.Sprintf("router/idle/%s", target), fmt.Sprintf("%t", idle), 0).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StorageRedis) RequestBegin(target string) error {
|
||||
fmt.Printf("ns=storage.redis at=request.begin target=%q\n", target)
|
||||
|
||||
if _, err := s.redis.Set(fmt.Sprintf("router/activity/%s", target), time.Now().UTC(), 0).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := s.redis.IncrBy(fmt.Sprintf("router/connections/%s", target), 1).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StorageRedis) RequestEnd(target string) error {
|
||||
fmt.Printf("ns=storage.redis at=request.end target=%q\n", target)
|
||||
|
||||
if _, err := s.redis.IncrBy(fmt.Sprintf("router/connections/%s", target), -1).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO implement
|
||||
func (s *StorageRedis) Stale(cutoff time.Time) ([]string, error) {
|
||||
fmt.Printf("ns=storage.redis at=stale cutoff=%s\n", cutoff)
|
||||
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
func (s *StorageRedis) TargetAdd(host, target string, idles bool) error {
|
||||
fmt.Printf("ns=storage.redis at=target.add host=%q target=%q idles=%t\n", host, target, idles)
|
||||
|
||||
if _, err := s.redis.SAdd("router/hosts", host).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := s.redis.LPush(fmt.Sprintf("router/targets/%s", host), target).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StorageRedis) TargetList(host string) ([]string, error) {
|
||||
// fmt.Printf("ns=storage.redis at=target.list\n")
|
||||
|
||||
ts, err := s.redis.LRange(fmt.Sprintf("router/targets/%s", host), 0, -1).Result()
|
||||
if err == redis.Nil {
|
||||
return []string{}, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
func (s *StorageRedis) TargetRemove(host, target string) error {
|
||||
fmt.Printf("ns=storage.redis at=target.remove host=%q target=%q\n", host, target)
|
||||
|
||||
if _, err := s.redis.LRem(fmt.Sprintf("router/targets/%s", host), 1, target).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -68,5 +68,6 @@ module "router" {
|
||||
env = {
|
||||
CACHE = "redis"
|
||||
REDIS_ADDR = module.redis.addr
|
||||
STORAGE = "redis"
|
||||
}
|
||||
}
|
||||
|
||||
@ -69,5 +69,6 @@ module "router" {
|
||||
env = {
|
||||
CACHE = "redis"
|
||||
REDIS_ADDR = module.redis.addr
|
||||
STORAGE = "redis"
|
||||
}
|
||||
}
|
||||
|
||||
@ -62,5 +62,6 @@ module "router" {
|
||||
env = {
|
||||
CACHE = "redis"
|
||||
REDIS_ADDR = module.redis.addr
|
||||
STORAGE = "redis"
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user