insights: factor out common OnProgress handler (#45132)

We can reduce code duplication here since we have the same handler copy
pasted 4 times. Motivated by an open PR which wanted to add a new
condition to the handler.

Test Plan: go test
This commit is contained in:
Keegan Carruthers-Smith 2022-12-05 16:30:59 +02:00 committed by GitHub
parent 4107fc850f
commit ea12468c80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -37,6 +37,26 @@ type SelectRepoResult struct {
Repos []itypes.MinimalRepo
}
// onProgress is the common FrontendStreamDecoder.OnProgress handler.
func (s *StreamDecoderEvents) onProgress(progress *streamapi.Progress) {
if !progress.Done {
return
}
// Skipped elements are built progressively for a Progress update until it is Done, so
// we want to register its contents only once it is done.
for _, skipped := range progress.Skipped {
// ShardTimeout is a specific skipped event that we want to retry on. Currently
// we only retry on Alert events so this is why we add it there. This behaviour will
// be uniformised eventually.
if skipped.Reason == streamapi.ShardTimeout {
s.Alerts = append(s.Alerts, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
s.DidTimeout = true
} else {
s.SkippedReasons = append(s.SkippedReasons, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
}
}
}
// TabulationDecoder will tabulate the result counts per repository.
func TabulationDecoder() (streamhttp.FrontendStreamDecoder, *TabulationResult) {
tr := &TabulationResult{
@ -57,24 +77,7 @@ func TabulationDecoder() (streamhttp.FrontendStreamDecoder, *TabulationResult) {
}
return streamhttp.FrontendStreamDecoder{
OnProgress: func(progress *streamapi.Progress) {
if !progress.Done {
return
}
// Skipped elements are built progressively for a Progress update until it is Done, so
// we want to register its contents only once it is done.
for _, skipped := range progress.Skipped {
// ShardTimeout is a specific skipped event that we want to retry on. Currently
// we only retry on Alert events so this is why we add it there. This behaviour will
// be uniformised eventually.
if skipped.Reason == streamapi.ShardTimeout {
tr.Alerts = append(tr.Alerts, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
tr.DidTimeout = true
} else {
tr.SkippedReasons = append(tr.SkippedReasons, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
}
}
},
OnProgress: tr.onProgress,
OnMatches: func(matches []streamhttp.EventMatch) {
for _, match := range matches {
switch match := match.(type) {
@ -155,24 +158,7 @@ func MatchContextComputeDecoder() (client.ComputeMatchContextStreamDecoder, *Com
}
return client.ComputeMatchContextStreamDecoder{
OnProgress: func(progress *streamapi.Progress) {
if !progress.Done {
return
}
// Skipped elements are built progressively for a Progress update until it is Done, so
// we want to register its contents only once it is done.
for _, skipped := range progress.Skipped {
// ShardTimeout is a specific skipped event that we want to retry on. Currently
// we only retry on Alert events so this is why we add it there. This behaviour will
// be uniformised eventually.
if skipped.Reason == streamapi.ShardTimeout {
ctr.Alerts = append(ctr.Alerts, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
ctr.DidTimeout = true
} else {
ctr.SkippedReasons = append(ctr.SkippedReasons, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
}
}
},
OnProgress: ctr.onProgress,
OnResult: func(results []compute.MatchContext) {
for _, result := range results {
current := getRepoCounts(result)
@ -220,24 +206,7 @@ func ComputeTextDecoder() (client.ComputeTextExtraStreamDecoder, *ComputeTabulat
}
return client.ComputeTextExtraStreamDecoder{
OnProgress: func(progress *streamapi.Progress) {
if !progress.Done {
return
}
// Skipped elements are built progressively for a Progress update until it is Done, so
// we want to register its contents only once it is done.
for _, skipped := range progress.Skipped {
// ShardTimeout is a specific skipped event that we want to retry on. Currently
// we only retry on Alert events so this is why we add it there. This behaviour will
// be uniformised eventually.
if skipped.Reason == streamapi.ShardTimeout {
ctr.Alerts = append(ctr.Alerts, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
ctr.DidTimeout = true
} else {
ctr.SkippedReasons = append(ctr.SkippedReasons, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
}
}
},
OnProgress: ctr.onProgress,
OnResult: func(results []compute.TextExtra) {
for _, result := range results {
vals := strings.Split(result.Value, "\n")
@ -274,24 +243,7 @@ func SelectRepoDecoder() (streamhttp.FrontendStreamDecoder, *SelectRepoResult) {
}
return streamhttp.FrontendStreamDecoder{
OnProgress: func(progress *streamapi.Progress) {
if !progress.Done {
return
}
// Skipped elements are built progressively for a Progress update until it is Done, so
// we want to register its contents only once it is done.
for _, skipped := range progress.Skipped {
// ShardTimeout is a specific skipped event that we want to retry on. Currently
// we only retry on Alert events so this is why we add it there. This behaviour will
// be uniformised eventually.
if skipped.Reason == streamapi.ShardTimeout {
repoResult.Alerts = append(repoResult.Alerts, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
repoResult.DidTimeout = true
} else {
repoResult.SkippedReasons = append(repoResult.SkippedReasons, fmt.Sprintf("%s: %s", skipped.Reason, skipped.Message))
}
}
},
OnProgress: repoResult.onProgress,
OnMatches: func(matches []streamhttp.EventMatch) {
for _, match := range matches {
switch match := match.(type) {