From f3c5e300cd7535d64998262ef38cff17cfa76de1 Mon Sep 17 00:00:00 2001 From: James Houlahan Date: Fri, 3 Feb 2023 15:06:27 +0100 Subject: [PATCH] fix(GODT-2327): Delay event processing until gluon user exists We don't want to start processing events until those events have somewhere to be sent to. Also, to be safe, ensure remove and re-add the gluon user while clearing its sync status. This shouldn't be necessary. fix(GODT-2327): Only start processing events once sync is finished fix(GODT-2327): avoid windows delete all deadlock fix(GODT-2327): Clear update channels whenever clearing sync status fix(GODT-2327): Properly cancel event stream when handling refresh fix(GODT-2327): Remove unnecessary sync abort call fix(GODT-2327): Fix lint issue fix(GODT-2327): Don't retry with abortable context because it's canceled fix(GODT-2327): Loop to retry until sync has complete fix(GODT-2327): Better sleep (with context) --- go.mod | 2 +- go.sum | 4 +- internal/bridge/bridge.go | 2 - internal/bridge/imap.go | 15 ++++ internal/bridge/user.go | 5 +- internal/user/events.go | 11 ++- internal/user/user.go | 150 +++++++++++++++++++++++++------------- 7 files changed, 125 insertions(+), 64 deletions(-) diff --git a/go.mod b/go.mod index bfc85e13..be900dbc 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 require ( github.com/0xAX/notificator v0.0.0-20220220101646-ee9b8921e557 github.com/Masterminds/semver/v3 v3.1.1 - github.com/ProtonMail/gluon v0.14.2-0.20230206091703-4a3d7a57eeae + github.com/ProtonMail/gluon v0.14.2-0.20230206162331-cf36d870802b github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a github.com/ProtonMail/go-proton-api v0.3.1-0.20230203120457-1849bf7d578b github.com/ProtonMail/go-rfc5322 v0.11.0 diff --git a/go.sum b/go.sum index 8b4c9e27..9df895d5 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf h1:yc9daCCYUefEs github.com/ProtonMail/bcrypt v0.0.0-20211005172633-e235017c1baf/go.mod h1:o0ESU9p83twszAU8LBeJKFAAMX14tISa0yk4Oo5TOqo= github.com/ProtonMail/docker-credential-helpers v1.1.0 h1:+kvUIpwWcbtP3WFv5sSvkFn/XLzSqPOB5AAthuk9xPk= github.com/ProtonMail/docker-credential-helpers v1.1.0/go.mod h1:mK0aBveCxhnQ756AmaTfXMZDeULvheYVhF/MWMErN5g= -github.com/ProtonMail/gluon v0.14.2-0.20230206091703-4a3d7a57eeae h1:iJ0CgJEZTBRGX+vwmMrIg2HEGo3+qBJD/PPcDvYqzQg= -github.com/ProtonMail/gluon v0.14.2-0.20230206091703-4a3d7a57eeae/go.mod h1:HYHr7hG7LPWI1S50M8NfHRb1kYi5B+Yu4/N/H+y+JUY= +github.com/ProtonMail/gluon v0.14.2-0.20230206162331-cf36d870802b h1:v/XwH5Em8gFSpJQErhSCt0XAsIxojFxgrVcfPUEWH7I= +github.com/ProtonMail/gluon v0.14.2-0.20230206162331-cf36d870802b/go.mod h1:HYHr7hG7LPWI1S50M8NfHRb1kYi5B+Yu4/N/H+y+JUY= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a h1:D+aZah+k14Gn6kmL7eKxoo/4Dr/lK3ChBcwce2+SQP4= github.com/ProtonMail/go-autostart v0.0.0-20210130080809-00ed301c8e9a/go.mod h1:oTGdE7/DlWIr23G0IKW3OXK9wZ5Hw1GGiaJFccTvZi4= github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo= diff --git a/internal/bridge/bridge.go b/internal/bridge/bridge.go index 70cfb590..1a5432cd 100644 --- a/internal/bridge/bridge.go +++ b/internal/bridge/bridge.go @@ -375,8 +375,6 @@ func (bridge *Bridge) init(tlsReporter TLSReporter) error { // Attempt to load users from the vault when triggered. bridge.goLoad = bridge.tasks.Trigger(func(ctx context.Context) { - logrus.Info("Loading users") - if err := bridge.loadUsers(ctx); err != nil { logrus.WithError(err).Error("Failed to load users") return diff --git a/internal/bridge/imap.go b/internal/bridge/imap.go index dc606e0e..37afae0b 100644 --- a/internal/bridge/imap.go +++ b/internal/bridge/imap.go @@ -152,9 +152,22 @@ func (bridge *Bridge) addIMAPUser(ctx context.Context, user *user.User) error { // If the DB was newly created, clear the sync status; gluon's DB was not found. logrus.Warn("IMAP user DB was newly created, clearing sync status") + // Remove the user from IMAP so we can clear the sync status. + if err := bridge.imapServer.RemoveUser(ctx, gluonID, false); err != nil { + return fmt.Errorf("failed to remove IMAP user: %w", err) + } + + // Clear the sync status -- we need to resync all messages. if err := user.ClearSyncStatus(); err != nil { return fmt.Errorf("failed to clear sync status: %w", err) } + + // Add the user back to the IMAP server. + if isNew, err := bridge.imapServer.LoadUser(ctx, imapConn, gluonID, user.GluonKey()); err != nil { + return fmt.Errorf("failed to add IMAP user: %w", err) + } else if isNew { + panic("IMAP user should already have a database") + } } else if status := user.GetSyncStatus(); !status.HasLabels { // Otherwise, the DB already exists -- if the labels are not yet synced, we need to re-create the DB. if err := bridge.imapServer.RemoveUser(ctx, gluonID, true); err != nil { @@ -192,7 +205,9 @@ func (bridge *Bridge) addIMAPUser(ctx context.Context, user *user.User) error { } } + // Trigger a sync for the user, if needed. user.TriggerSync() + return nil } diff --git a/internal/bridge/user.go b/internal/bridge/user.go index 6d355d2a..758fa1f2 100644 --- a/internal/bridge/user.go +++ b/internal/bridge/user.go @@ -330,17 +330,18 @@ func (bridge *Bridge) loginUser(ctx context.Context, client *proton.Client, auth // loadUsers tries to load each user in the vault that isn't already loaded. func (bridge *Bridge) loadUsers(ctx context.Context) error { logrus.WithField("count", len(bridge.vault.GetUserIDs())).Info("Loading users") + defer logrus.Info("Finished loading users") return bridge.vault.ForUser(runtime.NumCPU(), func(user *vault.User) error { log := logrus.WithField("userID", user.UserID()) if user.AuthUID() == "" { - log.Info("Not loading disconnected user") + log.Info("User is not connected (skipping)") return nil } if safe.RLockRet(func() bool { return mapHas(bridge.users, user.UserID()) }, bridge.usersLock) { - log.Debug("Not loading already-loaded user") + log.Info("User is already loaded (skipping)") return nil } diff --git a/internal/user/events.go b/internal/user/events.go index 2b0fc314..03cc0f8e 100644 --- a/internal/user/events.go +++ b/internal/user/events.go @@ -90,8 +90,10 @@ func (user *User) handleRefreshEvent(ctx context.Context, refresh proton.Refresh l.WithError(err).Error("Failed to report refresh to sentry") } - // Cancel and restart ongoing syncs. - user.abortable.Abort() + // Cancel the event stream once this refresh is done. + defer user.pollAbort.Abort() + + // Resync after the refresh. defer user.goSync() return safe.LockRet(func() error { @@ -118,11 +120,8 @@ func (user *User) handleRefreshEvent(ctx context.Context, refresh proton.Refresh user.apiAddrs = groupBy(apiAddrs, func(addr proton.Address) string { return addr.ID }) user.apiLabels = groupBy(apiLabels, func(label proton.Label) string { return label.ID }) - // Reinitialize the update channels. - user.initUpdateCh(user.vault.AddressMode()) - // Clear sync status; we want to sync everything again. - if err := user.vault.ClearSyncStatus(); err != nil { + if err := user.clearSyncStatus(); err != nil { return fmt.Errorf("failed to clear sync status: %w", err) } diff --git a/internal/user/user.go b/internal/user/user.go index 350e026b..824a30e1 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -78,7 +78,8 @@ type User struct { updateChLock safe.RWMutex tasks *async.Group - abortable async.Abortable + syncAbort async.Abortable + pollAbort async.Abortable goSync func() pollAPIEventsCh chan chan struct{} @@ -171,42 +172,6 @@ func New( return nil }) - // Stream events from the API, logging any errors that occur. - // This does nothing until the sync has been marked as complete. - // When we receive an API event, we attempt to handle it. - // If successful, we update the event ID in the vault. - user.tasks.Once(func(ctx context.Context) { - ticker := proton.NewTicker(EventPeriod, EventJitter) - defer ticker.Stop() - - for { - var doneCh chan struct{} - - select { - case <-ctx.Done(): - return - - case doneCh = <-user.pollAPIEventsCh: - // ... - - case <-ticker.C: - // ... - } - - user.log.Debug("Event poll triggered") - - if !user.vault.SyncStatus().IsComplete() { - user.log.Debug("Sync is incomplete, skipping event poll") - } else if err := user.doEventPoll(ctx); err != nil { - user.log.WithError(err).Error("Failed to poll events") - } - - if doneCh != nil { - close(doneCh) - } - } - }) - // When triggered, poll the API for events, optionally blocking until the poll is complete. user.goPollAPIEvents = func(wait bool) { doneCh := make(chan struct{}) @@ -218,18 +183,37 @@ func New( } } - // When triggered, attempt to sync the user. + // When triggered, sync the user and then begin streaming API events. user.goSync = user.tasks.Trigger(func(ctx context.Context) { - user.log.Debug("Sync triggered") + user.log.Info("Sync triggered") - user.abortable.Do(ctx, func(ctx context.Context) { + // Sync the user. + user.syncAbort.Do(ctx, func(ctx context.Context) { if user.vault.SyncStatus().IsComplete() { - user.log.Debug("Sync is already complete, skipping") - } else if err := user.doSync(ctx); err != nil { - user.log.WithError(err).Error("Failed to sync, will retry later") - time.AfterFunc(SyncRetryCooldown, user.goSync) + user.log.Info("Sync already complete, skipping") + return + } + + for { + if err := ctx.Err(); err != nil { + user.log.WithError(err).Error("Sync aborted") + return + } else if err := user.doSync(ctx); err != nil { + user.log.WithError(err).Error("Failed to sync, will retry later") + sleepCtx(ctx, SyncRetryCooldown) + } else { + user.log.Info("Sync complete, starting API event stream") + return + } } }) + + // Once we know the sync has completed, we can start polling for API events. + if user.vault.SyncStatus().IsComplete() { + user.pollAbort.Do(ctx, func(ctx context.Context) { + user.startEvents(ctx) + }) + } }) return user, nil @@ -295,22 +279,21 @@ func (user *User) GetAddressMode() vault.AddressMode { func (user *User) SetAddressMode(_ context.Context, mode vault.AddressMode) error { user.log.WithField("mode", mode).Info("Setting address mode") - user.abortable.Abort() + user.syncAbort.Abort() + user.pollAbort.Abort() defer user.goSync() return safe.LockRet(func() error { - user.initUpdateCh(mode) - if err := user.vault.SetAddressMode(mode); err != nil { return fmt.Errorf("failed to set address mode: %w", err) } - if err := user.vault.ClearSyncStatus(); err != nil { + if err := user.clearSyncStatus(); err != nil { return fmt.Errorf("failed to clear sync status: %w", err) } return nil - }, user.apiAddrsLock, user.updateChLock) + }, user.eventLock, user.apiAddrsLock, user.updateChLock) } // SetShowAllMail sets whether to show the All Mail mailbox. @@ -486,7 +469,8 @@ func (user *User) OnStatusUp(context.Context) { func (user *User) OnStatusDown(context.Context) { user.log.Info("Connection is down") - user.abortable.Abort() + user.syncAbort.Abort() + user.pollAbort.Abort() } // GetSyncStatus returns the sync status of the user. @@ -495,8 +479,30 @@ func (user *User) GetSyncStatus() vault.SyncStatus { } // ClearSyncStatus clears the sync status of the user. +// This also drops any updates in the update channel(s). +// Warning: the gluon user must be removed and re-added if this happens! func (user *User) ClearSyncStatus() error { - return user.vault.ClearSyncStatus() + user.log.Info("Clearing sync status") + + return safe.LockRet(func() error { + return user.clearSyncStatus() + }, user.eventLock, user.apiAddrsLock, user.updateChLock) +} + +// clearSyncStatus clears the sync status of the user. +// This also drops any updates in the update channel(s). +// Warning: the gluon user must be removed and re-added if this happens! +// It is assumed that the eventLock, apiAddrsLock and updateChLock are already locked. +func (user *User) clearSyncStatus() error { + user.log.Info("Clearing sync status") + + user.initUpdateCh(user.vault.AddressMode()) + + if err := user.vault.ClearSyncStatus(); err != nil { + return fmt.Errorf("failed to clear sync status: %w", err) + } + + return nil } // Logout logs the user out from the API. @@ -574,6 +580,40 @@ func (user *User) initUpdateCh(mode vault.AddressMode) { } } +// startEvents streams events from the API, logging any errors that occur. +// This does nothing until the sync has been marked as complete. +// When we receive an API event, we attempt to handle it. +// If successful, we update the event ID in the vault. +func (user *User) startEvents(ctx context.Context) { + ticker := proton.NewTicker(EventPeriod, EventJitter) + defer ticker.Stop() + + for { + var doneCh chan struct{} + + select { + case <-ctx.Done(): + return + + case doneCh = <-user.pollAPIEventsCh: + // ... + + case <-ticker.C: + // ... + } + + user.log.Debug("Event poll triggered") + + if err := user.doEventPoll(ctx); err != nil { + user.log.WithError(err).Error("Failed to poll events") + } + + if doneCh != nil { + close(doneCh) + } + } +} + // doEventPoll is called whenever API events should be polled. func (user *User) doEventPoll(ctx context.Context) error { user.eventLock.Lock() @@ -643,3 +683,11 @@ func b32(b bool) uint32 { return 0 } + +// sleepCtx sleeps for the given duration, or until the context is canceled. +func sleepCtx(ctx context.Context, d time.Duration) { + select { + case <-ctx.Done(): + case <-time.After(d): + } +}