Create a dashboard for background jobs (#44901)

- Add data access layer for Redis HASHes and LISTs (with tests) (in `rcache.go`)
- Add types on the back end
- Add business logic between GraphQL and Redis
- Add GraphQL endpoint in `schema.graphql` with Node compatibility
- Hook into periodic goroutines, DB-backed workers, and Batches scheduler
- Add menu item, routing, and page with page description, legend at the top, job and routine list
- Add filtering for errors and pause/play
- Extracted `formatMilliseconds` for reuse
- Format tooltip for large numbers nicely
- Remove a cyclic dependency in the `types` package

Co-authored-by: Eric Fritz <eric@sourcegraph.com>
Co-authored-by: Thorsten Ball <mrnugget@gmail.com>
Co-authored-by: Kelli Rockwell <kelli@sourcegraph.com>
This commit is contained in:
David Veszelovszki 2023-01-17 14:27:51 +01:00 committed by GitHub
parent 6a549271ef
commit bd7f2d3da3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 2293 additions and 104 deletions

View File

@ -35,7 +35,8 @@ All notable changes to Sourcegraph are documented in this file.
- Bitbucket Cloud code host connections now support permissions syncing. [#46312](https://github.com/sourcegraph/sourcegraph/pull/46312)
- Keep a log of corruption events that happen on repositories as they are detected. The Admin repositories page will now show when a repository has been detected as being corrupt and they'll also be able to see a history log of the corruption for that repository. [#46004](https://github.com/sourcegraph/sourcegraph/pull/46004)
- Added corrupted statistic as part of the global repositories statistics. [#46412](https://github.com/sourcegraph/sourcegraph/pull/46412)
- Add a `Corrupted` status filter on the Admin repositories page, allowing Administrators to filter the list of repositories to only those that have been detected as corrupt. [#46415](https://github.com/sourcegraph/sourcegraph/pull/46415)
- Added a `Corrupted` status filter on the Admin repositories page, allowing Administrators to filter the list of repositories to only those that have been detected as corrupt. [#46415](https://github.com/sourcegraph/sourcegraph/pull/46415)
- Added “Background job dashboard” admin feature [#44901](https://github.com/sourcegraph/sourcegraph/pull/44901)
### Changed

View File

@ -10,6 +10,8 @@ describe('Timestamp', () => {
test('noAbout', () => expect(render(<Timestamp date="2006-01-02" noAbout={true} />).asFragment()).toMatchSnapshot())
test('noAgo', () => expect(render(<Timestamp date="2006-01-02" noAgo={true} />).asFragment()).toMatchSnapshot())
test('absolute time with formatting', () =>
expect(
render(

View File

@ -13,6 +13,9 @@ interface TimestampProps {
/** Omit the "about". */
noAbout?: boolean
/** Omit the "ago". */
noAgo?: boolean
/** Function that returns the current time (for stability in visual tests). */
now?: () => Date
@ -43,22 +46,23 @@ const RERENDER_INTERVAL_MSEC = 7000
export const Timestamp: React.FunctionComponent<React.PropsWithChildren<TimestampProps>> = ({
date,
noAbout = false,
noAgo = false,
strict = false,
now = Date.now,
preferAbsolute = false,
timestampFormat,
utc = false,
}) => {
const [label, setLabel] = useState<string>(calculateLabel(date, now, strict, noAbout))
const [label, setLabel] = useState<string>(calculateLabel(date, now, strict, noAbout, noAgo))
useEffect(() => {
const intervalHandle = window.setInterval(
() => setLabel(calculateLabel(date, now, strict, noAbout)),
() => setLabel(calculateLabel(date, now, strict, noAbout, noAgo)),
RERENDER_INTERVAL_MSEC
)
return () => {
window.clearInterval(intervalHandle)
}
}, [date, noAbout, now, strict])
}, [date, noAbout, noAgo, now, strict])
const tooltip = useMemo(() => {
let parsedDate = typeof date === 'string' ? parseISO(date) : new Date(date)
@ -81,16 +85,17 @@ function calculateLabel(
date: string | Date | number,
now: () => Date | number,
strict: boolean,
noAbout: boolean
noAbout: boolean,
noAgo: boolean
): string {
let label: string
if (strict) {
label = formatDistanceStrict(typeof date === 'string' ? parseISO(date) : date, now(), {
addSuffix: true,
addSuffix: !noAgo,
})
} else {
label = formatDistance(typeof date === 'string' ? parseISO(date) : date, now(), {
addSuffix: true,
addSuffix: !noAgo,
includeSeconds: true,
})
}

View File

@ -30,6 +30,16 @@ exports[`Timestamp noAbout 1`] = `
</DocumentFragment>
`;
exports[`Timestamp noAgo 1`] = `
<DocumentFragment>
<span
class="timestamp"
>
about 22 hours
</span>
</DocumentFragment>
`;
exports[`Timestamp with time time 1`] = `
<DocumentFragment>
<span

View File

@ -1,10 +1,11 @@
import React from 'react'
import { mdiCheckCircle, mdiAlertCircle } from '@mdi/js'
import { mdiAlertCircle, mdiCheckCircle } from '@mdi/js'
import { Timestamp } from '@sourcegraph/branded/src/components/Timestamp'
import { pluralize } from '@sourcegraph/common'
import { LoadingSpinner, CardBody, Card, Icon } from '@sourcegraph/wildcard'
import { Card, CardBody, Icon, LoadingSpinner } from '@sourcegraph/wildcard'
import { formatDurationLong } from '../util/time'
import { Collapsible } from './Collapsible'
import { LogOutput } from './LogOutput'
@ -62,7 +63,7 @@ export const ExecutionLogEntry: React.FunctionComponent<React.PropsWithChildren<
{logEntry.exitCode !== null && logEntry.durationMilliseconds !== null && (
<>
<span className="text-muted">, ran for</span>{' '}
{formatMilliseconds(logEntry.durationMilliseconds)}
{formatDurationLong(logEntry.durationMilliseconds)}
</>
)}
</div>
@ -82,49 +83,3 @@ export const ExecutionLogEntry: React.FunctionComponent<React.PropsWithChildren<
</div>
</Card>
)
const timeOrders: [number, string][] = [
[1000 * 60 * 60 * 24, 'day'],
[1000 * 60 * 60, 'hour'],
[1000 * 60, 'minute'],
[1000, 'second'],
[1, 'millisecond'],
]
/**
* This is essentially to date-fns/formatDistance with support for milliseconds.
* The output of this function has the following properties:
*
* - Consists of one unit (e.g. `x days`) or two units (e.g. `x days and y hours`).
* - If there are more than one unit, they are adjacent (e.g. never `x days and y minutes`).
* - If there is a greater unit, the value will not exceed the next threshold (e.g. `2 minutes and 5 seconds`, never `125 seconds`).
*
* @param milliseconds The number of milliseconds elapsed.
*/
const formatMilliseconds = (milliseconds: number): string => {
const parts: string[] = []
// Construct a list of parts like `1 day` or `7 hours` in descending
// order. If the value is zero, an empty string is added to the list.`
timeOrders.reduce((msRemaining, [denominator, suffix]) => {
// Determine how many units can fit into the current value
const part = Math.floor(msRemaining / denominator)
// Format this part (pluralize if value is more than one)
parts.push(part > 0 ? `${part} ${pluralize(suffix, part)}` : '')
// Remove this order's contribution to the current value
return msRemaining - part * denominator
}, milliseconds)
const description = parts
// Trim leading zero-valued parts
.slice(parts.findIndex(part => part !== ''))
// Keep only two consecutive non-zero parts
.slice(0, 2)
// Re-filter zero-valued parts
.filter(part => part !== '')
// If there are two parts, join them
.join(' and ')
// If description is empty return a canned string
return description || '0 milliseconds'
}

View File

@ -0,0 +1,27 @@
// Header row of the routine list: flexible info column + fixed-width stats column.
.table-header {
    display: grid;
    grid-template-columns: auto minmax(0, 125px);
    grid-gap: 0.25rem;
    align-items: center;
}

// The "show all / only problematic" filter dropdown in the header.
.filter-select {
    margin-bottom: 0;
    width: auto;
}

// One routine row; mirrors the two-column grid of .table-header so columns line up.
.routine {
    display: grid;
    margin-bottom: 1rem;
    grid-template-columns: auto minmax(0, 125px);
    grid-gap: 0.25rem;
    align-items: center;
}

// Let long routine names/descriptions wrap anywhere instead of overflowing the column.
.name-and-description {
    overflow-wrap: anywhere;
}

// Reuse the theme link color (applied to the footnote asterisk).
.link-color {
    color: var(--link-color);
}

View File

@ -0,0 +1,505 @@
import React, { useCallback, useEffect, useMemo, useState } from 'react'
import {
mdiAccountHardHat,
mdiAlert,
mdiCached,
mdiCheck,
mdiClose,
mdiDatabase,
mdiHelp,
mdiNumeric,
mdiShape,
} from '@mdi/js'
import format from 'date-fns/format'
import { RouteComponentProps } from 'react-router'
import { Timestamp } from '@sourcegraph/branded/src/components/Timestamp'
import { pluralize } from '@sourcegraph/common'
import { useQuery } from '@sourcegraph/http-client'
import { TelemetryProps } from '@sourcegraph/shared/src/telemetry/telemetryService'
import {
Button,
Container,
ErrorAlert,
Icon,
Link,
LoadingSpinner,
PageHeader,
Select,
Text,
Tooltip,
useSessionStorage,
} from '@sourcegraph/wildcard'
import { PageTitle } from '../components/PageTitle'
import { BackgroundJobsResult, BackgroundJobsVariables, BackgroundRoutineType } from '../graphql-operations'
import { formatDurationLong } from '../util/time'
import { ValueLegendList } from './analytics/components/ValueLegendList'
import { BACKGROUND_JOBS, BACKGROUND_JOBS_PAGE_POLL_INTERVAL_MS } from './backend'
import styles from './SiteAdminBackgroundJobsPage.module.scss'
export interface SiteAdminBackgroundJobsPageProps extends RouteComponentProps, TelemetryProps {}

// Convenience aliases for the generated GraphQL result types.
export type BackgroundJob = BackgroundJobsResult['backgroundJobs']['nodes'][0]
export type BackgroundRoutine = BackgroundJob['routines'][0]

// "short" runs are displayed with a “success” style.
// “long” runs are displayed with a “warning” style to make sure they stand out somewhat.
// “dangerous” runs are displayed with a “danger” style to make sure they stand out even more.
type RunLengthCategory = 'short' | 'long' | 'dangerous'

// The maximum number of recent runs to fetch for each routine.
const recentRunCount = 5

// A map of the routine icons by type
const routineTypeToIcon: Record<BackgroundRoutineType, string> = {
    [BackgroundRoutineType.PERIODIC]: mdiCached,
    [BackgroundRoutineType.PERIODIC_WITH_METRICS]: mdiNumeric,
    [BackgroundRoutineType.DB_BACKED]: mdiDatabase,
    [BackgroundRoutineType.CUSTOM]: mdiShape,
}
/**
 * Site-admin page listing all known background jobs with their routines,
 * recent runs, errors, and timing stats. Data is fetched over GraphQL and
 * polled on an interval; the admin can pause/resume polling.
 */
export const SiteAdminBackgroundJobsPage: React.FunctionComponent<
    React.PropsWithChildren<SiteAdminBackgroundJobsPageProps>
> = ({ telemetryService }) => {
    // Log page view
    useEffect(() => {
        telemetryService.logPageView('SiteAdminBackgroundJobs')
    }, [telemetryService])

    // Data query and polling setting
    const { data, loading, error, stopPolling, startPolling } = useQuery<BackgroundJobsResult, BackgroundJobsVariables>(
        BACKGROUND_JOBS,
        {
            variables: { recentRunCount },
            pollInterval: BACKGROUND_JOBS_PAGE_POLL_INTERVAL_MS,
        }
    )
    // Local flag mirroring whether Apollo polling is currently active.
    const [polling, setPolling] = useState(true)
    const togglePolling = useCallback(() => {
        if (polling) {
            stopPolling()
        } else {
            startPolling(BACKGROUND_JOBS_PAGE_POLL_INTERVAL_MS)
        }
        setPolling(!polling)
    }, [polling, startPolling, stopPolling])

    return (
        <div>
            <PageTitle title="Background jobs - Admin" />
            <Button variant="secondary" onClick={togglePolling} className="float-right">
                {polling ? 'Pause polling' : 'Resume polling'}
            </Button>
            <PageHeader
                path={[{ text: 'Background jobs' }]}
                headingElement="h2"
                description={
                    <>
                        This page lists{' '}
                        <Link to="/help/admin/workers" target="_blank" rel="noopener noreferrer">
                            all running jobs
                        </Link>
                        , their routines, recent runs, any errors, timings, and stats.
                    </>
                }
                className="mb-3"
            />
            <Text>Terminology:</Text>
            <ul>
                <li>
                    <strong>Job</strong>: a bag of routines, started when the Sourcegraph app is launched
                </li>
                <li>
                    <strong>Routine</strong>: a background process that repeatedly executes its task indefinitely, using
                    an interval passed at start
                </li>
                <li>
                    <strong>Run</strong>: a single execution of a routine's task
                </li>
                <li>
                    <strong>Host</strong>: a Sourcegraph instance that starts some jobs when launched
                </li>
                <li>
                    <strong>Instance</strong>: a job ran on a host
                </li>
            </ul>
            <Container className="mb-3">
                {error && !loading && <ErrorAlert error={error} />}
                {loading && !error && <LoadingSpinner />}
                {!loading && !error && data?.backgroundJobs.nodes && <JobList jobs={data.backgroundJobs.nodes} />}
            </Container>
        </div>
    )
}
const JobList: React.FunctionComponent<{
jobs: BackgroundJob[]
}> = ({ jobs }) => {
const [onlyShowProblematic, setOnlyShowProblematic] = useSessionStorage(
'site-admin.background-jobs.only-show-problematic-routines',
false
)
const hostNames = useMemo(
() =>
jobs
.map(job => job.routines[0]?.instances[0]?.hostName) // get the host name of the first routine
.filter((host, index, hosts) => hosts.indexOf(host) === index) // deduplicate
.filter(host => !!host), // remove undefined
[jobs]
)
const problematicJobs = useMemo(
() => jobs.filter(job => job.routines.some(routine => isRoutineProblematic(routine))),
[jobs]
)
const jobsToDisplay = onlyShowProblematic ? problematicJobs : jobs
return (
<>
<LegendList jobs={jobs} hostNameCount={hostNames.length} />
{jobsToDisplay ? (
<>
<div className={styles.tableHeader}>
<div>
<Select
aria-label="Filter for problematic routines"
onChange={value => setOnlyShowProblematic(value.target.value === 'problematic')}
selectClassName={styles.filterSelect}
defaultValue={onlyShowProblematic ? 'problematic' : 'all'}
>
<option value="all">Show all routines</option>
<option value="problematic">Only show problematic routines</option>
</Select>
</div>
<div className="text-center">Fastest / avg / slowest run (ms)</div>
</div>
<ul className="list-group list-group-flush">
{jobsToDisplay.map(job => (
<JobItem
key={job.name}
job={job}
hostNames={hostNames}
onlyShowProblematic={onlyShowProblematic}
/>
))}
</ul>
</>
) : (
'No jobs to display.'
)}
</>
)
}
// Renders one job: its name, routine/instance counts, and its (optionally
// filtered) routines. Memoized so unchanged jobs don't re-render on each poll.
const JobItem: React.FunctionComponent<{ job: BackgroundJob; hostNames: string[]; onlyShowProblematic: boolean }> =
    React.memo(function JobItem({ job, hostNames, onlyShowProblematic }) {
        // All distinct host names any of this job's routine instances run on.
        const jobHostNames = [
            ...new Set(job.routines.map(routine => routine.instances.map(instance => instance.hostName)).flat()),
        ].sort()

        return (
            <li key={job.name} className="list-group-item px-0 py-2">
                <div className="d-flex align-items-center justify-content-between mb-2">
                    <div className="d-flex flex-row align-items-center mb-0">
                        <Icon aria-hidden={true} svgPath={mdiAccountHardHat} />{' '}
                        <Text className="mb-0 ml-2">
                            <strong>{job.name}</strong>{' '}
                            <span className="text-muted">
                                (starts {job.routines.length} {pluralize('routine', job.routines.length)}
                                {hostNames.length > 1
                                    ? ` on ${jobHostNames.length} ${pluralize('instance', jobHostNames.length)}`
                                    : ''}
                                )
                            </span>
                        </Text>
                    </div>
                </div>
                {job.routines
                    .filter(routine => (onlyShowProblematic ? isRoutineProblematic(routine) : true))
                    .map(routine => (
                        <RoutineItem routine={routine} key={routine.name} />
                    ))}
            </li>
        )
    })
// Summary legend across all jobs: job/routine/host/instance counts plus recent
// errors. Uses a named function inside React.memo so the component has a
// display name in React DevTools, consistent with JobItem above.
const LegendList: React.FunctionComponent<{ jobs: BackgroundJob[]; hostNameCount: number }> = React.memo(
    function LegendList({ jobs, hostNameCount }) {
        // Total routines across all jobs.
        const routineCount = jobs.reduce((acc, job) => acc + job.routines.length, 0)
        // Total routine instances across all jobs and hosts.
        const routineInstanceCount = jobs.reduce(
            (acc, job) => acc + job.routines.reduce((acc, routine) => acc + routine.instances.length, 0),
            0
        )
        // Total errors across every routine's recent runs.
        const recentRunErrors = jobs.reduce(
            (acc, job) =>
                acc +
                job.routines.reduce(
                    (acc, routine) => acc + routine.recentRuns.filter(run => run.errorMessage).length,
                    0
                ),
            0
        )
        const legends = [
            {
                value: jobs.length,
                description: pluralize('Job', jobs.length),
                tooltip: 'The number of known background jobs in the system.',
            },
            {
                value: routineCount,
                description: pluralize('Routine', routineCount),
                tooltip: 'The total number of routines across all jobs.',
            },
            {
                value: hostNameCount,
                description: pluralize('Host', hostNameCount),
                tooltip: 'The total number of known hosts where jobs run.',
            },
            {
                value: routineInstanceCount,
                description: pluralize('Instance', routineInstanceCount),
                tooltip: 'The total number of routine instances across all jobs and hosts.',
            },
            {
                value: recentRunErrors,
                description: pluralize('Recent error', recentRunErrors),
                color: recentRunErrors > 0 ? 'var(--red)' : undefined,
                tooltip: 'The total number of errors across all runs across all routine instances.',
            },
        ]
        return <ValueLegendList className="mb-3" items={legends} />
    }
)
// Renders one routine row: start/stop indicator, type icon, name, description,
// interval, recent-run/error summary (with a per-run tooltip), and the
// fastest/avg/slowest duration column.
const RoutineItem: React.FunctionComponent<{ routine: BackgroundRoutine }> = ({ routine }) => {
    const allHostNames = routine.recentRuns
        .map(run => run.hostName) // get host name
        .filter((host, index, hosts) => hosts.indexOf(host) === index) // deduplicate
    // If every recent run happened on the same host, mention it once instead of per run.
    const commonHostName = allHostNames.length === 1 ? allHostNames[0] : undefined
    // E.g. "DB_BACKED" -> "db backed" for display.
    const routineTypeDisplayableName = routine.type.toLowerCase().replace(/_/g, ' ')
    // Tooltip body listing each recent run with its timing and any error.
    const recentRunsTooltipContent = (
        <div>
            {commonHostName ? <Text className="mb-0">All on {commonHostName}:</Text> : ''}
            <ul className="pl-4">
                {routine.recentRuns.map(run => (
                    <li key={run.at}>
                        <Text className="mb-0">
                            {run.errorMessage ? (
                                <Icon aria-hidden={true} svgPath={mdiAlert} className="text-danger" />
                            ) : (
                                ''
                            )}{' '}
                            <Timestamp date={new Date(run.at)} noAbout={true} />
                            {commonHostName ? '' : ` on the host called “${run.hostName}”,`} for{' '}
                            <span className={getRunDurationTextClass(run.durationMs, routine.intervalMs)}>
                                {run.durationMs}ms
                            </span>
                            .{run.errorMessage ? ` Error: ${run.errorMessage}` : ''}
                        </Text>
                    </li>
                ))}
            </ul>
        </div>
    )
    const recentRunsWithErrors = routine.recentRuns.filter(run => run.errorMessage)

    return (
        <div className={styles.routine}>
            <div className={styles.nameAndDescription}>
                <Text className="mb-1 ml-4">
                    <span className="mr-2">
                        <StartedStoppedIndicator routine={routine} />
                    </span>
                    <Tooltip content={routineTypeDisplayableName} placement="top">
                        <Icon
                            aria-label={routineTypeDisplayableName}
                            svgPath={routineTypeToIcon[routine.type] ?? mdiHelp}
                        />
                    </Tooltip>
                    <span className="ml-2">
                        <strong>{routine.name}</strong>
                    </span>
                    <span className="ml-2 text-muted">{routine.description}</span>
                </Text>
                <Text className="mb-0 ml-4 text-muted">
                    {routine.intervalMs ? (
                        <>
                            {routine.type === 'DB_BACKED' ? 'Checks queue ' : 'Runs '}every{' '}
                            <strong>{formatDurationLong(routine.intervalMs)}</strong>.{' '}
                        </>
                    ) : null}
                    {routine.recentRuns.length > 0 ? (
                        <Tooltip content={recentRunsTooltipContent}>
                            <span>
                                <strong>
                                    <span className={recentRunsWithErrors.length ? 'text-danger' : 'text-success'}>{`${
                                        recentRunsWithErrors.length
                                    } ${pluralize('error', recentRunsWithErrors.length)}`}</span>
                                </strong>
                                <span className={styles.linkColor}>*</span> in the last{' '}
                                {`${routine.recentRuns.length} ${pluralize('run', routine.recentRuns.length)}`}.{' '}
                            </span>
                        </Tooltip>
                    ) : null}
                    {routine.stats.runCount ? (
                        <>
                            <span className={routine.stats.errorCount ? 'text-danger' : 'text-success'}>
                                <strong>
                                    {routine.stats.errorCount} {pluralize('error', routine.stats.errorCount)}
                                </strong>
                            </span>{' '}
                            in <strong>{routine.stats.runCount}</strong> {pluralize('run', routine.stats.runCount)}
                            {routine.stats.since ? (
                                <>
                                    {' '}
                                    in the last{' '}
                                    <Timestamp date={new Date(routine.stats.since)} noAbout={true} noAgo={true} />.
                                </>
                            ) : null}
                        </>
                    ) : null}
                </Text>
            </div>
            <div className="text-center">
                {routine.stats.runCount ? (
                    <Tooltip content="Fastest / avg / slowest run in milliseconds">
                        <div>
                            <span className={getRunDurationTextClass(routine.stats.minDurationMs, routine.intervalMs)}>
                                {routine.stats.minDurationMs}
                            </span>{' '}
                            /{' '}
                            <span className={getRunDurationTextClass(routine.stats.avgDurationMs, routine.intervalMs)}>
                                {routine.stats.avgDurationMs}
                            </span>{' '}
                            /{' '}
                            <span className={getRunDurationTextClass(routine.stats.maxDurationMs, routine.intervalMs)}>
                                {routine.stats.maxDurationMs}
                            </span>
                        </div>
                    </Tooltip>
                ) : (
                    <span className="text-muted">No stats yet.</span>
                )}
            </div>
        </div>
    )
}
// Shows a green check (started) or a red X (stopped, or not seen running
// recently) for a routine, with a tooltip explaining the state.
const StartedStoppedIndicator: React.FunctionComponent<{ routine: BackgroundRoutine }> = ({ routine }) => {
    // The last time this job was started
    const latestStartDateString = routine.instances.reduce(
        (mostRecent, instance) =>
            instance.lastStartedAt && (!mostRecent || instance.lastStartedAt > mostRecent)
                ? instance.lastStartedAt
                : mostRecent,
        ''
    )
    // The earliest time this job was stopped
    const earliestStopDateString = routine.instances.reduce(
        (earliest, instance) =>
            instance.lastStoppedAt && (!earliest || instance.lastStoppedAt < earliest)
                ? instance.lastStoppedAt
                : earliest,
        ''
    )
    // The date of the most recent run
    // NOTE(review): assumes recentRuns[0] is the newest run — confirm ordering with the backend.
    const mostRecentRunDate = routine.recentRuns.length ? new Date(routine.recentRuns[0].at) : null
    // See if this routine is stopped or not seen recently
    // NOTE(review): string comparison of timestamps presumes a sortable (ISO-like) format — verify.
    const isStopped = earliestStopDateString ? earliestStopDateString >= latestStartDateString : false
    // "Unseen in a while" only applies to interval-driven, non-DB-backed routines;
    // it allows one interval + the slowest observed run + one poll cycle of slack.
    const isUnseenInAWhile = !!(
        routine.intervalMs &&
        routine.type !== BackgroundRoutineType.DB_BACKED &&
        (!mostRecentRunDate ||
            mostRecentRunDate.getTime() +
                routine.intervalMs +
                routine.stats.maxDurationMs +
                BACKGROUND_JOBS_PAGE_POLL_INTERVAL_MS <=
                Date.now())
    )
    // Pick the tooltip text for the three states: stopped, unseen-in-a-while, started.
    const tooltip = isStopped
        ? `This routine is currently stopped.
      Started at ${format(new Date(latestStartDateString), 'yyyy-MM-dd HH:mm:ss')},
      Stopped at: ${format(new Date(earliestStopDateString), 'yyyy-MM-dd HH:mm:ss')}`
        : isUnseenInAWhile
        ? mostRecentRunDate
            ? `This routine has not been seen in a while. It should've run at ${format(
                  new Date(mostRecentRunDate.getTime() + (routine.intervalMs || 0)),
                  'yyyy-MM-dd HH:mm:ss'
              )}.`
            : 'This routine was started but it has never been seen running.'
        : `This routine is currently started.${
              mostRecentRunDate ? `\nLast seen running at ${format(mostRecentRunDate, 'yyyy-MM-dd HH:mm:ss')}.` : ''
          }`

    return isStopped || isUnseenInAWhile ? (
        <Tooltip content={tooltip}>
            <Icon aria-label="stopped or unseen" svgPath={mdiClose} className="text-danger" />
        </Tooltip>
    ) : (
        <Tooltip content={tooltip}>
            <Icon aria-label="started" svgPath={mdiCheck} className="text-success" />
        </Tooltip>
    )
}
// A routine is "problematic" when it has any recorded errors (recent or
// all-time) or when any measured duration is not categorized as 'short'.
function isRoutineProblematic(routine: BackgroundRoutine): boolean {
    if (routine.stats.errorCount > 0) {
        return true
    }
    const hasBadRecentRun = routine.recentRuns.some(
        run => Boolean(run.errorMessage) || categorizeRunDuration(run.durationMs, routine.intervalMs) !== 'short'
    )
    if (hasBadRecentRun) {
        return true
    }
    const statDurations = [routine.stats.minDurationMs, routine.stats.avgDurationMs, routine.stats.maxDurationMs]
    return statDurations.some(duration => categorizeRunDuration(duration, routine.intervalMs) !== 'short')
}
// Classifies a run duration relative to the routine's interval.
// Contains some magic numbers (thresholds chosen by the original authors).
function categorizeRunDuration(durationMs: number, routineIntervalMs: number | null): RunLengthCategory {
    const dangerousFractionOfInterval = 0.7
    const absoluteLongCutoffMs = 5000
    const veryShortIntervalMs = 1000

    // A run consuming most of its routine's interval is dangerous.
    if (routineIntervalMs && durationMs > routineIntervalMs * dangerousFractionOfInterval) {
        return 'dangerous'
    }
    // Beyond an absolute threshold the run is long regardless of interval.
    if (durationMs > absoluteLongCutoffMs) {
        return 'long'
    }
    // Shorter runs of non-periodic routines are always “short”.
    if (!routineIntervalMs) {
        return 'short'
    }
    // Relative cutoff: 10% of the interval, or 50% for very short intervals.
    const longFraction = routineIntervalMs <= veryShortIntervalMs ? 0.5 : 0.1
    return durationMs > routineIntervalMs * longFraction ? 'long' : 'short'
}
function getRunDurationTextClass(durationMs: number, routineIntervalMs: number | null): string {
const category = categorizeRunDuration(durationMs, routineIntervalMs)
switch (category) {
case 'dangerous':
return 'text-danger'
case 'long':
return 'text-warning'
default:
return 'text-success'
}
}

View File

@ -39,9 +39,15 @@ export const ValueLegendItem: React.FunctionComponent<ValueLegendItemProps> = ({
return search
}, [filter, location.search])
const tooltipOnNumber =
formattedNumber !== unformattedNumber
? isNaN(parseFloat(unformattedNumber))
? unformattedNumber
: Intl.NumberFormat('en').format(parseFloat(unformattedNumber))
: undefined
return (
<div className={classNames('d-flex flex-column align-items-center mr-4 justify-content-center', className)}>
<Tooltip content={formattedNumber !== unformattedNumber ? unformattedNumber : undefined}>
<Tooltip content={tooltipOnNumber}>
{filter ? (
<Link to={`?${searchParams.toString()}`} style={{ color }} className={styles.count}>
{formattedNumber}

View File

@ -285,7 +285,44 @@ export const OUTBOUND_REQUESTS = gql`
}
}
`
// Query for the background-jobs dashboard: all jobs, their routines, routine
// instances, the most recent runs (capped by $recentRunCount), and per-routine stats.
export const BACKGROUND_JOBS = gql`
    query BackgroundJobs($recentRunCount: Int) {
        backgroundJobs(recentRunCount: $recentRunCount) {
            nodes {
                name
                routines {
                    name
                    type
                    description
                    intervalMs
                    instances {
                        hostName
                        lastStartedAt
                        lastStoppedAt
                    }
                    recentRuns {
                        at
                        hostName
                        durationMs
                        errorMessage
                    }
                    stats {
                        since
                        runCount
                        errorCount
                        minDurationMs
                        avgDurationMs
                        maxDurationMs
                    }
                }
            }
        }
    }
`

export const OUTBOUND_REQUESTS_PAGE_POLL_INTERVAL_MS = 5000
// How often the background-jobs page re-fetches while polling is enabled.
export const BACKGROUND_JOBS_PAGE_POLL_INTERVAL_MS = 5000
export const UPDATE_MIRROR_REPOSITORY = gql`
mutation UpdateMirrorRepository($repository: ID!) {

View File

@ -112,6 +112,11 @@ export const siteAdminAreaRoutes: readonly SiteAdminAreaRoute[] = [
exact: true,
render: lazyComponent(() => import('./SiteAdminOutboundRequestsPage'), 'SiteAdminOutboundRequestsPage'),
},
{
path: '/background-jobs',
exact: true,
render: lazyComponent(() => import('./SiteAdminBackgroundJobsPage'), 'SiteAdminBackgroundJobsPage'),
},
{
path: '/feature-flags',
exact: true,

View File

@ -159,10 +159,15 @@ export const maintenanceGroup: SiteAdminSideBarGroup = {
source: 'server',
},
{
label: 'Slow Requests',
label: 'Slow requests',
to: '/site-admin/slow-requests',
source: 'server',
},
{
label: 'Background jobs',
to: '/site-admin/background-jobs',
source: 'server',
},
],
}

View File

@ -0,0 +1,14 @@
import { formatDurationLong } from './time'
// Spec: output uses at most two adjacent units, pluralized, and falls back to
// "0 milliseconds" for a zero duration.
describe('formatDurationLong', () => {
    it('should format durations as per the spec', () => {
        expect(formatDurationLong(0)).toEqual('0 milliseconds')
        expect(formatDurationLong(100)).toEqual('100 milliseconds')
        expect(formatDurationLong(1000)).toEqual('1 second')
        expect(formatDurationLong(10000)).toEqual('10 seconds')
        expect(formatDurationLong(100000)).toEqual('1 minute and 40 seconds')
        expect(formatDurationLong(1000000)).toEqual('16 minutes and 40 seconds')
        expect(formatDurationLong(10000000)).toEqual('2 hours and 46 minutes')
        expect(formatDurationLong(100000000)).toEqual('1 day and 3 hours')
    })
})

View File

@ -0,0 +1,65 @@
import { pluralize } from '@sourcegraph/common'
// Duration units in descending order of magnitude; denominators are milliseconds.
const units = [
    { denominator: 1000 * 60 * 60 * 24, name: 'day' },
    { denominator: 1000 * 60 * 60, name: 'hour' },
    { denominator: 1000 * 60, name: 'minute' },
    { denominator: 1000, name: 'second' },
    { denominator: 1, name: 'millisecond' },
]

// Union of the unit names above ('day' | 'hour' | 'minute' | 'second' | 'millisecond').
type unitName = typeof units[number]['name']

// One component of a structured duration, e.g. { amount: 2, unit: 'hour' }.
export interface StructuredDuration {
    amount: number
    unit: unitName
}
/**
* This is essentially to date-fns/formatDistance with support for milliseconds.
*
* Examples for the output:
* - "1 day"
* - "2 days and 1 hour"
* - "1 minute and 5 seconds"
* - "5 seconds"
* - "1 millisecond".
*
* The output has the following properties:
*
* - Consists of either one unit ("x days") or two units ("x days and y hours")
* - If there are more than one unit, they are adjacent (never "x days and y minutes")
* - If there is a greater unit, the value will not exceed the next threshold (e.g. `2 minutes and 5 seconds`, never `125 seconds`).
*
* @param millis The number of milliseconds elapsed.
*/
export function formatDurationLong(millis: number): string {
const parts = formatDurationStructured(millis)
const description = parts
.slice(0, 2)
.map(part => `${part.amount} ${pluralize(part.unit, part.amount)}`)
.join(' and ')
// If description is empty return a canned string
return description || '0 milliseconds'
}
// Decomposes a millisecond count into non-zero duration parts, largest unit
// first, e.g. 90061000 -> [1 day, 1 hour, 1 minute, 1 second].
function formatDurationStructured(millis: number): StructuredDuration[] {
    const parts: StructuredDuration[] = []
    let remaining = millis
    for (const { denominator, name } of units) {
        // How many whole units of this magnitude fit into what's left.
        const amount = Math.floor(remaining / denominator)
        if (amount > 0) {
            parts.push({ amount, unit: name })
        }
        remaining -= amount * denominator
    }
    return parts
}

View File

@ -0,0 +1,244 @@
package graphqlbackend
import (
"context"
"sync"
"github.com/graph-gophers/graphql-go"
"github.com/graph-gophers/graphql-go/relay"
"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend/graphqlutil"
"github.com/sourcegraph/sourcegraph/internal/auth"
"github.com/sourcegraph/sourcegraph/internal/goroutine/recorder"
"github.com/sourcegraph/sourcegraph/internal/gqlutil"
)
// dayCountForStats is hard-coded for now. This signifies the number of days to use for generating the stats for each routine.
const dayCountForStats = 7

// defaultRecentRunCount signifies the default number of recent runs to return for each routine.
const defaultRecentRunCount = 5

// backgroundJobsArgs are the GraphQL arguments of the backgroundJobs query:
// cursor pagination (First/After) plus the number of recent runs to include per routine.
type backgroundJobsArgs struct {
	First          *int32
	After          *string
	RecentRunCount *int32
}
// BackgroundJobResolver resolves one background job recorded by the recorder package.
type BackgroundJobResolver struct {
	jobInfo recorder.JobInfo
}

// RoutineResolver resolves one routine belonging to a background job.
type RoutineResolver struct {
	routine recorder.RoutineInfo
}

// RoutineInstanceResolver resolves one per-host instance of a routine.
type RoutineInstanceResolver struct {
	instance recorder.RoutineInstanceInfo
}

// RoutineRecentRunResolver resolves a single recorded run of a routine.
type RoutineRecentRunResolver struct {
	recentRun recorder.RoutineRun
}

// RoutineStatsResolver resolves aggregate run stats for a routine.
type RoutineStatsResolver struct {
	stats recorder.RoutineRunStats
}
// backgroundJobConnectionResolver resolves a paginated list of background jobs.
//
// 🚨 SECURITY: When instantiating a backgroundJobConnectionResolver value, the caller MUST check
// permissions.
type backgroundJobConnectionResolver struct {
	first          *int32
	after          string
	recentRunCount *int32

	// cache results because they are used by multiple fields
	once      sync.Once
	resolvers []*BackgroundJobResolver
	err       error
}
// BackgroundJobs returns a connection resolver over all recorded background jobs.
func (r *schemaResolver) BackgroundJobs(ctx context.Context, args *backgroundJobsArgs) (*backgroundJobConnectionResolver, error) {
	// 🚨 SECURITY: Only site admins may list background jobs.
	if err := auth.CheckCurrentUserIsSiteAdmin(ctx, r.db); err != nil {
		return nil, err
	}

	// Decode the opaque relay cursor into the job name to start after.
	var after string
	if args.After != nil {
		if err := relay.UnmarshalSpec(graphql.ID(*args.After), &after); err != nil {
			return nil, err
		}
	}

	return &backgroundJobConnectionResolver{
		first:          args.First,
		after:          after,
		recentRunCount: args.RecentRunCount,
	}, nil
}
// backgroundJobByID resolves a single background job from its relay node ID.
func (r *schemaResolver) backgroundJobByID(ctx context.Context, id graphql.ID) (*BackgroundJobResolver, error) {
	// 🚨 SECURITY: Only site admins may view background jobs.
	if err := auth.CheckCurrentUserIsSiteAdmin(ctx, r.db); err != nil {
		return nil, err
	}

	// The relay ID wraps the job name.
	var jobName string
	if err := relay.UnmarshalSpec(id, &jobName); err != nil {
		return nil, err
	}

	info, err := recorder.GetBackgroundJobInfo(recorder.GetCache(), jobName, defaultRecentRunCount, dayCountForStats)
	if err != nil {
		return nil, err
	}
	return &BackgroundJobResolver{jobInfo: info}, nil
}
// Nodes returns the current page of job resolvers, truncated to `first` when a
// non-negative limit was requested.
func (r *backgroundJobConnectionResolver) Nodes(context.Context) ([]*BackgroundJobResolver, error) {
	resolvers, err := r.compute()
	if err != nil {
		return nil, err
	}
	if limit := r.first; limit != nil && *limit > -1 && len(resolvers) > int(*limit) {
		return resolvers[:*limit], nil
	}
	return resolvers, nil
}
// TotalCount returns the total number of jobs, ignoring pagination.
func (r *backgroundJobConnectionResolver) TotalCount(context.Context) (int32, error) {
	all, err := r.compute()
	if err != nil {
		return 0, err
	}
	return int32(len(all)), nil
}
// PageInfo reports whether more jobs exist beyond the requested page and, if
// so, a cursor pointing at the last item of this page.
func (r *backgroundJobConnectionResolver) PageInfo(context.Context) (*graphqlutil.PageInfo, error) {
	resolvers, err := r.compute()
	if err != nil {
		return nil, err
	}
	// BUGFIX: require *r.first > 0 (was > -1). With a limit of zero there is no
	// last element on the page to derive a cursor from, and the previous
	// condition indexed resolvers[-1], panicking at runtime.
	if r.first != nil && *r.first > 0 && len(resolvers) > int(*r.first) {
		return graphqlutil.NextPageCursor(string(resolvers[*r.first-1].ID())), nil
	}
	return graphqlutil.HasNextPage(false), nil
}
// compute fetches the job infos once and caches the resulting resolvers
// (guarded by r.once); Nodes, TotalCount, and PageInfo all share the result.
func (r *backgroundJobConnectionResolver) compute() ([]*BackgroundJobResolver, error) {
	// Fall back to the default when the caller did not specify a run count.
	recentRunCount := defaultRecentRunCount
	if r.recentRunCount != nil {
		recentRunCount = int(*r.recentRunCount)
	}
	r.once.Do(func() {
		jobInfos, err := recorder.GetBackgroundJobInfos(recorder.GetCache(), r.after, recentRunCount, dayCountForStats)
		if err != nil {
			r.resolvers, r.err = nil, err
			return
		}
		resolvers := make([]*BackgroundJobResolver, 0, len(jobInfos))
		for _, jobInfo := range jobInfos {
			resolvers = append(resolvers, &BackgroundJobResolver{jobInfo: jobInfo})
		}
		r.resolvers, r.err = resolvers, nil
	})
	return r.resolvers, r.err
}
// ID returns the relay node ID of the job, derived from its recorded ID.
func (r *BackgroundJobResolver) ID() graphql.ID {
	return relay.MarshalID("BackgroundJob", r.jobInfo.ID)
}

// Name returns the job's human-readable name.
func (r *BackgroundJobResolver) Name() string { return r.jobInfo.Name }
func (r *BackgroundJobResolver) Routines() []*RoutineResolver {
resolvers := make([]*RoutineResolver, 0, len(r.jobInfo.Routines))
for _, routine := range r.jobInfo.Routines {
resolvers = append(resolvers, &RoutineResolver{routine: routine})
}
return resolvers
}
func (r *RoutineResolver) Name() string { return r.routine.Name }
func (r *RoutineResolver) Type() recorder.RoutineType { return r.routine.Type }
func (r *RoutineResolver) Description() string { return r.routine.Description }
func (r *RoutineResolver) IntervalMs() *int32 {
if r.routine.IntervalMs == 0 {
return nil
}
return &r.routine.IntervalMs
}
// Instances resolves the per-host instances of this routine.
func (r *RoutineResolver) Instances() []*RoutineInstanceResolver {
	out := make([]*RoutineInstanceResolver, 0, len(r.routine.Instances))
	for i := range r.routine.Instances {
		out = append(out, &RoutineInstanceResolver{instance: r.routine.Instances[i]})
	}
	return out
}
// HostName returns the name of the host this routine instance runs on.
func (r *RoutineInstanceResolver) HostName() string { return r.instance.HostName }

// LastStartedAt returns when the instance last started, or nil if unknown.
func (r *RoutineInstanceResolver) LastStartedAt() *gqlutil.DateTime {
	return gqlutil.DateTimeOrNil(r.instance.LastStartedAt)
}

// LastStoppedAt returns when the instance last stopped, or nil if unknown.
func (r *RoutineInstanceResolver) LastStoppedAt() *gqlutil.DateTime {
	return gqlutil.DateTimeOrNil(r.instance.LastStoppedAt)
}
// RecentRuns resolves the most recent recorded runs of this routine.
func (r *RoutineResolver) RecentRuns() []*RoutineRecentRunResolver {
	out := make([]*RoutineRecentRunResolver, 0, len(r.routine.RecentRuns))
	for i := range r.routine.RecentRuns {
		out = append(out, &RoutineRecentRunResolver{recentRun: r.routine.RecentRuns[i]})
	}
	return out
}
// At returns when this run started.
func (r *RoutineRecentRunResolver) At() gqlutil.DateTime {
	return gqlutil.DateTime{Time: r.recentRun.At}
}

// HostName returns the host the run executed on.
func (r *RoutineRecentRunResolver) HostName() string { return r.recentRun.HostName }

// DurationMs returns how long the run took, in milliseconds.
func (r *RoutineRecentRunResolver) DurationMs() int32 { return r.recentRun.DurationMs }

// ErrorMessage returns the run's error message, or nil when the run succeeded
// (an empty string is treated as "no error").
func (r *RoutineRecentRunResolver) ErrorMessage() *string {
	if r.recentRun.ErrorMessage == "" {
		return nil
	}
	return &r.recentRun.ErrorMessage
}
// Stats resolves the aggregated run statistics for this routine.
func (r *RoutineResolver) Stats() *RoutineStatsResolver {
	return &RoutineStatsResolver{stats: r.routine.Stats}
}

// Since returns the start of the earliest day covered by the stats, or nil
// when no runs have been recorded yet (the zero time means "no data").
func (r *RoutineStatsResolver) Since() *gqlutil.DateTime {
	if r.stats.Since.IsZero() {
		return nil
	}
	return gqlutil.DateTimeOrNil(&r.stats.Since)
}

// RunCount returns the number of recorded runs in the stats window.
func (r *RoutineStatsResolver) RunCount() int32 { return r.stats.RunCount }

// ErrorCount returns the number of runs that ended with an error.
func (r *RoutineStatsResolver) ErrorCount() int32 { return r.stats.ErrorCount }

// MinDurationMs returns the shortest recorded run duration, in milliseconds.
func (r *RoutineStatsResolver) MinDurationMs() int32 { return r.stats.MinDurationMs }

// AvgDurationMs returns the average run duration, in milliseconds.
func (r *RoutineStatsResolver) AvgDurationMs() int32 { return r.stats.AvgDurationMs }

// MaxDurationMs returns the longest recorded run duration, in milliseconds.
func (r *RoutineStatsResolver) MaxDurationMs() int32 { return r.stats.MaxDurationMs }

View File

@ -625,6 +625,9 @@ func newSchemaResolver(db database.DB, gitserverClient gitserver.Client) *schema
"OutboundRequest": func(ctx context.Context, id graphql.ID) (Node, error) {
return r.outboundRequestByID(ctx, id)
},
"BackgroundJob": func(ctx context.Context, id graphql.ID) (Node, error) {
return r.backgroundJobByID(ctx, id)
},
"Executor": func(ctx context.Context, id graphql.ID) (Node, error) {
return executorByID(ctx, db, id)
},

View File

@ -279,6 +279,11 @@ func (r *NodeResolver) ToOutboundRequest() (*OutboundRequestResolver, bool) {
return n, ok
}
// ToBackgroundJob unwraps the node as a *BackgroundJobResolver, reporting
// whether the underlying node was of that type.
func (r *NodeResolver) ToBackgroundJob() (*BackgroundJobResolver, bool) {
	resolver, ok := r.Node.(*BackgroundJobResolver)
	return resolver, ok
}
func (r *NodeResolver) ToWebhook() (WebhookResolver, bool) {
n, ok := r.Node.(WebhookResolver)
return n, ok

View File

@ -1668,6 +1668,26 @@ type Query {
after: String
): OutboundRequestConnection!
"""
Get a list of background jobs that are currently known in the system.
"""
backgroundJobs(
"""
Returns the first n jobs. If omitted then it returns all of them.
"""
first: Int
"""
Opaque pagination cursor.
"""
after: String
"""
The maximum number of recent runs to return for each routine.
"""
recentRunCount: Int
): BackgroundJobConnection!
"""
(experimental)
Get invitation based on the JWT in the invitation URL
@ -8193,6 +8213,191 @@ type HTTPHeader {
values: [String!]!
}
"""
A list of background jobs that are currently known in the system
"""
type BackgroundJobConnection {
"""
A list of background jobs.
"""
nodes: [BackgroundJob!]!
"""
The total number of background jobs in the connection.
"""
totalCount: Int!
"""
Pagination information.
"""
pageInfo: PageInfo!
}
"""
A single background job.
"""
type BackgroundJob implements Node {
"""
The background job ID.
"""
id: ID!
"""
The name of the job.
"""
name: String!
"""
The routines that run inside this job.
"""
routines: [BackgroundRoutine!]!
}
"""
A routine that runs inside a background job.
"""
type BackgroundRoutine {
"""
The name of the routine.
"""
name: String!
"""
Tells whether this is a periodic goroutine, a DB-backed worker, or something else.
"""
type: BackgroundRoutineType!
"""
Explains what the routine does.
"""
description: String!
"""
The interval at which the routine runs, if it's periodic.
"""
intervalMs: Int
"""
The instances of this routine that are running or ran recently. An instance means one routine on one host.
"""
instances: [BackgroundRoutineInstance!]!
"""
The recent runs of this routine.
"""
recentRuns: [BackgroundRoutineRecentRun!]!
"""
Some stats of the runs of this routine in the past few days.
"""
stats: BackgroundRoutineStats!
}
"""
Enum of the possible background routine types
"""
enum BackgroundRoutineType {
"""
Periodic routine
"""
PERIODIC
"""
Periodic routine with metrics set up
"""
PERIODIC_WITH_METRICS
"""
DB-backed worker
"""
DB_BACKED
"""
Custom routine
"""
CUSTOM
}
"""
One instance of the background routine, running on a host.
"""
type BackgroundRoutineInstance {
"""
The name of the host this instance runs on, which also serves as the instance's identifier.
"""
hostName: String!
"""
The time the instance was last started. (If it's unknown, this will be null.)
"""
lastStartedAt: DateTime
"""
The time the instance was last stopped. (If it's unknown, this will be null.)
"""
lastStoppedAt: DateTime
}
"""
A single run of the routine. A run is not the start/stop event but the actual execution of the routine handler.
"""
type BackgroundRoutineRecentRun {
"""
The time the run started.
"""
at: DateTime!
"""
The name of the host that ran the routine.
"""
hostName: String!
"""
The duration of the run.
"""
durationMs: Int!
"""
The error message, if any.
"""
errorMessage: String
}
"""
Holds statistics about a background routine.
"""
type BackgroundRoutineStats {
"""
The start of the earliest day for which we have any runs registered.
"""
since: DateTime
"""
The number of times the routine ran in the period.
"""
runCount: Int!
"""
The number of times the routine run ended with an error.
"""
errorCount: Int!
"""
The minimum duration of a run, in milliseconds.
"""
minDurationMs: Int!
"""
The average duration of a run, in milliseconds.
"""
avgDurationMs: Int!
"""
The maximum duration of a run, in milliseconds.
"""
maxDurationMs: Int!
}
"""
The clone status of a repository.
"""

View File

@ -11,6 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/goroutine/recorder"
"github.com/sourcegraph/sourcegraph/cmd/worker/internal/codeintel"
"github.com/sourcegraph/sourcegraph/cmd/worker/internal/encryption"
@ -41,6 +42,11 @@ const addr = ":3189"
type EnterpriseInit = func(ossDB database.DB)
type namedBackgroundRoutine struct {
Routine goroutine.BackgroundRoutine
JobName string
}
// Start runs the worker.
func Start(observationCtx *observation.Context, additionalJobs map[string]job.Job, registerEnterpriseMigrators oobmigration.RegisterMigratorsFunc, enterpriseInit EnterpriseInit) error {
registerMigrators := oobmigration.ComposeRegisterMigratorsFuncs(migrations.RegisterOSSMigrators, registerEnterpriseMigrators)
@ -102,7 +108,7 @@ func Start(observationCtx *observation.Context, additionalJobs map[string]job.Jo
// Create the background routines that the worker will monitor for its
// lifetime. There may be a non-trivial startup time on this step as we
// connect to external databases, wait for migrations, etc.
allRoutines, err := createBackgroundRoutines(observationCtx, jobs)
allRoutinesWithJobNames, err := createBackgroundRoutines(observationCtx, jobs)
if err != nil {
return err
}
@ -113,7 +119,20 @@ func Start(observationCtx *observation.Context, additionalJobs map[string]job.Jo
WriteTimeout: 10 * time.Minute,
Handler: httpserver.NewHandler(nil),
})
allRoutines = append(allRoutines, server)
serverRoutineWithJobName := namedBackgroundRoutine{Routine: server, JobName: "health-server"}
allRoutinesWithJobNames = append(allRoutinesWithJobNames, serverRoutineWithJobName)
// Register recorder in all routines that support it
recorderCache := recorder.GetCache()
rec := recorder.New(observationCtx.Logger, env.MyName, recorderCache)
for _, rj := range allRoutinesWithJobNames {
if recordable, ok := rj.Routine.(recorder.Recordable); ok {
recordable.SetJobName(rj.JobName)
recordable.RegisterRecorder(rec)
rec.Register(recordable)
}
}
rec.RegistrationDone()
// We're all set up now
// Respond positively to ready checks
@ -121,6 +140,11 @@ func Start(observationCtx *observation.Context, additionalJobs map[string]job.Jo
// This method blocks while the app is live - the following return is only to appease
// the type checker.
allRoutines := make([]goroutine.BackgroundRoutine, 0, len(allRoutinesWithJobNames))
for _, r := range allRoutinesWithJobNames {
allRoutines = append(allRoutines, r.Routine)
}
goroutine.MonitorBackgroundRoutines(context.Background(), allRoutines...)
return nil
}
@ -206,15 +230,15 @@ func emitJobCountMetrics(jobs map[string]job.Job) {
// createBackgroundRoutines runs the Routines function of each of the given jobs concurrently.
// If an error occurs from any of them, a fatal log message will be emitted. Otherwise, the set
// of background routines from each job will be returned.
func createBackgroundRoutines(observationCtx *observation.Context, jobs map[string]job.Job) ([]goroutine.BackgroundRoutine, error) {
func createBackgroundRoutines(observationCtx *observation.Context, jobs map[string]job.Job) ([]namedBackgroundRoutine, error) {
var (
allRoutines []goroutine.BackgroundRoutine
descriptions []string
allRoutinesWithJobNames []namedBackgroundRoutine
descriptions []string
)
for result := range runRoutinesConcurrently(observationCtx, jobs) {
if result.err == nil {
allRoutines = append(allRoutines, result.routines...)
allRoutinesWithJobNames = append(allRoutinesWithJobNames, result.routinesWithJobNames...)
} else {
descriptions = append(descriptions, fmt.Sprintf(" - %s: %s", result.name, result.err))
}
@ -225,13 +249,13 @@ func createBackgroundRoutines(observationCtx *observation.Context, jobs map[stri
return nil, errors.Newf("Failed to initialize worker:\n%s", strings.Join(descriptions, "\n"))
}
return allRoutines, nil
return allRoutinesWithJobNames, nil
}
type routinesResult struct {
name string
routines []goroutine.BackgroundRoutine
err error
name string
routinesWithJobNames []namedBackgroundRoutine
err error
}
// runRoutinesConcurrently returns a channel that will be populated with the return value of
@ -261,7 +285,14 @@ func runRoutinesConcurrently(observationCtx *observation.Context, jobs map[strin
defer wg.Done()
routines, err := jobs[name].Routines(ctx, observationCtx)
results <- routinesResult{name, routines, err}
routinesWithJobNames := make([]namedBackgroundRoutine, 0, len(routines))
for _, r := range routines {
routinesWithJobNames = append(routinesWithJobNames, namedBackgroundRoutine{
Routine: r,
JobName: name,
})
}
results <- routinesResult{name, routinesWithJobNames, err}
if err == nil {
jobLogger.Debug("Finished initializing job")

View File

@ -1605,6 +1605,7 @@ query {
db.UsersFunc.SetDefaultReturn(users)
bbProjects := database.NewMockBitbucketProjectPermissionsStore()
entry := workerutil.ExecutionLogEntry{Key: "key", Command: []string{"command"}, StartTime: mustParseTime("2020-01-06"), ExitCode: intPtr(1), Out: "out", DurationMs: intPtr(1)}
bbProjects.ListJobsFunc.SetDefaultReturn([]*types.BitbucketProjectPermissionJob{
{
ID: 1,
@ -1617,7 +1618,7 @@ query {
NumResets: 1,
NumFailures: 2,
LastHeartbeatAt: mustParseTime("2020-01-05"),
ExecutionLogs: []workerutil.ExecutionLogEntry{{Key: "key", Command: []string{"command"}, StartTime: mustParseTime("2020-01-06"), ExitCode: intPtr(1), Out: "out", DurationMs: intPtr(1)}},
ExecutionLogs: []types.ExecutionLogEntry{&entry},
WorkerHostname: "worker-hostname",
ProjectKey: "project-key",
ExternalServiceID: 1,

View File

@ -5,22 +5,23 @@ import (
"time"
"github.com/inconshreveable/log15"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/store"
"github.com/sourcegraph/sourcegraph/enterprise/internal/batches/types/scheduler/config"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/goroutine/recorder"
)
// Scheduler provides a scheduling service that moves changesets from the
// scheduled state to the queued state based on the current rate limit, if
// anything. Changesets are processed in a FIFO manner.
type Scheduler struct {
ctx context.Context
done chan struct{}
store *store.Store
ctx context.Context
done chan struct{}
store *store.Store
jobName string
recorder *recorder.Recorder
}
var _ goroutine.BackgroundRoutine = &Scheduler{}
var _ recorder.Recordable = &Scheduler{}
func NewScheduler(ctx context.Context, bstore *store.Store) *Scheduler {
return &Scheduler{
@ -31,6 +32,10 @@ func NewScheduler(ctx context.Context, bstore *store.Store) *Scheduler {
}
func (s *Scheduler) Start() {
if s.recorder != nil {
go s.recorder.LogStart(s)
}
// Set up a global backoff strategy where we start at 5 seconds, up to a
// minute, when we don't have any changesets to enqueue. Without this, an
// unlimited schedule will essentially busy-wait calling Take().
@ -51,6 +56,8 @@ func (s *Scheduler) Start() {
for {
select {
case delay := <-ticker.C:
start := time.Now()
// We can enqueue a changeset. Let's try to do so, ensuring that
// we always return a duration back down the delay channel.
if err := s.enqueueChangeset(); err != nil {
@ -65,6 +72,11 @@ func (s *Scheduler) Start() {
delay <- time.Duration(0)
}
duration := time.Since(start)
if s.recorder != nil {
go s.recorder.LogRun(s, duration, nil)
}
case <-validity.C:
// The schedule is no longer valid, so let's break out of this
// loop and build a new schedule.
@ -88,6 +100,9 @@ func (s *Scheduler) Start() {
}
func (s *Scheduler) Stop() {
if s.recorder != nil {
go s.recorder.LogStop(s)
}
s.done <- struct{}{}
close(s.done)
}
@ -137,3 +152,31 @@ func (b *backoff) next() time.Duration {
func (b *backoff) reset() {
b.current = b.init
}
// Name identifies this routine in the background-job recorder.
func (s *Scheduler) Name() string {
	return "batches-scheduler"
}

// Type marks the scheduler as a custom routine (neither periodic nor
// DB-backed).
func (s *Scheduler) Type() recorder.RoutineType {
	return recorder.CustomRoutine
}

// JobName returns the name of the job this routine belongs to, as assigned
// via SetJobName.
func (s *Scheduler) JobName() string {
	return s.jobName
}

// SetJobName records which job this routine runs under.
func (s *Scheduler) SetJobName(jobName string) {
	s.jobName = jobName
}

// Description returns a human-readable description for the jobs dashboard.
func (s *Scheduler) Description() string {
	return "Scheduler for batch changes"
}

// Interval reports a nominal run interval for display purposes only.
func (s *Scheduler) Interval() time.Duration {
	return 1 * time.Minute // Actually between 5 sec and 1 min, changes dynamically
}

// RegisterRecorder attaches the recorder used to log this routine's starts,
// stops, and runs.
func (s *Scheduler) RegisterRecorder(recorder *recorder.Recorder) {
	s.recorder = recorder
}

View File

@ -187,7 +187,8 @@ func ScanBitbucketProjectPermissionJob(rows dbutil.Scanner) (*types.BitbucketPro
}
for _, entry := range executionLogs {
job.ExecutionLogs = append(job.ExecutionLogs, workerutil.ExecutionLogEntry(entry))
logEntry := workerutil.ExecutionLogEntry(entry)
job.ExecutionLogs = append(job.ExecutionLogs, &logEntry)
}
for _, perm := range permissions {

View File

@ -162,6 +162,7 @@ func TestScanFirstBitbucketProjectPermissionsJob(t *testing.T) {
job, err := ScanBitbucketProjectPermissionJob(rows)
require.NoError(t, err)
require.NotNil(t, job)
entry := workerutil.ExecutionLogEntry{Key: "key", Command: []string{"command"}, StartTime: mustParseTime("2020-01-06"), ExitCode: intPtr(1), Out: "out", DurationMs: intPtr(1)}
require.Equal(t, &types.BitbucketProjectPermissionJob{
ID: 1,
State: "queued",
@ -173,7 +174,7 @@ func TestScanFirstBitbucketProjectPermissionsJob(t *testing.T) {
NumResets: 1,
NumFailures: 2,
LastHeartbeatAt: mustParseTime("2020-01-05"),
ExecutionLogs: []workerutil.ExecutionLogEntry{{Key: "key", Command: []string{"command"}, StartTime: mustParseTime("2020-01-06"), ExitCode: intPtr(1), Out: "out", DurationMs: intPtr(1)}},
ExecutionLogs: []types.ExecutionLogEntry{&entry},
WorkerHostname: "worker-hostname",
ProjectKey: "project-key",
ExternalServiceID: 1,

View File

@ -5,9 +5,8 @@ import (
"time"
"github.com/derision-test/glock"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/goroutine/recorder"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
@ -19,16 +18,21 @@ import (
// for more information and a step-by-step guide on how to implement a
// PeriodicBackgroundRoutine.
type PeriodicGoroutine struct {
interval time.Duration
handler unifiedHandler
operation *observation.Operation
clock glock.Clock
ctx context.Context // root context passed to the handler
cancel context.CancelFunc // cancels the root context
finished chan struct{} // signals that Start has finished
name string
routineType recorder.RoutineType
description string
jobName string
recorder *recorder.Recorder
interval time.Duration
handler unifiedHandler
operation *observation.Operation
clock glock.Clock
ctx context.Context // root context passed to the handler
cancel context.CancelFunc // cancels the root context
finished chan struct{} // signals that Start has finished
}
var _ BackgroundRoutine = &PeriodicGoroutine{}
var _ recorder.Recordable = &PeriodicGoroutine{}
type unifiedHandler interface {
Handler
@ -55,7 +59,7 @@ type Finalizer interface {
OnShutdown()
}
// HandlerFunc wraps a function so it can be used as a Handler.
// HandlerFunc wraps a function, so it can be used as a Handler.
type HandlerFunc func(ctx context.Context) error
func (f HandlerFunc) Handle(ctx context.Context) error {
@ -114,24 +118,36 @@ func newPeriodicGoroutine(ctx context.Context, name, description string, interva
}
return &PeriodicGoroutine{
handler: h,
interval: interval,
operation: operation,
clock: clock,
ctx: ctx,
cancel: cancel,
finished: make(chan struct{}),
name: name,
description: description,
handler: h,
interval: interval,
operation: operation,
clock: clock,
ctx: ctx,
cancel: cancel,
finished: make(chan struct{}),
}
}
// Start begins the process of calling the registered handler in a loop. This process will
// wait the interval supplied at construction between invocations.
func (r *PeriodicGoroutine) Start() {
if r.recorder != nil {
go r.recorder.LogStart(r)
}
defer close(r.finished)
loop:
for {
if shutdown, err := runPeriodicHandler(r.ctx, r.handler, r.operation); shutdown {
start := time.Now()
shutdown, err := runPeriodicHandler(r.ctx, r.handler, r.operation)
duration := time.Since(start)
if r.recorder != nil {
go r.recorder.LogRun(r, duration, err)
}
if shutdown {
break
} else if h, ok := r.handler.(ErrorHandler); ok && err != nil {
h.HandleError(err)
@ -153,10 +169,45 @@ loop:
// iteration of work, then break the loop in the Start method so that no new work
// is accepted. This method blocks until Start has returned.
func (r *PeriodicGoroutine) Stop() {
if r.recorder != nil {
go r.recorder.LogStop(r)
}
r.cancel()
<-r.finished
}
// Name returns the routine's name as passed at construction.
func (r *PeriodicGoroutine) Name() string {
	return r.name
}
// Type reports how this routine is recorded: periodic-with-metrics when an
// observation operation is attached, plain periodic otherwise.
func (r *PeriodicGoroutine) Type() recorder.RoutineType {
	if r.operation != nil {
		return recorder.PeriodicWithMetrics
	}
	// Early return keeps the happy path flat; no else needed after a
	// terminating if.
	return recorder.PeriodicRoutine
}
// Description returns the human-readable description passed at construction.
func (r *PeriodicGoroutine) Description() string {
	return r.description
}

// Interval returns the wait between handler invocations.
func (r *PeriodicGoroutine) Interval() time.Duration {
	return r.interval
}

// JobName returns the owning job's name, as assigned via SetJobName.
func (r *PeriodicGoroutine) JobName() string {
	return r.jobName
}

// SetJobName records which job this routine runs under.
func (r *PeriodicGoroutine) SetJobName(jobName string) {
	r.jobName = jobName
}

// RegisterRecorder attaches the recorder that logs this routine's starts,
// stops, and runs.
func (r *PeriodicGoroutine) RegisterRecorder(recorder *recorder.Recorder) {
	r.recorder = recorder
}
func runPeriodicHandler(ctx context.Context, handler Handler, operation *observation.Operation) (_ bool, err error) {
if operation != nil {
tmpCtx, _, endObservation := operation.With(ctx, &err, observation.Args{})

View File

@ -0,0 +1,118 @@
package recorder
import (
"math"
"time"
"github.com/sourcegraph/sourcegraph/internal/rcache"
)
// JobInfo contains information about a job, including all its routines.
type JobInfo struct {
ID string `json:"id"`
Name string `json:"name"`
Routines []RoutineInfo
}
// RoutineInfo contains information about a routine.
type RoutineInfo struct {
Name string `json:"name"`
Type RoutineType `json:"type"`
JobName string `json:"jobName"`
Description string `json:"description"`
IntervalMs int32 `json:"intervalMs"` // Assumes that the routine runs at a fixed interval across all hosts.
Instances []RoutineInstanceInfo
RecentRuns []RoutineRun
Stats RoutineRunStats
}
// serializableRoutineInfo represents a single routine in a job, and is used for serialization in Redis.
type serializableRoutineInfo struct {
	Name        string        `json:"name"`
	Type        RoutineType   `json:"type"`
	JobName     string        `json:"jobName"`
	Description string        `json:"description"`
	Interval    time.Duration `json:"interval"`
}

// RoutineInstanceInfo contains information about a routine instance.
// That is, a single version that's running (or ran) on a single node.
type RoutineInstanceInfo struct {
	HostName      string     `json:"hostName"`
	LastStartedAt *time.Time `json:"lastStartedAt"`
	// NOTE(review): this tag is capitalized ("LastStoppedAt") while every
	// other tag is camelCase — likely a typo, but changing it would alter the
	// serialized form of any already-stored data; confirm before fixing.
	LastStoppedAt *time.Time `json:"LastStoppedAt"`
}
// RoutineRun contains information about a single run of a routine.
// That is, a single action that a running instance of a routine performed.
type RoutineRun struct {
	At time.Time `json:"at"`
	// NOTE(review): tag is all-lowercase "hostname" while RoutineInstanceInfo
	// uses "hostName" — inconsistent; stored data depends on it, so confirm
	// before unifying.
	HostName     string `json:"hostname"`
	DurationMs   int32  `json:"durationMs"`
	ErrorMessage string `json:"errorMessage"`
}

// RoutineRunStats contains statistics about a routine.
type RoutineRunStats struct {
	// Since is the start of the earliest day included in these stats; the
	// zero time means "no data yet".
	Since         time.Time `json:"since"`
	RunCount      int32     `json:"runCount"`
	ErrorCount    int32     `json:"errorCount"`
	MinDurationMs int32     `json:"minDurationMs"`
	AvgDurationMs int32     `json:"avgDurationMs"`
	MaxDurationMs int32     `json:"maxDurationMs"`
}
// RoutineType tells how a routine is implemented and monitored; the values
// mirror the GraphQL BackgroundRoutineType enum.
type RoutineType string

const (
	PeriodicRoutine     RoutineType = "PERIODIC"
	PeriodicWithMetrics RoutineType = "PERIODIC_WITH_METRICS"
	DBBackedRoutine     RoutineType = "DB_BACKED"
	CustomRoutine       RoutineType = "CUSTOM"
)
// ttlSeconds is how long recorded background-job data lives in Redis: 7 days.
const ttlSeconds = 7 * 24 * 60 * 60

// GetCache returns the Redis-backed cache in which background-job recordings
// are stored; entries expire after ttlSeconds.
func GetCache() *rcache.Cache {
	return rcache.NewWithTTL(keyPrefix, ttlSeconds)
}
// mergeStats combines two RoutineRunStats values into one, as if all runs
// from both had been recorded together.
//
// A zero Since is treated as "no data". The returned averages are weighted by
// each side's run count.
func mergeStats(a RoutineRunStats, b RoutineRunStats) RoutineRunStats {
	// Pick the earlier non-zero "since". The previous version left `since` at
	// its zero value when both sides were set but b's was earlier; the
	// default branch below covers that case.
	var since time.Time
	switch {
	case a.Since.IsZero():
		since = b.Since
	case b.Since.IsZero():
		since = a.Since
	case a.Since.Before(b.Since):
		since = a.Since
	default:
		since = b.Since
	}

	// Minimum duration: prefer b when a has none recorded or b's is strictly
	// smaller. NOTE(review): a legitimate sub-millisecond run also yields 0,
	// so 0 is ambiguous between "no data" and "very fast" — confirm intended
	// semantics with callers.
	var minDurationMs int32
	if a.MinDurationMs == 0 || b.MinDurationMs < a.MinDurationMs {
		minDurationMs = b.MinDurationMs
	} else {
		minDurationMs = a.MinDurationMs
	}

	// Weighted average, guarding against a zero total run count (the previous
	// version divided by zero, producing NaN and an unspecified int32
	// conversion).
	var avgDurationMs int32
	if totalRuns := a.RunCount + b.RunCount; totalRuns > 0 {
		avgDurationMs = int32(math.Round(float64(a.AvgDurationMs*a.RunCount+b.AvgDurationMs*b.RunCount) / float64(totalRuns)))
	}

	var maxDurationMs int32
	if b.MaxDurationMs > a.MaxDurationMs {
		maxDurationMs = b.MaxDurationMs
	} else {
		maxDurationMs = a.MaxDurationMs
	}

	// Return merged stats.
	return RoutineRunStats{
		Since:         since,
		RunCount:      a.RunCount + b.RunCount,
		ErrorCount:    a.ErrorCount + b.ErrorCount,
		MinDurationMs: minDurationMs,
		AvgDurationMs: avgDurationMs,
		MaxDurationMs: maxDurationMs,
	}
}

View File

@ -0,0 +1,143 @@
package recorder
import (
"testing"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/rcache"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/stretchr/testify/assert"
)
// TestLoggerAndReaderHappyPaths tests pretty much everything in the happy path of both the logger and the log reader.
// TestLoggerAndReaderHappyPaths tests pretty much everything in the happy path of both the logger and the log reader.
func TestLoggerAndReaderHappyPaths(t *testing.T) {
	rcache.SetupForTest(t)

	// Create a recorder backed by a short-TTL test cache.
	c := rcache.NewWithTTL(keyPrefix, 1)
	recorder := New(log.NoOp(), "test", c)

	// Create routines: two under "job-1", one under "job-2".
	routine1 := newRoutineMock("routine-1", "a routine", 2*time.Minute)
	routine1.SetJobName("job-1")
	routine2 := newRoutineMock("routine-2", "another routine", 2*time.Minute)
	routine2.SetJobName("job-1")
	routine3 := newRoutineMock("routine-3", "a third routine", 2*time.Minute)
	routine3.SetJobName("job-2")

	// Register routines
	recorder.Register(routine1)
	recorder.Register(routine2)
	recorder.Register(routine3)
	recorder.RegistrationDone()

	// Get infos: read everything back before any runs were logged.
	jobInfos, err := GetBackgroundJobInfos(c, "", 5, 7)

	// Assert: registration alone yields both jobs with empty run stats.
	assert.NoError(t, err)
	assert.Len(t, jobInfos, 2)
	assert.Equal(t, "job-1", jobInfos[0].ID)
	assert.Equal(t, "job-1", jobInfos[0].Name)
	assert.Equal(t, 2, len(jobInfos[0].Routines))
	assert.Equal(t, "a routine", jobInfos[0].Routines[0].Description)
	assert.Equal(t, 1, len(jobInfos[0].Routines[0].Instances))
	assert.Equal(t, "test", jobInfos[0].Routines[0].Instances[0].HostName)
	assertRoutineStats(t, jobInfos[0].Routines[0], "routine-1", false, false, 0, 0, 0, 0, 0, 0)
	assertRoutineStats(t, jobInfos[0].Routines[1], "routine-2", false, false, 0, 0, 0, 0, 0, 0)
	assertRoutineStats(t, jobInfos[1].Routines[0], "routine-3", false, false, 0, 0, 0, 0, 0, 0)

	// Log some runs: 3x routine-1 (1x with error), 1x routine-2, 0x routine-3 (and stops)
	recorder.LogStart(routine1)
	recorder.LogStart(routine2)
	recorder.LogStart(routine3)
	recorder.LogRun(routine1, 10*time.Millisecond, nil)
	recorder.LogRun(routine1, 15*time.Millisecond, nil)
	recorder.LogRun(routine1, 20*time.Millisecond, errors.New("test error"))
	recorder.LogRun(routine2, 2*time.Second, nil)
	recorder.LogStop(routine3)

	// Get infos again, now with runs recorded.
	jobInfos, err = GetBackgroundJobInfos(c, "", 5, 7)

	// Assert: run counts, error counts, and min/avg/max durations (ms) match
	// the runs logged above.
	assert.NoError(t, err)
	assert.Len(t, jobInfos, 2)
	assertRoutineStats(t, jobInfos[0].Routines[0], "routine-1", true, false, 3, 3, 1, 10, 15, 20)
	assertRoutineStats(t, jobInfos[0].Routines[1], "routine-2", true, false, 1, 1, 0, 2000, 2000, 2000)
	assertRoutineStats(t, jobInfos[1].Routines[0], "routine-3", true, true, 0, 0, 0, 0, 0, 0)
}
// assertRoutineStats checks a RoutineInfo's name, start/stop markers, recent
// run count, and aggregate stats against the expected values.
//
// rRuns is the expected number of recent runs; sRuns/sErrors are the expected
// aggregate counts, and sMin/sAvg/sMax the expected durations in milliseconds.
func assertRoutineStats(t *testing.T, r RoutineInfo, name string,
	started bool, stopped bool, rRuns int, sRuns int32, sErrors int32, sMin int32, sAvg int32, sMax int32) {
	assert.Equal(t, name, r.Name)
	// A non-nil LastStartedAt means the routine logged a start on this host.
	if started {
		assert.NotNil(t, r.Instances[0].LastStartedAt)
	} else {
		assert.Nil(t, r.Instances[0].LastStartedAt)
	}
	assert.Equal(t, rRuns, len(r.RecentRuns))
	assert.Equal(t, sRuns, r.Stats.RunCount)
	assert.Equal(t, sErrors, r.Stats.ErrorCount)
	assert.Equal(t, sMin, r.Stats.MinDurationMs)
	assert.Equal(t, sAvg, r.Stats.AvgDurationMs)
	assert.Equal(t, sMax, r.Stats.MaxDurationMs)
	// A non-nil LastStoppedAt means the routine logged a stop on this host.
	if stopped {
		assert.NotNil(t, r.Instances[0].LastStoppedAt)
	} else {
		assert.Nil(t, r.Instances[0].LastStoppedAt)
	}
}
// RoutineMock is a minimal Recordable implementation for exercising the
// recorder in tests; Start, Stop, and RegisterRecorder are no-ops.
type RoutineMock struct {
	name        string
	description string
	jobName     string
	interval    time.Duration
}

// Compile-time check that the mock satisfies Recordable.
var _ Recordable = &RoutineMock{}

// newRoutineMock builds a mock routine with the given name, description, and
// nominal interval; the job name is assigned later via SetJobName.
func newRoutineMock(name string, description string, interval time.Duration) *RoutineMock {
	return &RoutineMock{
		name:        name,
		description: description,
		interval:    interval,
	}
}

func (r *RoutineMock) Start() {
	// Do nothing
}

func (r *RoutineMock) Stop() {
	// Do nothing
}

// Name returns the mock's configured name.
func (r *RoutineMock) Name() string {
	return r.name
}

// Type always reports a custom routine.
func (r *RoutineMock) Type() RoutineType {
	return CustomRoutine
}

// JobName returns the job name assigned via SetJobName.
func (r *RoutineMock) JobName() string {
	return r.jobName
}

// SetJobName records the owning job's name.
func (r *RoutineMock) SetJobName(jobName string) {
	r.jobName = jobName
}

// Description returns the mock's configured description.
func (r *RoutineMock) Description() string {
	return r.description
}

// Interval returns the mock's configured interval.
func (r *RoutineMock) Interval() time.Duration {
	return r.interval
}

func (r *RoutineMock) RegisterRecorder(*Recorder) {
	// Do nothing
}

View File

@ -0,0 +1,292 @@
package recorder
import (
"encoding/json"
"sort"
"time"
"github.com/sourcegraph/sourcegraph/internal/rcache"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// GetBackgroundJobInfos returns information about all known jobs, ordered by
// name ascending, starting strictly after the optional cursor.
//
// recentRunCount caps the number of recent runs returned per routine;
// dayCountForStats determines how many days of history feed the stats.
func GetBackgroundJobInfos(c *rcache.Cache, after string, recentRunCount int, dayCountForStats int) ([]JobInfo, error) {
	// Get known job names sorted by name, ascending.
	knownJobNames, err := getKnownJobNames(c)
	if err != nil {
		return nil, errors.Wrap(err, "get known job names")
	}

	// Collect full info for each job.
	jobs := make([]JobInfo, 0, len(knownJobNames))
	for _, jobName := range knownJobNames {
		job, err := GetBackgroundJobInfo(c, jobName, recentRunCount, dayCountForStats)
		if err != nil {
			return nil, errors.Wrapf(err, "get job info for %q", jobName)
		}
		jobs = append(jobs, job)
	}

	// Apply the "after" cursor: jobs are ordered by name, so return the
	// suffix strictly after it. Previously, when every job name sorted at or
	// before the cursor, the loop fell through and the FULL list was returned
	// instead of an empty page.
	if after != "" {
		for i, job := range jobs {
			if job.Name > after {
				return jobs[i:], nil
			}
		}
		return []JobInfo{}, nil
	}
	return jobs, nil
}
// GetBackgroundJobInfo returns information about the given job, covering all
// of its routines across every known host.
func GetBackgroundJobInfo(c *rcache.Cache, jobName string, recentRunCount int, dayCountForStats int) (JobInfo, error) {
	hostNames, err := getKnownHostNames(c)
	if err != nil {
		return JobInfo{}, err
	}

	knownRoutines, err := getKnownRoutinesForJob(c, jobName)
	if err != nil {
		return JobInfo{}, err
	}

	infos := make([]RoutineInfo, 0, len(knownRoutines))
	for _, routine := range knownRoutines {
		info, err := getRoutineInfo(c, routine, hostNames, recentRunCount, dayCountForStats)
		if err != nil {
			return JobInfo{}, err
		}
		infos = append(infos, info)
	}

	return JobInfo{ID: jobName, Name: jobName, Routines: infos}, nil
}
// getKnownJobNames returns all job names whose "last seen" timestamp is
// within seenTimeout, sorted ascending.
func getKnownJobNames(c *rcache.Cache) ([]string, error) {
	entries, err := c.GetHashAll("knownJobNames")
	if err != nil {
		return nil, err
	}

	// Keep the hash keys (job names) whose "last seen" value is still fresh.
	var names []string
	for name, lastSeenRaw := range entries {
		lastSeen, parseErr := time.Parse(time.RFC3339, lastSeenRaw)
		if parseErr != nil {
			return nil, errors.Wrap(parseErr, "failed to parse job last seen time")
		}
		if time.Since(lastSeen) > seenTimeout {
			continue
		}
		names = append(names, name)
	}

	sort.Strings(names)
	return names, nil
}
// getKnownHostNames returns a list of all known host names, ascending, filtered by their “last seen” time.
func getKnownHostNames(c *rcache.Cache) ([]string, error) {
	hostNames, err := c.GetHashAll("knownHostNames")
	if err != nil {
		return nil, err
	}
	// Get the values only from the map
	var values []string
	for hostName, lastSeenString := range hostNames {
		// Parse “last seen” time
		lastSeen, err := time.Parse(time.RFC3339, lastSeenString)
		if err != nil {
			return nil, errors.Wrap(err, "failed to parse host last seen time")
		}
		// Skip hosts that have not reported within seenTimeout. (The original
		// comment said "job" here — copy-paste from getKnownJobNames.)
		if time.Since(lastSeen) > seenTimeout {
			continue
		}
		values = append(values, hostName)
	}
	// Sort the values
	sort.Strings(values)
	return values, nil
}
// getKnownRoutinesForJob returns every known routine registered under
// jobName, sorted by routine name ascending.
func getKnownRoutinesForJob(c *rcache.Cache, jobName string) ([]serializableRoutineInfo, error) {
	all, err := getKnownRoutines(c)
	if err != nil {
		return nil, err
	}

	// Keep only the routines belonging to the requested job.
	var matching []serializableRoutineInfo
	for _, routine := range all {
		if routine.JobName == jobName {
			matching = append(matching, routine)
		}
	}

	sort.Slice(matching, func(i, j int) bool {
		return matching[i].Name < matching[j].Name
	})
	return matching, nil
}
// getKnownRoutines loads and deserializes every known routine from Redis,
// unfiltered and in no particular order.
func getKnownRoutines(c *rcache.Cache) ([]serializableRoutineInfo, error) {
	rawItems, err := c.GetHashAll("knownRoutines")
	if err != nil {
		return nil, err
	}

	routines := make([]serializableRoutineInfo, 0, len(rawItems))
	for _, raw := range rawItems {
		var routine serializableRoutineInfo
		if err := json.Unmarshal([]byte(raw), &routine); err != nil {
			return nil, err
		}
		routines = append(routines, routine)
	}
	return routines, nil
}
// getRoutineInfo returns the info for a single routine: its instances, recent runs, and stats.
func getRoutineInfo(c *rcache.Cache, r serializableRoutineInfo, allHostNames []string, recentRunCount int, dayCountForStats int) (RoutineInfo, error) {
	routineInfo := RoutineInfo{
		Name:        r.Name,
		Type:        r.Type,
		JobName:     r.JobName,
		Description: r.Description,
		// Convert the stored interval to whole milliseconds for the API.
		IntervalMs: int32(r.Interval / time.Millisecond),
		Instances:  make([]RoutineInstanceInfo, 0, len(allHostNames)),
		RecentRuns: []RoutineRun{},
	}
	// Collect instances: one per known host.
	for _, hostName := range allHostNames {
		instanceInfo, err := getRoutineInstanceInfo(c, r.JobName, r.Name, hostName)
		if err != nil {
			return RoutineInfo{}, err
		}
		routineInfo.Instances = append(routineInfo.Instances, instanceInfo)
	}
	// Collect recent runs across all hosts.
	for _, hostName := range allHostNames {
		recentRunsForHost, err := loadRecentRuns(c, r.JobName, r.Name, hostName, recentRunCount)
		if err != nil {
			return RoutineInfo{}, err
		}
		routineInfo.RecentRuns = append(routineInfo.RecentRuns, recentRunsForHost...)
	}
	// Sort recent runs descending by start time
	sort.Slice(routineInfo.RecentRuns, func(i, j int) bool {
		return routineInfo.RecentRuns[i].At.After(routineInfo.RecentRuns[j].At)
	})
	// Limit to recentRunCount: keep only the newest runs overall.
	if len(routineInfo.RecentRuns) > recentRunCount {
		routineInfo.RecentRuns = routineInfo.RecentRuns[:recentRunCount]
	}
	// Collect stats over the last dayCountForStats days.
	stats, err := loadRunStats(c, r.JobName, r.Name, time.Now(), dayCountForStats)
	if err != nil {
		return RoutineInfo{}, errors.Wrap(err, "load run stats")
	}
	routineInfo.Stats = stats
	return routineInfo, nil
}
// getRoutineInstanceInfo returns the info for a single routine instance.
func getRoutineInstanceInfo(c *rcache.Cache, jobName string, routineName string, hostName string) (RoutineInstanceInfo, error) {
	base := jobName + ":" + routineName + ":" + hostName + ":"
	// parse converts a stored RFC3339 timestamp, wrapping the error with `what`.
	parse := func(raw []byte, what string) (*time.Time, error) {
		t, err := time.Parse(time.RFC3339, string(raw))
		if err != nil {
			return nil, errors.Wrap(err, "parse "+what)
		}
		return &t, nil
	}
	var startedAt, stoppedAt *time.Time
	if raw, ok := c.Get(base + "lastStart"); ok {
		t, err := parse(raw, "last start")
		if err != nil {
			return RoutineInstanceInfo{}, err
		}
		startedAt = t
	}
	if raw, ok := c.Get(base + "lastStop"); ok {
		t, err := parse(raw, "last stop")
		if err != nil {
			return RoutineInstanceInfo{}, err
		}
		stoppedAt = t
	}
	return RoutineInstanceInfo{
		HostName:      hostName,
		LastStartedAt: startedAt,
		LastStoppedAt: stoppedAt,
	}, nil
}
// loadRecentRuns loads the recent runs for a routine, in no particular order.
func loadRecentRuns(c *rcache.Cache, jobName string, routineName string, hostName string, count int) ([]RoutineRun, error) {
	key := jobName + ":" + routineName + ":" + hostName + ":recentRuns"
	raw, err := c.GetLastListItems(key, count)
	if err != nil {
		return nil, errors.Wrap(err, "load recent runs")
	}
	out := make([]RoutineRun, 0, len(raw))
	for _, item := range raw {
		var run RoutineRun
		if err := json.Unmarshal([]byte(item), &run); err != nil {
			return nil, errors.Wrap(err, "deserialize run")
		}
		out = append(out, run)
	}
	return out, nil
}
// loadRunStats loads the run stats for a routine.
func loadRunStats(c *rcache.Cache, jobName string, routineName string, now time.Time, dayCount int) (RoutineRunStats, error) {
	var total RoutineRunStats
	// Walk backwards one day at a time, merging any per-day stats we find.
	for dayOffset := 0; dayOffset < dayCount; dayOffset++ {
		day := now.AddDate(0, 0, -dayOffset).Truncate(24 * time.Hour)
		raw, found := c.Get(jobName + ":" + routineName + ":runStats:" + day.Format("2006-01-02"))
		if !found {
			continue
		}
		var dayStats RoutineRunStats
		if err := json.Unmarshal(raw, &dayStats); err != nil {
			return RoutineRunStats{}, errors.Wrap(err, "deserialize stats for day")
		}
		total = mergeStats(total, dayStats)
		// NOTE(review): Since is set on the FIRST day found (the most recent one,
		// since we iterate backwards) unless mergeStats already filled it — confirm
		// mergeStats takes the minimum, otherwise "Since" reflects the newest day.
		if total.Since.IsZero() {
			total.Since = day
		}
	}
	return total, nil
}

View File

@ -0,0 +1,232 @@
package recorder
import (
"encoding/json"
"time"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/rcache"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// Recordable is implemented by background routines whose lifecycle (start/stop)
// and runs can be recorded by a Recorder.
type Recordable interface {
	// Start starts the routine.
	Start()
	// Stop stops the routine.
	Stop()
	// Name is the human-readable name of the routine.
	Name() string
	// Type classifies the routine (e.g. recorder.DBBackedRoutine for DB-backed workers).
	Type() RoutineType
	// JobName is the name of the job this routine belongs to.
	JobName() string
	// SetJobName assigns the owning job's name.
	SetJobName(string)
	// Description is a human-readable description of the routine.
	Description() string
	// Interval is the routine's configured interval between runs.
	Interval() time.Duration
	// RegisterRecorder hands the routine a recorder it can report activity to.
	RegisterRecorder(recorder *Recorder)
}
// Recorder persists metadata about registered background routines — known job
// names, host names, routine descriptors, individual runs, and per-day run
// stats — to Redis for display on the background-jobs dashboard.
type Recorder struct {
	rcache      *rcache.Cache // backing store for all recorded data
	logger      log.Logger
	recordables []Recordable // routines registered via Register
	hostName    string       // host this recorder reports activity for
}
// seenTimeout is the maximum time we allow no activity for each host, job, and routine.
// After this time, we consider them non-existent.
const seenTimeout = 6 * 24 * time.Hour // 6 days

// keyPrefix namespaces the Redis keys written by this package.
// NOTE(review): keyPrefix is not referenced in the visible code — confirm it is
// applied where the rcache.Cache for this recorder is constructed.
const keyPrefix = "background-job-logger"

// maxRecentRunsLength is the maximum number of recent runs we want to store for each routine.
const maxRecentRunsLength = 100
// New creates a new recorder.
func New(logger log.Logger, hostName string, cache *rcache.Cache) *Recorder {
	return &Recorder{
		rcache:   cache,
		logger:   logger,
		hostName: hostName,
	}
}
// Register registers a new routine with the recorder. It only appends to the
// in-memory list; nothing is persisted until RegistrationDone is called.
func (m *Recorder) Register(r Recordable) {
	m.recordables = append(m.recordables, r)
}
// RegistrationDone should be called after all recordables have been registered.
// It saves the known job names, host names, and routine names in Redis, along
// with updating their “last seen” date/time.
func (m *Recorder) RegistrationDone() {
	// Persist every distinct job name.
	for _, name := range m.collectAllJobNames() {
		m.saveKnownJobName(name)
	}
	// Persist this host.
	m.saveKnownHostName()
	// Persist each registered routine.
	for _, recordable := range m.recordables {
		m.saveKnownRoutine(recordable)
	}
}
// collectAllJobNames returns the distinct job names across all registered recordables.
func (m *Recorder) collectAllJobNames() []string {
	seen := make(map[string]struct{}, len(m.recordables))
	for _, recordable := range m.recordables {
		seen[recordable.JobName()] = struct{}{}
	}
	names := make([]string, 0, len(seen))
	for name := range seen {
		names = append(names, name)
	}
	return names
}
// saveKnownJobName updates the “lastSeen” date of a known job in Redis. Also adds it to the list of known jobs if it doesn’t exist.
func (m *Recorder) saveKnownJobName(jobName string) {
	now := time.Now().Format(time.RFC3339)
	if err := m.rcache.SetHashItem("knownJobNames", jobName, now); err != nil {
		m.logger.Error("failed to save/update known job name", log.Error(err), log.String("jobName", jobName))
	}
}
// saveKnownHostName updates the “lastSeen” date of a known host in Redis. Also adds it to the list of known hosts if it doesn’t exist.
func (m *Recorder) saveKnownHostName() {
	now := time.Now().Format(time.RFC3339)
	if err := m.rcache.SetHashItem("knownHostNames", m.hostName, now); err != nil {
		m.logger.Error("failed to save/update known host name", log.Error(err), log.String("hostName", m.hostName))
	}
}
// saveKnownRoutine updates the routine in Redis. Also adds it to the list of known recordables if it doesn’t exist.
func (m *Recorder) saveKnownRoutine(recordable Recordable) {
	info := serializableRoutineInfo{
		Name:        recordable.Name(),
		Type:        recordable.Type(),
		JobName:     recordable.JobName(),
		Description: recordable.Description(),
		Interval:    recordable.Interval(),
	}
	// Serialize the routine descriptor; failures are logged, not fatal.
	payload, err := json.Marshal(info)
	if err != nil {
		m.logger.Error("failed to serialize routine", log.Error(err), log.String("routineName", info.Name))
		return
	}
	// Store under "<job>:<routine>" so routines are unique per job.
	if err := m.rcache.SetHashItem("knownRoutines", info.JobName+":"+info.Name, string(payload)); err != nil {
		m.logger.Error("failed to save/update known routine", log.Error(err), log.String("routineName", info.Name))
	}
}
// LogStart logs the start of a routine.
func (m *Recorder) LogStart(r Recordable) {
	key := r.JobName() + ":" + r.Name() + ":" + m.hostName + ":lastStart"
	m.rcache.Set(key, []byte(time.Now().Format(time.RFC3339)))
	m.logger.Debug("Routine just started! 🚀", log.String("routine", r.Name()))
}
// LogStop logs the stop of a routine.
func (m *Recorder) LogStop(r Recordable) {
	key := r.JobName() + ":" + r.Name() + ":" + m.hostName + ":lastStop"
	m.rcache.Set(key, []byte(time.Now().Format(time.RFC3339)))
	m.logger.Debug(r.Name() + " just stopped! 🛑")
}
// LogRun records a single run of a routine: the run itself, its aggregated
// daily stats, and refreshed “last seen” markers for the host and job.
// Persistence failures are logged and otherwise ignored.
func (m *Recorder) LogRun(r Recordable, duration time.Duration, runErr error) {
	durationMs := int32(duration.Milliseconds())
	if err := m.saveRun(r.JobName(), r.Name(), m.hostName, durationMs, runErr); err != nil {
		m.logger.Error("failed to save run", log.Error(err))
	}
	if err := saveRunStats(m.rcache, r.JobName(), r.Name(), durationMs, runErr != nil); err != nil {
		m.logger.Error("failed to save run stats", log.Error(err))
	}
	// A run proves both the host and the job are still alive.
	m.saveKnownHostName()
	m.saveKnownJobName(r.JobName())
	m.logger.Debug("Hello from " + r.Name() + "! 😄")
}
// saveRun saves a run in the Redis list under the "*:recentRuns" key and trims the list.
func (m *Recorder) saveRun(jobName string, routineName string, hostName string, durationMs int32, err error) error {
	var errorMessage string
	if err != nil {
		errorMessage = err.Error()
	}
	entry := RoutineRun{
		At:           time.Now(),
		HostName:     hostName,
		DurationMs:   durationMs,
		ErrorMessage: errorMessage,
	}
	payload, marshalErr := json.Marshal(entry)
	if marshalErr != nil {
		return errors.Wrap(marshalErr, "serialize run")
	}
	key := jobName + ":" + routineName + ":" + hostName + ":recentRuns"
	if err := m.rcache.AddToList(key, string(payload)); err != nil {
		return errors.Wrap(err, "save run")
	}
	// Keep only the newest maxRecentRunsLength entries.
	if err := m.rcache.LTrimList(key, maxRecentRunsLength); err != nil {
		return errors.Wrap(err, "trim list")
	}
	return nil
}
// saveRunStats updates the run stats for a routine in Redis.
// Stats are bucketed by ISO date, one entry per routine per day.
func saveRunStats(c *rcache.Cache, jobName string, routineName string, durationMs int32, errored bool) error {
	key := jobName + ":" + routineName + ":runStats:" + time.Now().Format("2006-01-02")
	// Load today’s stats, if any exist yet.
	var current RoutineRunStats
	if raw, found := c.Get(key); found {
		if err := json.Unmarshal(raw, &current); err != nil {
			return errors.Wrap(err, "deserialize last stats")
		}
	}
	// Fold the new run in and write the result back.
	merged := addRunToStats(current, durationMs, errored)
	payload, err := json.Marshal(merged)
	if err != nil {
		return errors.Wrap(err, "serialize updated stats")
	}
	c.Set(key, payload)
	return nil
}
// addRunToStats folds a single run (duration plus error flag) into the given stats.
func addRunToStats(stats RoutineRunStats, durationMs int32, errored bool) RoutineRunStats {
	var errorCount int32
	if errored {
		errorCount = 1
	}
	single := RoutineRunStats{
		Since:         time.Now(),
		RunCount:      1,
		ErrorCount:    errorCount,
		MinDurationMs: durationMs,
		AvgDurationMs: durationMs,
		MaxDurationMs: durationMs,
	}
	return mergeStats(stats, single)
}

View File

@ -194,6 +194,67 @@ func (r *Cache) KeyTTL(key string) (int, bool) {
return ttl, ttl >= 0
}
// SetHashItem sets a key in a HASH.
// If the HASH does not exist, it is created.
// If the key already exists and is a different type, an error is returned.
// If the hash key does not exist, it is created. If it exists, the value is overwritten.
func (r *Cache) SetHashItem(key string, hashKey string, hashValue string) error {
	conn := pool.Get()
	defer func() { _ = conn.Close() }()
	_, err := conn.Do("HSET", r.rkeyPrefix()+key, hashKey, hashValue)
	return err
}
// GetHashItem gets a key in a HASH.
func (r *Cache) GetHashItem(key string, hashKey string) (string, error) {
	conn := pool.Get()
	defer func() { _ = conn.Close() }()
	return redis.String(conn.Do("HGET", r.rkeyPrefix()+key, hashKey))
}
// GetHashAll returns the members of the HASH stored at `key`, in no particular order.
func (r *Cache) GetHashAll(key string) (map[string]string, error) {
	conn := pool.Get()
	defer func() { _ = conn.Close() }()
	return redis.StringMap(conn.Do("HGETALL", r.rkeyPrefix()+key))
}
// AddToList adds a value to the end of a list.
// If the list does not exist, it is created.
func (r *Cache) AddToList(key string, value string) error {
	conn := pool.Get()
	defer func() { _ = conn.Close() }()
	_, err := conn.Do("RPUSH", r.rkeyPrefix()+key, value)
	return err
}
// GetLastListItems returns the last `count` items in the list.
// If the list is shorter than `count`, all items are returned.
func (r *Cache) GetLastListItems(key string, count int) ([]string, error) {
	conn := pool.Get()
	defer func() { _ = conn.Close() }()
	// Negative LRANGE indexes count from the tail.
	return redis.Strings(conn.Do("LRANGE", r.rkeyPrefix()+key, -count, -1))
}
// LTrimList trims the list to the last `count` items, discarding older entries.
func (r *Cache) LTrimList(key string, count int) error {
	conn := pool.Get()
	defer func() { _ = conn.Close() }()
	// Negative LTRIM indexes count from the tail, so this keeps the newest `count`.
	_, err := conn.Do("LTRIM", r.rkeyPrefix()+key, -count, -1)
	return err
}
// DeleteMulti deletes the given keys.
func (r *Cache) DeleteMulti(keys ...string) {
for _, key := range keys {

View File

@ -283,6 +283,76 @@ func TestCache_ListKeys(t *testing.T) {
}
}
// TestCache_LTrimList verifies that LTrimList keeps only the newest `count`
// items of a list. Fix: the original ignored the errors returned by AddToList
// and LTrimList, so a Redis failure would surface as a confusing assertion on
// the list contents instead of at the failing call.
func TestCache_LTrimList(t *testing.T) {
	SetupForTest(t)
	c := NewWithTTL("some_prefix", 1)
	// Fill the list with 5 items, checking each push.
	for _, v := range []string{"1", "2", "3", "4", "5"} {
		assert.NoError(t, c.AddToList("list", v))
	}
	// Trim to the last 2 items.
	assert.NoError(t, c.LTrimList("list", 2))
	// Only the newest two items should remain, in order.
	items, err := c.GetLastListItems("list", 8)
	assert.NoError(t, err)
	assert.Equal(t, []string{"4", "5"}, items)
}
// TestCache_Hashes exercises SetHashItem, GetHashItem, and GetHashAll together.
func TestCache_Hashes(t *testing.T) {
	SetupForTest(t)
	c := NewWithTTL("simple_hash", 1)

	// SetHashItem should succeed for fresh hash keys.
	assert.NoError(t, c.SetHashItem("key", "hashKey1", "value1"))
	assert.NoError(t, c.SetHashItem("key", "hashKey2", "value2"))

	// GetHashItem returns each stored value…
	expected := map[string]string{"hashKey1": "value1", "hashKey2": "value2"}
	for hashKey, want := range expected {
		got, err := c.GetHashItem("key", hashKey)
		assert.NoError(t, err)
		assert.Equal(t, want, got)
	}
	// …and errors (with an empty result) for a missing hash key.
	missing, err := c.GetHashItem("key", "hashKey3")
	assert.Error(t, err)
	assert.Equal(t, "", missing)

	// GetHashAll returns the complete map.
	all, err := c.GetHashAll("key")
	assert.NoError(t, err)
	assert.Equal(t, expected, all)
}
// TestCache_Lists exercises AddToList and GetLastListItems together.
func TestCache_Lists(t *testing.T) {
	SetupForTest(t)
	c := NewWithTTL("simple_list", 1)

	// Append three items in order.
	for _, item := range []string{"item1", "item2", "item3"} {
		assert.NoError(t, c.AddToList("key", item))
	}

	// Asking for fewer items than exist returns only the tail…
	got, err := c.GetLastListItems("key", 2)
	assert.NoError(t, err)
	assert.Equal(t, []string{"item2", "item3"}, got)

	// …and asking for more than exist returns everything.
	got, err = c.GetLastListItems("key", 5)
	assert.NoError(t, err)
	assert.Equal(t, []string{"item1", "item2", "item3"}, got)
}
func bytes(s ...string) [][]byte {
t := make([][]byte, len(s))
for i, v := range s {

View File

@ -1,11 +1,15 @@
package types
import (
"database/sql/driver"
"time"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
)
// ExecutionLogEntry is a database-serializable log entry, satisfying the
// database/sql/driver Scanner/Valuer pair. It is declared here instead of
// referencing workerutil.ExecutionLogEntry directly to break a cyclic
// dependency in the types package.
type ExecutionLogEntry interface {
	Scan(value any) error
	Value() (driver.Value, error)
}
// BitbucketProjectPermissionJob represents a task to apply a set of permissions
// to all the repos of the given Bitbucket project.
type BitbucketProjectPermissionJob struct {
@ -19,7 +23,7 @@ type BitbucketProjectPermissionJob struct {
NumResets int
NumFailures int
LastHeartbeatAt time.Time
ExecutionLogs []workerutil.ExecutionLogEntry
ExecutionLogs []ExecutionLogEntry
WorkerHostname string
// Name of the Bitbucket Project

View File

@ -8,10 +8,9 @@ import (
"github.com/derision-test/glock"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/errcode"
"github.com/sourcegraph/sourcegraph/internal/goroutine/recorder"
"github.com/sourcegraph/sourcegraph/internal/hostname"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/trace"
@ -39,8 +38,17 @@ type Worker[T Record] struct {
wg sync.WaitGroup // tracks active handler routines
finished chan struct{} // signals that Start has finished
runningIDSet *IDSet // tracks the running job IDs to heartbeat
jobName string
recorder *recorder.Recorder
}
// dummyType is only for this compile-time test.
type dummyType struct{}

func (d dummyType) RecordID() int { return 0 }

// Compile-time assertion that *Worker implements recorder.Recordable.
var _ recorder.Recordable = &Worker[dummyType]{}
type WorkerOptions struct {
// Name denotes the name of the worker used to distinguish log messages and
// emitted metrics. The worker constructor will fail if this field is not
@ -131,6 +139,9 @@ func newWorker[T Record](ctx context.Context, store Store[T], handler Handler[T]
// Start begins polling for work from the underlying store and processing records.
func (w *Worker[T]) Start() {
if w.recorder != nil {
go w.recorder.LogStart(w)
}
defer close(w.finished)
// Create a background routine that periodically writes the current time to the running records.
@ -238,9 +249,12 @@ loop:
}
// Stop will cause the worker loop to exit after the current iteration. This is done by canceling the
// context passed to the dequeue operations (but not the handler perations). This method blocks until
// context passed to the dequeue operations (but not the handler operations). This method blocks until
// all handler goroutines have exited.
func (w *Worker[T]) Stop() {
if w.recorder != nil {
go w.recorder.LogStop(w)
}
w.dequeueCancel()
w.Wait()
}
@ -373,11 +387,16 @@ func (w *Worker[T]) handle(ctx, workerContext context.Context, record T) (err er
}
// Open namespace for logger to avoid key collisions on fields
start := time.Now()
handleErr = w.handler.Handle(ctx, handleLog.With(log.Namespace("handle")), record)
if w.options.MaximumRuntimePerJob > 0 && errors.Is(handleErr, context.DeadlineExceeded) {
handleErr = errors.Wrap(handleErr, fmt.Sprintf("job exceeded maximum execution time of %s", w.options.MaximumRuntimePerJob))
}
duration := time.Since(start)
if w.recorder != nil {
go w.recorder.LogRun(w, duration, handleErr)
}
if errcode.IsNonRetryable(handleErr) || handleErr != nil && w.isJobCanceled(record.RecordID(), handleErr, ctx.Err()) {
if marked, markErr := w.store.MarkFailed(workerContext, record.RecordID(), handleErr.Error()); markErr != nil {
@ -418,3 +437,31 @@ func (w *Worker[T]) preDequeueHook(ctx context.Context) (dequeueable bool, extra
return true, nil, nil
}
// Name returns the worker's configured name. Part of recorder.Recordable.
func (w *Worker[T]) Name() string {
	return w.options.Name
}

// Type identifies this routine as a DB-backed worker. Part of recorder.Recordable.
func (w *Worker[T]) Type() recorder.RoutineType {
	return recorder.DBBackedRoutine
}

// JobName returns the name of the job this worker belongs to. Part of recorder.Recordable.
func (w *Worker[T]) JobName() string {
	return w.jobName
}

// SetJobName sets the name of the job this worker belongs to. Part of recorder.Recordable.
func (w *Worker[T]) SetJobName(jobName string) {
	w.jobName = jobName
}

// Description returns the worker's configured description. Part of recorder.Recordable.
func (w *Worker[T]) Description() string {
	return w.options.Description
}

// Interval returns the worker's configured interval. Part of recorder.Recordable.
func (w *Worker[T]) Interval() time.Duration {
	return w.options.Interval
}

// RegisterRecorder wires a recorder into this worker so that its starts,
// stops, and runs are logged. Part of recorder.Recordable.
func (w *Worker[T]) RegisterRecorder(r *recorder.Recorder) {
	w.recorder = r
}