convox/pkg/atom/controller.go

218 lines
4.8 KiB
Go
Raw Permalink Normal View History

2019-08-16 19:49:19 +00:00
package atom
import (
"fmt"
"time"
2019-08-19 13:59:15 +00:00
ct "github.com/convox/convox/pkg/atom/pkg/apis/atom/v1"
2019-08-16 19:49:19 +00:00
cv "github.com/convox/convox/pkg/atom/pkg/client/clientset/versioned"
2019-08-19 13:59:15 +00:00
ic "github.com/convox/convox/pkg/atom/pkg/client/informers/externalversions/atom/v1"
"github.com/convox/convox/pkg/common"
2019-08-16 19:49:19 +00:00
"github.com/convox/convox/pkg/kctl"
"github.com/pkg/errors"
ac "k8s.io/api/core/v1"
am "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
type AtomController struct {
atom *Client
controller *kctl.Controller
convox cv.Interface
kubernetes kubernetes.Interface
}
func NewController(cfg *rest.Config) (*AtomController, error) {
ac, err := New(cfg)
if err != nil {
return nil, errors.WithStack(err)
}
cc, err := cv.NewForConfig(cfg)
if err != nil {
return nil, errors.WithStack(err)
}
kc, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, errors.WithStack(err)
}
acc := &AtomController{
atom: ac,
convox: cc,
kubernetes: kc,
}
c, err := kctl.NewController("kube-system", "convox-atom", acc)
if err != nil {
return nil, errors.WithStack(err)
}
acc.controller = c
return acc, nil
}
func (c *AtomController) Client() kubernetes.Interface {
return c.kubernetes
}
func (c *AtomController) ListOptions(opts *am.ListOptions) {
}
func (c *AtomController) Run() {
i := ic.NewFilteredAtomInformer(c.convox, ac.NamespaceAll, 5*time.Second, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, c.ListOptions)
ch := make(chan error)
go c.controller.Run(i, ch)
for err := range ch {
fmt.Printf("err = %+v\n", err)
}
}
func (c *AtomController) Start() error {
return nil
}
func (c *AtomController) Stop() error {
return nil
}
func (c *AtomController) Add(obj interface{}) error {
return nil
}
func (c *AtomController) Delete(obj interface{}) error {
return nil
}
func (c *AtomController) Update(prev, cur interface{}) error {
pa, err := assertAtom(prev)
if err != nil {
return errors.WithStack(err)
}
ca, err := assertAtom(cur)
if err != nil {
return errors.WithStack(err)
}
if pa.Status != ca.Status {
c.atomEvent(ca, "Status", fmt.Sprintf("%s => %s", common.CoalesceString(string(pa.Status), "None"), ca.Status))
fmt.Printf("ns=atom.controller at=update atom=\"%s/%s\" event=status from=%q to=%q\n", ca.Namespace, ca.Name, pa.Status, ca.Status)
}
2019-08-16 19:49:19 +00:00
switch ca.Status {
case "Failure", "Reverted", "Running":
return nil
case "Failed", "Success": // legacy
return nil
2019-08-16 19:49:19 +00:00
}
switch ca.Status {
case "Cancelled", "Deadline", "Error":
if err := c.atom.rollback(ca); err != nil {
c.atomStatus(ca, "Failure")
return errors.WithStack(err)
}
c.atomStatus(ca, "Rollback")
case "Pending":
2019-08-16 19:49:19 +00:00
if err := c.atom.apply(ca); err != nil {
c.atomStatus(ca, "Error")
return err
2019-08-16 19:49:19 +00:00
}
c.atomStatus(ca, "Updating")
2019-08-16 19:49:19 +00:00
case "Rollback":
if deadline := am.NewTime(time.Now().UTC().Add(-1 * time.Duration(ca.Spec.ProgressDeadlineSeconds) * time.Second)); ca.Started.Before(&deadline) {
c.atomStatus(ca, "Failure")
return nil
}
success, err := c.atom.check(ca.Namespace, ca.Spec.CurrentVersion)
if err != nil {
c.atomStatus(ca, "Failure")
return errors.WithStack(err)
}
if success {
c.atomStatus(ca, "Reverted")
}
case "Updating":
2019-08-16 19:49:19 +00:00
if deadline := am.NewTime(time.Now().UTC().Add(-1 * time.Duration(ca.Spec.ProgressDeadlineSeconds) * time.Second)); ca.Started.Before(&deadline) {
c.atomStatus(ca, "Deadline")
2019-08-16 19:49:19 +00:00
return nil
}
success, err := c.atom.check(ca.Namespace, ca.Spec.CurrentVersion)
2019-08-16 19:49:19 +00:00
if err != nil {
c.atomStatus(ca, "Error")
2019-08-16 19:49:19 +00:00
return errors.WithStack(err)
}
if success {
c.atomStatus(ca, "Running")
2019-08-16 19:49:19 +00:00
}
}
return nil
}
func (c *AtomController) atomEvent(a *ct.Atom, reason, message string) error {
ts := am.Now()
e := &ac.Event{
Count: 1,
Message: message,
Reason: reason,
FirstTimestamp: ts,
LastTimestamp: ts,
Type: "Normal",
InvolvedObject: ac.ObjectReference{
APIVersion: "atom.convox.com/v1",
Kind: "Atom",
Name: a.Name,
Namespace: a.Namespace,
UID: a.UID,
},
ObjectMeta: am.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", a.Name),
},
Source: ac.EventSource{
Component: "convox.atom",
},
}
if _, err := c.kubernetes.CoreV1().Events(a.Namespace).Create(e); err != nil {
return errors.WithStack(err)
}
return nil
}
func (c *AtomController) atomStatus(a *ct.Atom, status string) error {
_, err := c.atom.update(a, func(aa *ct.Atom) {
aa.Status = ct.AtomStatus(status)
})
if err != nil {
return errors.WithStack(err)
}
return nil
}
2019-08-16 19:49:19 +00:00
func assertAtom(v interface{}) (*ct.Atom, error) {
a, ok := v.(*ct.Atom)
if !ok {
return nil, errors.WithStack(fmt.Errorf("could not assert atom for type: %T", v))
}
return a, nil
}