Migrate deprecated rxjs functions/methods (#61222)

As a follow up to https://github.com/sourcegraph/sourcegraph/pull/61122 this commit updates the deprecated methods with the suggested replacements.

**Note**

This only migrates functions/methods that are replaced with something else, or whose deprecated call signature can easily be identified (e.g. `throwError(error)` -> `throwError(() => error)`). It's possible that there are more functions which deprecate a specific signature that we are using. I'll migrate those as I encounter them.

**Notes about `.toPromise`**

The instances of `.toPromise` converted here are all instances where the updated return value of `Promise<X|undefined>` did not produce a TS error (the ones with errors have been converted in #61122). However that doesn't mean that they can simply be replaced with `firstValueFrom`, `lastValueFrom` (these two methods throw errors when the source observable hasn't emitted a value before closing).
I update the callsites under two assumptions:
- Callsites that involve GraphQL requests will always emit a value and thus can be converted to using `lastValueFrom`/`firstValueFrom`.
- For other callsites we cannot make the assumption that the source observable emits before closing and thus they need a default value.
This commit is contained in:
Felix Kling 2024-04-08 11:23:34 +02:00 committed by GitHub
parent b93f6883f9
commit db3e905242
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
87 changed files with 776 additions and 730 deletions

View File

@ -15,7 +15,6 @@ import {
switchMap,
take,
concatMap,
mapTo,
catchError,
distinctUntilChanged,
} from 'rxjs/operators'
@ -424,7 +423,7 @@ main()
function validateSite(): Observable<boolean> {
return fetchSite(requestGraphQL).pipe(
mapTo(true),
map(() => true),
catchError(() => [false])
)
}
@ -458,7 +457,7 @@ function observeCurrentTabRepoSyncError(): Observable<boolean> {
function observeSourcegraphUrlValidation(): Observable<boolean> {
return merge(
// Whenever the URL was persisted to storage, we can assume it was validated before-hand
observeStorageKey('sync', 'sourcegraphURL').pipe(mapTo(true)),
observeStorageKey('sync', 'sourcegraphURL').pipe(map(() => true)),
timer(0, INTERVAL_FOR_SOURCEGRPAH_URL_CHECK).pipe(mergeMap(() => validateSite()))
)
}

View File

@ -4,8 +4,7 @@ import '../../config/content.entry'
// Polyfill before other imports.
import '../../shared/polyfills'
import { fromEvent, Subscription } from 'rxjs'
import { first } from 'rxjs/operators'
import { firstValueFrom, fromEvent, Subscription } from 'rxjs'
import { setLinkComponent, AnchorLink } from '@sourcegraph/wildcard'
@ -90,9 +89,9 @@ async function main(): Promise<void> {
// If the extension marker isn't present, inject it and listen for a custom event sent by the native
// integration to signal its activation.
injectExtensionMarker()
const nativeIntegrationActivationEventReceived = fromEvent(document, NATIVE_INTEGRATION_ACTIVATED)
.pipe(first())
.toPromise()
const nativeIntegrationActivationEventReceived = firstValueFrom(fromEvent(document, NATIVE_INTEGRATION_ACTIVATED), {
defaultValue: undefined,
})
let previousSubscription: Subscription
subscriptions.add(

View File

@ -9,7 +9,7 @@ import React, { useCallback, useEffect, useMemo, useState } from 'react'
import { trimEnd, uniq } from 'lodash'
import { createRoot } from 'react-dom/client'
import { from, noop, type Observable, of } from 'rxjs'
import { catchError, distinctUntilChanged, filter, map, mapTo } from 'rxjs/operators'
import { catchError, distinctUntilChanged, filter, map } from 'rxjs/operators'
import type { Optional } from 'utility-types'
import { asError, isDefined } from '@sourcegraph/common'
@ -90,7 +90,7 @@ const isFullPage = !new URLSearchParams(window.location.search).get('popup')
const validateSourcegraphUrl = (url: string): Observable<string | undefined> =>
fetchSite(options => createRequestGraphQL(url)(options)).pipe(
mapTo(undefined),
map(() => undefined),
catchError(error => {
const { message } = asError(error)
// We lose Error type when communicating from the background page

View File

@ -1,6 +1,6 @@
/* eslint rxjs/no-ignored-subscription: warn */
import { Subject, forkJoin } from 'rxjs'
import { debounceTime, distinctUntilChanged, map, publishReplay, refCount, repeat, switchMap } from 'rxjs/operators'
import { ReplaySubject, Subject, forkJoin } from 'rxjs'
import { debounceTime, distinctUntilChanged, map, repeat, switchMap, share } from 'rxjs/operators'
import type { SearchMatch } from '@sourcegraph/shared/src/search/stream'
import { fetchStreamSuggestions } from '@sourcegraph/shared/src/search/suggestions'
@ -116,8 +116,12 @@ export const createSuggestionFetcher = (): ((input: SuggestionInput) => void) =>
suggestions: suggestions.flat().flatMap(suggestion => createSuggestions(suggestion)),
handler,
})),
publishReplay(),
refCount()
share({
connector: () => new ReplaySubject(),
resetOnError: false,
resetOnComplete: false,
resetOnRefCountZero: false,
})
)
),
// But resubscribe afterwards

View File

@ -1,6 +1,6 @@
import { compact, find, head } from 'lodash'
import { interval, type Observable, type Subject } from 'rxjs'
import { filter, map, refCount, publishReplay } from 'rxjs/operators'
import { ReplaySubject, interval, type Observable, type Subject } from 'rxjs'
import { filter, map, share } from 'rxjs/operators'
import type { MutationRecordLike } from '../../util/dom'
import type { CodeHost } from '../shared/codeHost'
@ -355,8 +355,12 @@ export const observeMutations = (
filter(({ addedNodes, removedNodes }) => !!addedNodes.length || !!removedNodes.length),
// Wrap in an array, because that's how mutation observers emit events.
map(mutationRecord => [mutationRecord]),
publishReplay(),
refCount()
share({
connector: () => new ReplaySubject(),
resetOnError: false,
resetOnComplete: false,
resetOnRefCountZero: false,
})
)
}

View File

@ -1,7 +1,7 @@
import * as Sentry from '@sentry/browser'
import classNames from 'classnames'
import { fromEvent, lastValueFrom } from 'rxjs'
import { filter, map, mapTo, tap } from 'rxjs/operators'
import { filter, map, tap } from 'rxjs/operators'
import type { Omit } from 'utility-types'
import { fetchCache, type LineOrPositionOrRange, subtypeOf } from '@sourcegraph/common'
@ -304,7 +304,7 @@ export const gitlabCodeHost = subtypeOf<CodeHost>()({
tap(({ repository }) => {
repoNameOnSourcegraph.next(repository?.name ?? '')
}),
mapTo(true)
map(() => true)
)
),
})

View File

@ -1,6 +1,6 @@
import { from, type Observable, of, throwError, lastValueFrom } from 'rxjs'
import { fromFetch } from 'rxjs/fetch'
import { map, mapTo, switchMap, catchError } from 'rxjs/operators'
import { map, switchMap, catchError } from 'rxjs/operators'
import { memoizeObservable } from '@sourcegraph/common'
import { dataOrThrowErrors, gql, checkOk } from '@sourcegraph/http-client'
@ -222,7 +222,7 @@ const createPhabricatorRepo = memoizeObservable(
`,
variables,
mightContainPrivateInfo: true,
}).pipe(mapTo(undefined)),
}).pipe(map(() => undefined)),
({ callsign }) => callsign
)
@ -256,14 +256,14 @@ export function getRepoDetailsFromCallsign(
}),
switchMap((details: PhabricatorRepoDetails | null) => {
if (!details) {
return throwError(new Error('could not parse repo details'))
return throwError(() => new Error('could not parse repo details'))
}
return createPhabricatorRepo({
callsign,
repoName: details.rawRepoName,
phabricatorURL: window.location.origin,
requestGraphQL,
}).pipe(mapTo(details))
}).pipe(map(() => details))
})
)
}
@ -309,17 +309,17 @@ const getRepoDetailsFromRepoPHID = memoizeObservable(
return from(convertConduitRepoToRepoDetails(repo)).pipe(
switchMap((details: PhabricatorRepoDetails | null) => {
if (!details) {
return throwError(new Error('could not parse repo details'))
return throwError(() => new Error('could not parse repo details'))
}
if (!repo.fields?.callsign) {
return throwError(new Error('callsign not found'))
return throwError(() => new Error('callsign not found'))
}
return createPhabricatorRepo({
callsign: repo.fields.callsign,
repoName: details.rawRepoName,
phabricatorURL: window.location.origin,
requestGraphQL,
}).pipe(mapTo(details))
}).pipe(map(() => details))
})
)
})
@ -590,10 +590,10 @@ export function resolveDiffRevision(
return resolveRepo({ rawRepoName: stagingDetails.repoName, requestGraphQL }).pipe(
// If the repo is present on the Sourcegraph instance,
// use the commitID and repo name from the staging details.
mapTo({
map(() => ({
commitID: stagingDetails.ref.commit,
stagingRepoName: stagingDetails.repoName,
}),
})),
// Otherwise, create a one-off commit containing the patch on the Sourcegraph instance,
// and resolve to the commit ID returned by the Sourcegraph instance.
catchError(error => {

View File

@ -114,7 +114,7 @@ function mockQueryConduit(responseMap?: ConduitResponseMap): QueryConduitHelper<
return (endpoint, parameters) => {
const mock = responseMap?.[endpoint] || DEFAULT_CONDUIT_RESPONSES[endpoint]
if (!mock) {
return throwError(new Error(`No mock for endpoint ${endpoint}`))
return throwError(() => new Error(`No mock for endpoint ${endpoint}`))
}
return mock(parameters)
}

View File

@ -207,7 +207,7 @@ export function getPhabricatorState(
throw new Error(`Could not determine Phabricator state from stateUrl ${stateUrl}`)
} catch (error) {
return throwError(error)
return throwError(() => error)
}
}

View File

@ -4,8 +4,8 @@ import { promisify } from 'util'
import type { RenderResult } from '@testing-library/react'
import type { Remote } from 'comlink'
import { uniqueId, noop, pick } from 'lodash'
import { BehaviorSubject, NEVER, of, Subscription } from 'rxjs'
import { take, first } from 'rxjs/operators'
import { BehaviorSubject, firstValueFrom, lastValueFrom, NEVER, of, Subscription } from 'rxjs'
import { take } from 'rxjs/operators'
import { TestScheduler } from 'rxjs/testing'
import * as sinon from 'sinon'
import type * as sourcegraph from 'sourcegraph'
@ -207,7 +207,7 @@ describe('codeHost', () => {
}),
})
)
await wrapRemoteObservable(extensionHostAPI.viewerUpdates()).pipe(first()).toPromise()
await firstValueFrom(wrapRemoteObservable(extensionHostAPI.viewerUpdates()))
expect(getEditors(extensionAPI)).toEqual([
{
@ -277,7 +277,9 @@ describe('codeHost', () => {
platformContext: createMockPlatformContext(),
})
)
await wrapRemoteObservable(extensionHostAPI.viewerUpdates()).pipe(take(2)).toPromise()
await lastValueFrom(wrapRemoteObservable(extensionHostAPI.viewerUpdates()).pipe(take(2)), {
defaultValue: null,
})
expect(getEditors(extensionAPI)).toEqual([
{
@ -299,7 +301,7 @@ describe('codeHost', () => {
// // Simulate codeView1 removal
setTimeout(() => mutations.next([{ addedNodes: [], removedNodes: [codeView1] }]))
// One editor should have been removed, model should still exist
await wrapRemoteObservable(extensionHostAPI.viewerUpdates()).pipe(first()).toPromise()
await firstValueFrom(wrapRemoteObservable(extensionHostAPI.viewerUpdates()), { defaultValue: null })
expect(getEditors(extensionAPI)).toEqual([
{
@ -313,7 +315,7 @@ describe('codeHost', () => {
// // Simulate codeView2 removal
setTimeout(() => mutations.next([{ addedNodes: [], removedNodes: [codeView2] }]))
// // Second editor and model should have been removed
await wrapRemoteObservable(extensionHostAPI.viewerUpdates()).pipe(first()).toPromise()
await firstValueFrom(wrapRemoteObservable(extensionHostAPI.viewerUpdates()), { defaultValue: null })
expect(getEditors(extensionAPI)).toEqual([])
})
@ -357,7 +359,7 @@ describe('codeHost', () => {
extensionsController: createMockController(extensionHostAPI),
})
)
await wrapRemoteObservable(extensionHostAPI.viewerUpdates()).pipe(first()).toPromise()
await firstValueFrom(wrapRemoteObservable(extensionHostAPI.viewerUpdates()), { defaultValue: null })
expect(getEditors(extensionAPI).length).toEqual(1)
await tick()
codeView.dispatchEvent(new MouseEvent('mouseover'))

View File

@ -19,6 +19,7 @@ import {
BehaviorSubject,
fromEvent,
lastValueFrom,
throwError,
} from 'rxjs'
import {
catchError,
@ -33,8 +34,7 @@ import {
tap,
startWith,
distinctUntilChanged,
retryWhen,
mapTo,
retry,
take,
} from 'rxjs/operators'
@ -679,7 +679,7 @@ const isSafeToContinueCodeIntel = async ({
// Show "Configure Sourcegraph" button
console.warn('Repository is not cloned.', error)
const settingsURL = await observeUserSettingsURL(requestGraphQL).toPromise()
const settingsURL = await lastValueFrom(observeUserSettingsURL(requestGraphQL), { defaultValue: undefined })
if (rawRepoName && settingsURL) {
render(
@ -816,7 +816,7 @@ export async function handleCodeHost({
switchMap(([, { rawRepoName, revision }]) =>
resolveRevision({ repoName: rawRepoName, revision, requestGraphQL }).pipe(
retryWhenCloneInProgressError(),
mapTo(true),
map(() => true),
startWith(undefined)
)
),
@ -943,17 +943,9 @@ export async function handleCodeHost({
},
}),
// Retry auth errors after the user closed a sign-in tab
retryWhen(errors =>
errors.pipe(
// Don't swallow non-auth errors
tap(error => {
if (!isHTTPAuthError(error)) {
throw error
}
}),
switchMap(() => signInCloses)
)
),
retry({
delay: error => (isHTTPAuthError(error) ? signInCloses : throwError(() => error)),
}),
catchError(error => {
// Log errors but don't break the handling of other code views
console.error('Could not resolve file info for code view', error)

View File

@ -96,7 +96,7 @@ export const mockRequestGraphQL =
const nameMatch = request.match(/^\s*(?:query|mutation)\s+(\w+)/)
const requestName = nameMatch?.[1]
if (!requestName || !responseMap[requestName]) {
return throwError(new Error(`No mock for GraphQL request ${String(requestName)}`))
return throwError(() => new Error(`No mock for GraphQL request ${String(requestName)}`))
}
return responseMap[requestName](variables, mightContainPrivateInfo)
}

View File

@ -1,5 +1,5 @@
import { from, type Observable } from 'rxjs'
import { delay, filter, map, retryWhen, switchMap } from 'rxjs/operators'
import { from, throwError, timer, type Observable } from 'rxjs'
import { map, retry, switchMap } from 'rxjs/operators'
import { createAggregateError, memoizeObservable, sha256 } from '@sourcegraph/common'
import { dataOrThrowErrors, gql } from '@sourcegraph/http-client'
@ -167,19 +167,9 @@ export const resolveRevision = memoizeObservable(
export function retryWhenCloneInProgressError<T>(): (v: Observable<T>) => Observable<T> {
return (maybeErrors: Observable<T>) =>
maybeErrors.pipe(
retryWhen(errors =>
errors.pipe(
filter(error => {
if (isCloneInProgressErrorLike(error)) {
return true
}
// Don't swallow other errors.
throw error
}),
delay(1000)
)
)
retry({
delay: error => (isCloneInProgressErrorLike(error) ? timer(1000) : throwError(() => error)),
})
)
}

View File

@ -1,6 +1,6 @@
import { isEqual } from 'lodash'
import { EMPTY, NEVER, of, Subject, Subscription } from 'rxjs'
import { delay, distinctUntilChanged, filter, first, map, takeWhile } from 'rxjs/operators'
import { EMPTY, firstValueFrom, lastValueFrom, NEVER, of, Subject, Subscription } from 'rxjs'
import { delay, distinctUntilChanged, filter, map, takeWhile } from 'rxjs/operators'
import { TestScheduler } from 'rxjs/testing'
import { afterEach, beforeEach, describe, it, expect } from 'vitest'
@ -194,12 +194,7 @@ describe('Hoverifier', () => {
character: 6,
})
await hoverifier.hoverStateUpdates
.pipe(
filter(state => !!state.hoverOverlayProps),
first()
)
.toPromise()
await firstValueFrom(hoverifier.hoverStateUpdates.pipe(filter(state => !!state.hoverOverlayProps)))
await new Promise(resolve => setTimeout(resolve, 200))
@ -244,9 +239,10 @@ describe('Hoverifier', () => {
// Click https://sourcegraph.sgdev.org/github.com/gorilla/mux@cb4698366aa625048f3b815af6a0dea8aef9280a/-/blob/mux.go#L5:9
// and wait for the hovered token to be defined.
const hasHoveredToken = hoverifier.hoverStateUpdates
.pipe(takeWhile(({ hoveredTokenElement }) => !isDefined(hoveredTokenElement)))
.toPromise()
const hasHoveredToken = lastValueFrom(
hoverifier.hoverStateUpdates.pipe(takeWhile(({ hoveredTokenElement }) => !isDefined(hoveredTokenElement))),
{ defaultValue: null }
)
dispatchMouseEventAtPositionImpure('click', gitHubCodeView, {
line: 5,
character: 9,
@ -254,9 +250,10 @@ describe('Hoverifier', () => {
await hasHoveredToken
// Scroll down: the hover overlay should get hidden.
const hoverIsHidden = hoverifier.hoverStateUpdates
.pipe(takeWhile(({ hoverOverlayProps }) => isDefined(hoverOverlayProps)))
.toPromise()
const hoverIsHidden = lastValueFrom(
hoverifier.hoverStateUpdates.pipe(takeWhile(({ hoverOverlayProps }) => isDefined(hoverOverlayProps))),
{ defaultValue: null }
)
gitHubCodeView.getCodeElementFromLineNumber(gitHubCodeView.codeView, 2)!.scrollIntoView({ behavior: 'smooth' })
await hoverIsHidden
})
@ -564,12 +561,7 @@ describe('Hoverifier', () => {
character: 6,
})
await hoverifier.hoverStateUpdates
.pipe(
filter(state => !!state.hoverOverlayProps),
first()
)
.toPromise()
await firstValueFrom(hoverifier.hoverStateUpdates.pipe(filter(state => !!state.hoverOverlayProps)))
codeViewSubscription.unsubscribe()
@ -612,12 +604,7 @@ describe('Hoverifier', () => {
character: 6,
})
await hoverifier.hoverStateUpdates
.pipe(
filter(state => !!state.hoverOverlayProps),
first()
)
.toPromise()
await firstValueFrom(hoverifier.hoverStateUpdates.pipe(filter(state => !!state.hoverOverlayProps)))
codeViewSubscription.unsubscribe()

View File

@ -23,7 +23,6 @@ import {
filter,
first,
map,
mapTo,
observeOn,
share,
switchMap,
@ -459,11 +458,11 @@ export function createHoverifier<C extends object, D, A>({
overlayElement === null
? of(value)
: race(
fromEvent(overlayElement, 'mouseover').pipe(mapTo('suppress')),
fromEvent(overlayElement, 'mouseover').pipe(map(() => 'suppress')),
of('emit').pipe(delay(MOUSEOVER_DELAY))
).pipe(
filter(action => action === 'emit'),
mapTo(value)
map(() => value)
)
)
)
@ -744,14 +743,14 @@ export function createHoverifier<C extends object, D, A>({
scrollEvents.pipe(
filter(() => scrollBoundaries.some(elementOverlaps(hoveredTokenElement))),
first(),
mapTo({
map(() => ({
...rest,
hoveredTokenElement,
pinned: false,
hoverOrError: undefined,
hoveredToken: undefined,
actionsOrError: undefined,
})
}))
)
)
}

View File

@ -1,6 +1,6 @@
import { isEqual } from 'lodash'
import { type OperatorFunction, merge, combineLatest, of } from 'rxjs'
import { share, startWith, map, filter, mapTo, delay, endWith, scan, takeUntil, last } from 'rxjs/operators'
import { share, startWith, map, filter, delay, endWith, scan, takeUntil, last } from 'rxjs/operators'
export const LOADING = 'loading' as const
@ -67,7 +67,7 @@ export const emitLoading =
]).pipe(
// Show the loader when the provider is loading and has no result yet
filter(([{ isLoading, result }]) => isLoading && isEqual(result, emptyResultValue)),
mapTo(LOADING)
map(() => LOADING)
),
// Show the provider results (and no more loader) once the source emitted the first result or is no longer loading.
sharedSource.pipe(

View File

@ -1,23 +1,25 @@
import assert from 'assert'
import { of } from 'rxjs'
import { lastValueFrom, of } from 'rxjs'
import { describe, it } from 'vitest'
import { asObservable } from './asObservable'
describe('asObservable', () => {
it('accepts an Observable', async () => {
assert.equal(await asObservable(() => of(1)).toPromise(), 1)
assert.equal(await lastValueFrom(asObservable(() => of(1))), 1)
})
it('accepts a sync value', async () => {
assert.equal(await asObservable(() => 1).toPromise(), 1)
assert.equal(await lastValueFrom(asObservable(() => 1)), 1)
})
it('catches errors', async () => {
await assert.rejects(
() =>
asObservable(() => {
throw new Error('test')
}).toPromise(),
lastValueFrom(
asObservable(() => {
throw new Error('test')
})
),
/test/
)
})

View File

@ -11,6 +11,6 @@ export function asObservable<T>(function_: () => Observable<T> | T): Observable<
}
return of(value)
} catch (error) {
return throwError(error)
return throwError(() => error)
}
}

View File

@ -1,5 +1,5 @@
import type { Observable } from 'rxjs'
import { publishReplay, refCount, tap } from 'rxjs/operators'
import { ReplaySubject, type Observable } from 'rxjs'
import { share, tap } from 'rxjs/operators'
let allCachesResetSeq = 0
@ -40,8 +40,12 @@ export function memoizeObservable<P, T>(
return hit
}
const observable = func(parameters).pipe(
publishReplay(),
refCount(),
share({
connector: () => new ReplaySubject(1),
resetOnError: false,
resetOnComplete: false,
resetOnRefCountZero: false,
}),
tap({
error: () => {
cache.delete(key)

View File

@ -1,5 +1,5 @@
import type { Observable } from 'rxjs'
import { repeatWhen, delay, takeWhile, repeat } from 'rxjs/operators'
import { takeWhile, repeat } from 'rxjs/operators'
/**
* Mirrors values from the source observable and resubscribes to the source observable when it completes,
@ -17,7 +17,7 @@ export const repeatUntil =
) =>
(source: Observable<T>): Observable<T> =>
source.pipe(
options ? repeatWhen(completions => completions.pipe(delay(options.delay))) : repeat(),
repeat(options),
// Inclusive takeWhile so that the first value matching `select()` is emitted.
takeWhile(value => !select(value), true)
)

View File

@ -4,7 +4,7 @@ import { mdiHelpCircleOutline, mdiOpenInNew } from '@mdi/js'
import classNames from 'classnames'
import type * as H from 'history'
import { from, Subject, Subscription } from 'rxjs'
import { catchError, map, mapTo, mergeMap, startWith, tap } from 'rxjs/operators'
import { catchError, map, mergeMap, startWith, tap } from 'rxjs/operators'
import type { ActionContribution, Evaluated } from '@sourcegraph/client-api'
import { asError, type ErrorLike, isExternalLink, logger } from '@sourcegraph/common'
@ -142,7 +142,7 @@ export class ActionItem extends React.PureComponent<ActionItemProps, State, type
)
)
).pipe(
mapTo(null),
map(() => null),
catchError(error => [asError(error)]),
map(actionOrError => ({ actionOrError })),
tap(() => {

View File

@ -1,6 +1,6 @@
import { type Remote, proxy } from 'comlink'
import { type Unsubscribable, Subscription, from, of, lastValueFrom } from 'rxjs'
import { publishReplay, refCount, switchMap } from 'rxjs/operators'
import { type Unsubscribable, Subscription, from, of, lastValueFrom, ReplaySubject } from 'rxjs'
import { share, switchMap } from 'rxjs/operators'
import { logger } from '@sourcegraph/common'
@ -120,13 +120,20 @@ export const initMainThreadAPI = (
getEnabledExtensions: () => {
if (platformContext.getStaticExtensions) {
return proxySubscribable(
platformContext
.getStaticExtensions()
.pipe(
switchMap(staticExtensions =>
staticExtensions ? of(staticExtensions).pipe(publishReplay(1), refCount()) : of([])
)
platformContext.getStaticExtensions().pipe(
switchMap(staticExtensions =>
staticExtensions
? of(staticExtensions).pipe(
share({
connector: () => new ReplaySubject(1),
resetOnError: false,
resetOnComplete: false,
resetOnRefCountZero: false,
})
)
: of([])
)
)
)
}

View File

@ -1,7 +1,7 @@
import { proxy, type Remote } from 'comlink'
import { noop, sortBy } from 'lodash'
import { BehaviorSubject, EMPTY, type Unsubscribable } from 'rxjs'
import { mapTo } from 'rxjs/operators'
import { map } from 'rxjs/operators'
import type * as sourcegraph from 'sourcegraph'
import { logger } from '@sourcegraph/common'
@ -60,9 +60,12 @@ export function createExtensionAPIFactory(
}
return configuration
}
const configuration: typeof sourcegraph['configuration'] = Object.assign(state.settings.pipe(mapTo(undefined)), {
get: getConfiguration,
})
const configuration: typeof sourcegraph['configuration'] = Object.assign(
state.settings.pipe(map(() => undefined)),
{
get: getConfiguration,
}
)
// Workspace
const workspace: typeof sourcegraph['workspace'] = {
@ -80,7 +83,7 @@ export function createExtensionAPIFactory(
},
onDidOpenTextDocument: state.openedTextDocuments.asObservable(),
openedTextDocuments: state.openedTextDocuments.asObservable(),
onDidChangeRoots: state.roots.pipe(mapTo(undefined)),
onDidChangeRoots: state.roots.pipe(map(() => undefined)),
rootChanges: state.rootChanges.asObservable(),
versionContextChanges: EMPTY,
searchContextChanges: state.searchContextChanges.asObservable(),

View File

@ -1,4 +1,4 @@
import { BehaviorSubject, of } from 'rxjs'
import { BehaviorSubject, firstValueFrom, of } from 'rxjs'
import { filter, first } from 'rxjs/operators'
import sinon from 'sinon'
import type sourcegraph from 'sourcegraph'
@ -57,12 +57,12 @@ describe('Extension activation', () => {
)
// Wait for extensions to load to check on the spy
await haveInitialExtensionsLoaded
.pipe(
await firstValueFrom(
haveInitialExtensionsLoaded.pipe(
filter(haveLoaded => haveLoaded),
first()
)
.toPromise()
)
sinon.assert.calledWith(logEvent, 'ExtensionActivation', { extension_id: 'sourcegraph/fixture-extension' })
})

View File

@ -16,7 +16,7 @@ describe('Documents (integration)', () => {
test('adds new text documents', async () => {
const { extensionAPI, extensionHostAPI } = await integrationTestContext()
// const documents = from(extensionAPI.workspace.openedTextDocuments).pipe(take(1)).toPromise()
// const documents = firstValueFrom(from(extensionAPI.workspace.openedTextDocuments))
await extensionHostAPI.addTextDocumentIfNotExists({ uri: 'file:///f2', languageId: 'l2', text: 't2' })
assertToJSON(extensionAPI.workspace.textDocuments, [

View File

@ -1,5 +1,5 @@
import type { Remote } from 'comlink'
import { asyncScheduler, type Observable, of, type Unsubscribable } from 'rxjs'
import { asyncScheduler, type Observable, of, type Unsubscribable, lastValueFrom } from 'rxjs'
import { observeOn, take, toArray, map, first } from 'rxjs/operators'
import type * as sourcegraph from 'sourcegraph'
import { describe, expect, it } from 'vitest'
@ -145,23 +145,23 @@ function testLocationProvider<P>({
const subscription = registerProvider(extensionAPI)(['*'], labeledProvider('a'))
await extensionAPI.internal.sync()
expect(
await getResult('file:///f', extensionHostAPI)
.pipe(
await lastValueFrom(
getResult('file:///f', extensionHostAPI).pipe(
first(({ isLoading }) => !isLoading),
map(({ result }) => result)
)
.toPromise()
)
).toEqual(labeledProviderResults(['a']))
// Unregister the provider and ensure it's removed.
subscription.unsubscribe()
expect(
await getResult('file:///f', extensionHostAPI)
.pipe(
await lastValueFrom(
getResult('file:///f', extensionHostAPI).pipe(
first(({ isLoading }) => !isLoading),
map(({ result }) => result)
)
.toPromise()
)
).toEqual(emptyResultValue)
})
@ -184,12 +184,12 @@ function testLocationProvider<P>({
})
expect(
await getResult('file:///f2', extensionHostAPI)
.pipe(
await lastValueFrom(
getResult('file:///f2', extensionHostAPI).pipe(
first(({ isLoading }) => !isLoading),
map(({ result }) => result)
)
.toPromise()
)
).toEqual(labeledProviderResults(['a']))
subscription.unsubscribe()
@ -207,12 +207,12 @@ function testLocationProvider<P>({
})
)
await extensionAPI.internal.sync()
await getResult('file:///f', extensionHostAPI)
.pipe(
await lastValueFrom(
getResult('file:///f', extensionHostAPI).pipe(
first(({ isLoading }) => !isLoading),
map(({ result }) => result)
)
.toPromise()
)
await wait
})
@ -225,7 +225,7 @@ function testLocationProvider<P>({
await extensionAPI.internal.sync()
// Expect it to emit the first provider's result first (and not block on both providers being ready).
expect(await getResult('file:///f', extensionHostAPI).pipe(take(3), toArray()).toPromise()).toEqual([
expect(await lastValueFrom(getResult('file:///f', extensionHostAPI).pipe(take(3), toArray()))).toEqual([
{ isLoading: true, result: emptyResultValue },
{ isLoading: true, result: labeledProviderResults(['a']) },
{ isLoading: false, result: labeledProviderResults(['a', 'b']) },

View File

@ -1,7 +1,6 @@
/* eslint-disable jsdoc/check-param-names */
import { flatten, sortBy } from 'lodash'
import { from, isObservable, type Observable } from 'rxjs'
import { take } from 'rxjs/operators'
import { firstValueFrom, from, isObservable } from 'rxjs'
import * as sourcegraph from '../api'
import type { FilterDefinitions, LanguageSpec } from '../language-specs/language-spec'
@ -239,7 +238,7 @@ export function createProviders(
}
// Get the first definition and ensure it has a range
const def = asArray(await (from(result) as Observable<sourcegraph.Definition>).pipe(take(1)).toPromise())[0]
const def = asArray(await firstValueFrom(from(result), { defaultValue: undefined }))[0]
if (!def?.range) {
return null
}

View File

@ -1,6 +1,5 @@
import type { Remote } from 'comlink'
import { concat, from, of, Subscription, type Unsubscribable } from 'rxjs'
import { first } from 'rxjs/operators'
import { firstValueFrom, lastValueFrom, Subscription, type Unsubscribable } from 'rxjs'
import type { ActionContributionClientCommandUpdateConfiguration, Evaluated, KeyPath } from '@sourcegraph/client-api'
import { SourcegraphURL } from '@sourcegraph/common'
@ -66,14 +65,10 @@ export function registerBuiltinClientCommands(
registerCommand({
command: 'executeLocationProvider',
run: (id: string, uri: string, position: Position) =>
concat(
firstValueFrom(
wrapRemoteObservable(extensionHost.getLocations(id, { textDocument: { uri }, position })),
// Concat with [] to avoid undefined promise value when the getLocation observable completes
// without emitting. See https://github.com/ReactiveX/rxjs/issues/1736.
of([])
)
.pipe(first())
.toPromise(),
{ defaultValue: [] }
),
})
)
@ -101,13 +96,13 @@ export function registerBuiltinClientCommands(
// is set to `true`. It is up to the client (e.g. browser
// extension) to check that parameter and prevent the request
// from being sent to Sourcegraph.com.
from(
lastValueFrom(
context.requestGraphQL({
request: query,
variables,
mightContainPrivateInfo: true,
})
).toPromise(),
),
})
)

View File

@ -1,4 +1,4 @@
import { from, type Observable, of } from 'rxjs'
import { from, type Observable, of, lastValueFrom, firstValueFrom } from 'rxjs'
import { first } from 'rxjs/operators'
import { TestScheduler } from 'rxjs/testing'
import * as sinon from 'sinon'
@ -358,8 +358,8 @@ describe('getDefinitionURL', () => {
it('emits null if the locations result is empty', () =>
expect(
of({ isLoading: false, result: [] })
.pipe(
lastValueFrom(
of({ isLoading: false, result: [] }).pipe(
getDefinitionURL(
{ urlToFile, requestGraphQL },
{
@ -369,7 +369,7 @@ describe('getDefinitionURL', () => {
),
first(({ isLoading }) => !isLoading)
)
.toPromise()
)
).resolves.toStrictEqual({ isLoading: false, result: null }))
describe('if there is exactly 1 location result', () => {
@ -396,11 +396,11 @@ describe('getDefinitionURL', () => {
Partial<ViewStateSpec>
) => ''
)
await of({
isLoading: false,
result: [{ uri: 'git://r3?c3#f' }],
})
.pipe(
await lastValueFrom(
of({
isLoading: false,
result: [{ uri: 'git://r3?c3#f' }],
}).pipe(
getDefinitionURL(
{ urlToFile, requestGraphQL },
{
@ -410,7 +410,7 @@ describe('getDefinitionURL', () => {
),
first(({ isLoading }) => !isLoading)
)
.toPromise()
)
sinon.assert.calledOnce(urlToFile)
expect(urlToFile.getCalls()[0].args[0]).toMatchObject({
filePath: 'f',
@ -424,11 +424,11 @@ describe('getDefinitionURL', () => {
describe('when the result is inside the current root', () => {
it('emits the definition URL the user input revision (not commit SHA) of the root', () =>
expect(
of({
isLoading: false,
result: [{ uri: 'git://r3?c3#f' }],
})
.pipe(
lastValueFrom(
of({
isLoading: false,
result: [{ uri: 'git://r3?c3#f' }],
}).pipe(
getDefinitionURL(
{ urlToFile, requestGraphQL },
{
@ -438,18 +438,18 @@ describe('getDefinitionURL', () => {
),
first(({ isLoading }) => !isLoading)
)
.toPromise()
)
).resolves.toEqual({ isLoading: false, result: { url: '/r3@v3/-/blob/f', multiple: false } }))
})
describe('when the result is not inside the current root (different repo and/or commit)', () => {
it('emits the definition URL with range', () =>
expect(
of({
isLoading: false,
result: [FIXTURE_LOCATION_CLIENT],
})
.pipe(
lastValueFrom(
of({
isLoading: false,
result: [FIXTURE_LOCATION_CLIENT],
}).pipe(
getDefinitionURL(
{ urlToFile, requestGraphQL },
{
@ -459,16 +459,16 @@ describe('getDefinitionURL', () => {
),
first(({ isLoading }) => !isLoading)
)
.toPromise()
)
).resolves.toEqual({ isLoading: false, result: { url: '/r2@c2/-/blob/f2?L3:3', multiple: false } }))
it('emits the definition URL without range', () =>
expect(
of({
isLoading: false,
result: [{ ...FIXTURE_LOCATION_CLIENT, range: undefined }],
})
.pipe(
lastValueFrom(
of({
isLoading: false,
result: [{ ...FIXTURE_LOCATION_CLIENT, range: undefined }],
}).pipe(
getDefinitionURL(
{ urlToFile, requestGraphQL },
{
@ -478,27 +478,26 @@ describe('getDefinitionURL', () => {
),
first(({ isLoading }) => !isLoading)
)
.toPromise()
)
).resolves.toEqual({ isLoading: false, result: { url: '/r2@c2/-/blob/f2', multiple: false } }))
})
})
it('emits the definition panel URL if there is more than 1 location result', () =>
expect(
of({
isLoading: false,
result: [FIXTURE_LOCATION_CLIENT, { ...FIXTURE_LOCATION, uri: 'other' }],
})
.pipe(
firstValueFrom(
of({
isLoading: false,
result: [FIXTURE_LOCATION_CLIENT, { ...FIXTURE_LOCATION, uri: 'other' }],
}).pipe(
getDefinitionURL(
{ urlToFile, requestGraphQL },
{
getWorkspaceRoots: () => of([{ uri: 'git://r?c', inputRevision: 'v' }]),
},
FIXTURE_PARAMS
),
first()
)
)
.toPromise()
)
).resolves.toEqual({ isLoading: false, result: { url: '/r@v/-/blob/f?L2:2#tab=def', multiple: true } }))
})

View File

@ -24,7 +24,6 @@ import {
switchMap,
takeUntil,
scan,
mapTo,
} from 'rxjs/operators'
import { ContributableMenu, type TextDocumentPositionParameters } from '@sourcegraph/client-api'
@ -179,7 +178,7 @@ export function getHoverActionsContext(
),
definitionURLOrError.pipe(
filter(({ result }) => result !== null),
mapTo(true)
map(() => true)
)
),
]).pipe(

View File

@ -2,7 +2,7 @@
// implementation and therefore shouldn't have any runtime dependencies on
// Monaco
import { Observable, of } from 'rxjs'
import { Observable, lastValueFrom, of } from 'rxjs'
import { delay, takeUntil, switchMap } from 'rxjs/operators'
import type { SearchMatch } from '../stream'
@ -97,24 +97,18 @@ export function createCancelableFetchSuggestions(
})
})
return (
of(query)
.pipe(
// We use a delay here to implement a custom debounce. In the
// next step we check if the current completion request was
// cancelled in the meantime.
// This prevents us from needlessly running multiple suggestion
// queries.
delay(150),
switchMap(query => (aborted ? Promise.resolve([]) : fetchSuggestions(query))),
takeUntil(abort)
)
// toPromise may return undefined if the observable completes before
// a value was emitted . The return type was fixed in newer versions
// (and the method was actually deprecated).
// See https://rxjs.dev/deprecations/to-promise
.toPromise()
.then(result => result ?? [])
return lastValueFrom(
of(query).pipe(
// We use a delay here to implement a custom debounce. In the
// next step we check if the current completion request was
// cancelled in the meantime.
// This prevents us from needlessly running multiple suggestion
// queries.
delay(150),
switchMap(query => (aborted ? Promise.resolve([]) : fetchSuggestions(query))),
takeUntil(abort)
),
{ defaultValue: [] }
)
}
}

View File

@ -1,4 +1,4 @@
import { from } from 'rxjs'
import { from, lastValueFrom } from 'rxjs'
import { first, map, switchMap } from 'rxjs/operators'
import { isErrorLike } from '@sourcegraph/common'
@ -24,8 +24,8 @@ export function updateSettings(
edit: SettingsEditArg | string
) => Promise<void>
): Promise<void> {
return from(settings)
.pipe(
return lastValueFrom(
from(settings).pipe(
first(),
switchMap(settingsCascade => {
if (!settingsCascade.subjects) {
@ -55,8 +55,9 @@ export function updateSettings(
}
)
})
)
.toPromise()
),
{ defaultValue: undefined }
)
}
function toGQLKeyPath(keyPath: (string | number)[]): KeyPathSegment[] {
@ -94,28 +95,28 @@ function editSettings(
lastID: number | null,
edit: ConfigurationEdit
): Promise<void> {
return from(
requestGraphQL({
request: gql`
mutation EditSettings($subject: ID!, $lastID: Int, $edit: ConfigurationEdit!) {
configurationMutation(input: { subject: $subject, lastID: $lastID }) {
editConfiguration(edit: $edit) {
empty {
alwaysNil
return lastValueFrom(
from(
requestGraphQL({
request: gql`
mutation EditSettings($subject: ID!, $lastID: Int, $edit: ConfigurationEdit!) {
configurationMutation(input: { subject: $subject, lastID: $lastID }) {
editConfiguration(edit: $edit) {
empty {
alwaysNil
}
}
}
}
}
`,
variables: { subject, lastID, edit },
mightContainPrivateInfo: false,
})
)
.pipe(
`,
variables: { subject, lastID, edit },
mightContainPrivateInfo: false,
})
).pipe(
map(dataOrThrowErrors),
map(() => undefined)
)
.toPromise()
)
}
/**
@ -132,26 +133,26 @@ export function overwriteSettings(
lastID: number | null,
contents: string
): Promise<void> {
return from(
requestGraphQL({
request: gql`
mutation OverwriteSettings($subject: ID!, $lastID: Int, $contents: String!) {
settingsMutation(input: { subject: $subject, lastID: $lastID }) {
overwriteSettings(contents: $contents) {
empty {
alwaysNil
return lastValueFrom(
from(
requestGraphQL({
request: gql`
mutation OverwriteSettings($subject: ID!, $lastID: Int, $contents: String!) {
settingsMutation(input: { subject: $subject, lastID: $lastID }) {
overwriteSettings(contents: $contents) {
empty {
alwaysNil
}
}
}
}
}
`,
variables: { subject, lastID, contents },
mightContainPrivateInfo: false,
})
)
.pipe(
`,
variables: { subject, lastID, contents },
mightContainPrivateInfo: false,
})
).pipe(
map(dataOrThrowErrors),
map(() => undefined)
)
.toPromise()
)
}

View File

@ -1,7 +1,7 @@
import { type ApolloClient, gql } from '@apollo/client'
import { isEqual } from 'lodash'
import { Observable, of, type Subscription, from, ReplaySubject, type Subscriber, fromEvent } from 'rxjs'
import { distinctUntilChanged, map, mapTo, mergeAll, startWith, switchMap } from 'rxjs/operators'
import { distinctUntilChanged, map, mergeAll, startWith, switchMap } from 'rxjs/operators'
import { logger } from '@sourcegraph/common'
import { fromObservableQuery } from '@sourcegraph/http-client'
@ -264,7 +264,7 @@ class LocalOverrideBackend implements SettingsBackend {
of(temporarySettingsOverrideUpdate, fromEvent(window, 'storage')).pipe(
mergeAll(),
startWith(settings),
mapTo(settings)
map(() => settings)
)
),
map(settings => {

View File

@ -1,4 +1,4 @@
import { take } from 'rxjs/operators'
import { firstValueFrom } from 'rxjs'
import { logger } from '@sourcegraph/common'
@ -77,7 +77,9 @@ export async function migrateLocalStorageToTemporarySettings(storage: TemporaryS
for (const migration of migrations) {
// Use the first value of the setting to check if it exists.
// Only migrate if the setting is not already set.
const temporarySetting = await storage.get(migration.temporarySettingsKey).pipe(take(1)).toPromise()
const temporarySetting = await firstValueFrom(storage.get(migration.temporarySettingsKey), {
defaultValue: undefined,
})
if (temporarySetting === undefined) {
try {
const value = parse(migration.type, localStorage.getItem(migration.localStorageKey))

View File

@ -11,7 +11,7 @@ import { readFile, mkdir } from 'mz/fs'
import pTimeout from 'p-timeout'
import * as prettier from 'prettier'
import { Subject, Subscription, lastValueFrom, throwError } from 'rxjs'
import { first, timeoutWith } from 'rxjs/operators'
import { first, timeout } from 'rxjs/operators'
import { STATIC_ASSETS_PATH } from '@sourcegraph/build-config'
import { logger, asError, keyExistsIn } from '@sourcegraph/common'
@ -294,7 +294,11 @@ export const createSharedIntegrationTestContext = async <
(request: GraphQLRequestEvent<TGraphQlOperationNames>): request is GraphQLRequestEvent<O> =>
request.operationName === operationName
),
timeoutWith(4000, throwError(new Error(`Timeout waiting for GraphQL request "${operationName}"`)))
timeout({
first: 4000,
with: () =>
throwError(() => new Error(`Timeout waiting for GraphQL request "${operationName}"`)),
})
)
)
await triggerRequest()

View File

@ -45,7 +45,7 @@ const NOOP_MOCKS: Mocks = {
settings: of({ final: {}, subjects: [] }),
updateSettings: () => Promise.reject(new Error('Mocks#updateSettings not implemented')),
getGraphQLClient: () => Promise.reject(new Error('Mocks#getGraphQLClient not implemented')),
requestGraphQL: () => throwError(new Error('Mocks#queryGraphQL not implemented')),
requestGraphQL: () => throwError(() => new Error('Mocks#queryGraphQL not implemented')),
clientApplication: 'sourcegraph',
}

View File

@ -1,7 +1,7 @@
import React, { useCallback } from 'react'
import type { Subject } from 'rxjs'
import { delay, repeatWhen, tap } from 'rxjs/operators'
import { repeat, tap } from 'rxjs/operators'
import { H2 } from '@sourcegraph/wildcard'
@ -50,7 +50,7 @@ export const ExternalServiceSyncJobsList: React.FunctionComponent<ExternalServic
}
}
}),
repeatWhen(obs => obs.pipe(delay(1500)))
repeat({ delay: 1500 })
),
[externalServiceID, queryExternalServiceSyncJobs, updateSyncInProgress, updateNumberOfRepos]
)

View File

@ -1,6 +1,7 @@
import React, { useCallback, useState } from 'react'
import { delay, repeatWhen, retryWhen, filter, tap } from 'rxjs/operators'
import { timer } from 'rxjs'
import { repeat, retry, tap } from 'rxjs/operators'
import { FilteredConnection, type FilteredConnectionQueryArguments } from '../../../../../components/FilteredConnection'
import { ConnectionError } from '../../../../../components/FilteredConnection/ui'
@ -45,19 +46,15 @@ export const Workspaces: React.FunctionComponent<React.PropsWithChildren<Workspa
search: filters.search ?? null,
state: filters.state ?? null,
}).pipe(
repeatWhen(notifier => notifier.pipe(delay(2500))),
retryWhen(errors =>
errors.pipe(
filter(error => {
// Capture the error, but don't throw it so the data in the
// connection remains visible.
setError(error)
return true
}),
// Retry after 5s.
delay(5000)
)
),
repeat({ delay: 2500 }),
retry({
delay: error => {
// Capture the error, but don't throw it so the data in the
// connection remains visible.
setError(error)
return timer(5000)
},
}),
tap(() => {
// Reset the error when the query succeeds.
setError(undefined)

View File

@ -1,6 +1,6 @@
import React, { useCallback } from 'react'
import { repeatWhen, delay } from 'rxjs/operators'
import { repeat } from 'rxjs/operators'
import type { ErrorLike } from '@sourcegraph/common'
import { Container } from '@sourcegraph/wildcard'
@ -59,7 +59,7 @@ export const BatchChangeCloseChangesetsList: React.FunctionComponent<React.Props
onlyPublishedByThisBatchChange: true,
search: null,
onlyArchived: false,
}).pipe(repeatWhen(notifier => notifier.pipe(delay(5000)))),
}).pipe(repeat({ delay: 5000 })),
[batchChangeID, queryChangesets]
)

View File

@ -41,7 +41,7 @@ export const DeleteMonitorModal: React.FunctionComponent<React.PropsWithChildren
)
}
return throwError(new Error('Failed to delete: Code monitor ID not provided'))
return throwError(() => new Error('Failed to delete: Code monitor ID not provided'))
})
),
[deleteCodeMonitor, navigate, codeMonitor]

View File

@ -1,6 +1,7 @@
import { forwardRef, type HTMLAttributes, useContext, useLayoutEffect, useMemo, useRef, useState } from 'react'
import classNames from 'classnames'
import { lastValueFrom } from 'rxjs'
import { useMergeRefs } from 'use-callback-ref'
import { isDefined } from '@sourcegraph/common'
@ -120,7 +121,9 @@ export const BackendInsightView = forwardRef<HTMLElement, BackendInsightProps>((
async function handleFilterSave(filters: InsightFilters): Promise<void> {
const insightWithNewFilters = { ...insight, filters }
await updateInsight({ insightId: insight.id, nextInsightData: insightWithNewFilters }).toPromise()
await lastValueFrom(updateInsight({ insightId: insight.id, nextInsightData: insightWithNewFilters }), {
defaultValue: undefined,
})
telemetryService.log('CodeInsightsSearchBasedFilterUpdating')
telemetryRecorder.recordEvent('insights.searchBasedfilter', 'update', { metadata: { location: 0 } })

View File

@ -4,7 +4,8 @@ import { type Observable, of, throwError } from 'rxjs'
import type { CodeInsightsBackend } from './code-insights-backend'
const errorMockMethod = (methodName: string) => () => throwError(new Error(`Implement ${methodName} method first`))
const errorMockMethod = (methodName: string) => () =>
throwError(() => new Error(`Implement ${methodName} method first`))
/**
* Default context api class. Provides mock methods only.

View File

@ -1,6 +1,6 @@
import { type ApolloCache, type ApolloClient, gql } from '@apollo/client'
import { from, type Observable, of } from 'rxjs'
import { catchError, map, mapTo, switchMap } from 'rxjs/operators'
import { catchError, map, switchMap } from 'rxjs/operators'
import { isDefined } from '@sourcegraph/common'
import { fromObservableQuery } from '@sourcegraph/http-client'
@ -166,7 +166,7 @@ export class CodeInsightsGqlBackend implements CodeInsightsBackend {
cache.evict({ id: deletedDashboardReference })
},
})
).pipe(mapTo(undefined))
).pipe(map(() => undefined))
}
public updateDashboard = (input: DashboardUpdateInput): Observable<DashboardUpdateResult> =>

View File

@ -1,7 +1,7 @@
import { useCallback, useEffect, useState } from 'react'
import { escapeRegExp, partition, sum } from 'lodash'
import { defer, type Observable } from 'rxjs'
import { defer, lastValueFrom, type Observable } from 'rxjs'
import { map, retry } from 'rxjs/operators'
import { asError } from '@sourcegraph/common'
@ -114,13 +114,14 @@ async function getLangStats(inputs: GetInsightContentInputs): Promise<Categorica
const pathRegexp = path ? `file:^${escapeRegExp(path)}/` : ''
const query = `repo:^${escapeRegExp(repository)}$ ${pathRegexp}`
const stats = await defer(() => fetchLangStatsInsight(query))
.pipe(
const stats = await lastValueFrom(
defer(() => fetchLangStatsInsight(query)).pipe(
// The search may time out, but a retry is then likely faster because caches are warm
retry(3),
map(data => data.search!.stats)
)
.toPromise()
),
{ defaultValue: undefined }
)
if (!stats || stats.languages.length === 0) {
throw new Error("We couldn't find the language statistics, try changing the repository.")

View File

@ -2,7 +2,7 @@ import { useCallback } from 'react'
import copy from 'copy-to-clipboard'
import { merge, type Observable, of } from 'rxjs'
import { delay, startWith, switchMapTo, tap } from 'rxjs/operators'
import { delay, startWith, switchMap, tap } from 'rxjs/operators'
import { useEventObservable } from '@sourcegraph/wildcard'
@ -23,7 +23,7 @@ export function useCopyURLHandler(): useCopiedHandlerReturn {
(clicks: Observable<URLValue>) =>
clicks.pipe(
tap(copyDashboardURL),
switchMapTo(merge(of(true), of(false).pipe(delay(2000)))),
switchMap(() => merge(of(true), of(false).pipe(delay(2000)))),
startWith(false)
),
[copyDashboardURL]

View File

@ -1,5 +1,7 @@
import { useCallback, useContext, useState } from 'react'
import { lastValueFrom } from 'rxjs'
import { type ErrorLike, logger } from '@sourcegraph/common'
import { BillingCategory, BillingProduct } from '@sourcegraph/shared/src/telemetry'
import { TelemetryRecorder } from '@sourcegraph/telemetry'
@ -40,7 +42,7 @@ export function useDeleteInsight(
setError(undefined)
try {
await deleteInsight(insight.id).toPromise()
await lastValueFrom(deleteInsight(insight.id), { defaultValue: undefined })
const insightType = getTrackingTypeByInsightType(insight.type)
eventLogger.log('InsightRemoval', { insightType }, { insightType })

View File

@ -10,7 +10,7 @@ import {
scheduled,
type Unsubscribable,
} from 'rxjs'
import { mergeMap, map, takeUntil, take, catchError, takeWhile, switchMap, publish, refCount } from 'rxjs/operators'
import { mergeMap, map, takeUntil, take, catchError, takeWhile, switchMap, share } from 'rxjs/operators'
import { type ErrorLike, asError, isErrorLike } from '@sourcegraph/common'
@ -142,7 +142,13 @@ export function createUseParallelRequestsHook<T>({ maxRequests } = { maxRequests
const event: Request<D> = {
request,
// Makes cancel stream a hot observable
cancel: cancelStream.pipe(publish(), refCount()),
cancel: cancelStream.pipe(
share({
resetOnError: false,
resetOnComplete: false,
resetOnRefCountZero: false,
})
),
onComplete: result => {
if (isErrorLike(result)) {
return setState({ data: undefined, loading: false, error: result })
@ -192,7 +198,13 @@ export function createUseParallelRequestsHook<T>({ maxRequests } = { maxRequests
const event: Request<D> = {
request,
// Makes cancel stream a hot observable
cancel: cancelStream.pipe(publish(), refCount()),
cancel: cancelStream.pipe(
share({
resetOnError: false,
resetOnComplete: false,
resetOnRefCountZero: false,
})
),
onComplete: result => {
localRequestPool.current = localRequestPool.current.filter(request => request !== event)

View File

@ -1,5 +1,7 @@
import { useCallback, useContext, useState } from 'react'
import { lastValueFrom } from 'rxjs'
import { type ErrorLike, logger } from '@sourcegraph/common'
import { BillingCategory, BillingProduct } from '@sourcegraph/shared/src/telemetry'
import { TelemetryRecorder } from '@sourcegraph/telemetry'
@ -41,10 +43,13 @@ export function useRemoveInsightFromDashboard(
setError(undefined)
try {
await removeInsightFromDashboard({
insightId: insight.id,
dashboardId: dashboard.id,
}).toPromise()
await lastValueFrom(
removeInsightFromDashboard({
insightId: insight.id,
dashboardId: dashboard.id,
}),
{ defaultValue: undefined }
)
const insightType = getTrackingTypeByInsightType(insight.type)

View File

@ -2,6 +2,7 @@ import { type FC, type ReactElement, type ReactNode, useContext, useState, useMe
import { useApolloClient } from '@apollo/client'
import { mdiClose } from '@mdi/js'
import { lastValueFrom } from 'rxjs'
import { isErrorLike, pluralize } from '@sourcegraph/common'
import {
@ -53,11 +54,14 @@ export const AddInsightModal: FC<AddInsightModalProps> = props => {
try {
const prevInsights = getCachedDashboardInsights(client, dashboard.id)
await assignInsightsToDashboard({
id: dashboard.id,
prevInsightIds: prevInsights.map(insight => insight.id),
nextInsightIds: dashboardInsights.map(insight => insight.id),
}).toPromise()
await lastValueFrom(
assignInsightsToDashboard({
id: dashboard.id,
prevInsightIds: prevInsights.map(insight => insight.id),
nextInsightIds: dashboardInsights.map(insight => insight.id),
}),
{ defaultValue: undefined }
)
setSubmittingOrError(false)
onClose()
} catch (error) {

View File

@ -1,5 +1,7 @@
import { useContext, useState } from 'react'
import { lastValueFrom } from 'rxjs'
import { type ErrorLike, asError } from '@sourcegraph/common'
import { CodeInsightsBackendContext } from '../../../../../../core/backend/code-insights-backend-context'
@ -28,7 +30,7 @@ export function useDeleteDashboardHandler(props: UseDeleteDashboardHandlerProps)
setLoadingOrError(true)
try {
await deleteDashboard({ id: dashboard.id }).toPromise()
await lastValueFrom(deleteDashboard({ id: dashboard.id }), { defaultValue: undefined })
setLoadingOrError(false)
onSuccess()

View File

@ -1,6 +1,7 @@
import { type FC, useContext } from 'react'
import { useNavigate } from 'react-router-dom'
import { lastValueFrom } from 'rxjs'
import { useExperimentalFeatures } from '@sourcegraph/shared/src/settings/settings'
import { TelemetryV2Props } from '@sourcegraph/shared/src/telemetry'
@ -42,7 +43,7 @@ export const InsightCreationPage: FC<InsightCreationPageProps> = props => {
const handleInsightCreateRequest = async (event: InsightCreateEvent): Promise<unknown> => {
const { insight } = event
return createInsight({ insight, dashboardId: dashboardId ?? null }).toPromise()
return lastValueFrom(createInsight({ insight, dashboardId: dashboardId ?? null }), { defaultValue: undefined })
}
const handleInsightSuccessfulCreation = (): void => {

View File

@ -1,6 +1,7 @@
import { useContext } from 'react'
import { useNavigate } from 'react-router-dom'
import { lastValueFrom } from 'rxjs'
import { TelemetryV2Props } from '@sourcegraph/shared/src/telemetry'
import type { SubmissionErrors } from '@sourcegraph/wildcard'
@ -36,10 +37,13 @@ export function useEditPageHandlers(props: Props): useHandleSubmitOutput {
return
}
await updateInsight({
insightId: id,
nextInsightData: newInsight,
}).toPromise()
await lastValueFrom(
updateInsight({
insightId: id,
nextInsightData: newInsight,
}),
{ defaultValue: undefined }
)
const insightType = getTrackingTypeByInsightType(newInsight.type)
eventLogger.log('InsightEdit', { insightType }, { insightType })

View File

@ -2,6 +2,7 @@ import React, { useContext, useMemo, useState } from 'react'
import classNames from 'classnames'
import { useNavigate } from 'react-router-dom'
import { lastValueFrom } from 'rxjs'
import { useQuery } from '@sourcegraph/http-client'
import { TelemetryV2Props } from '@sourcegraph/shared/src/telemetry'
@ -121,7 +122,9 @@ export const StandaloneBackendInsight: React.FunctionComponent<StandaloneBackend
}
const handleFilterSave = async (filters: InsightFilters): Promise<void> => {
await updateInsight({ insightId: insight.id, nextInsightData: { ...insight, filters } }).toPromise()
await lastValueFrom(updateInsight({ insightId: insight.id, nextInsightData: { ...insight, filters } }), {
defaultValue: undefined,
})
setOriginalInsightFilters(filters)
telemetryService.log('CodeInsightsSearchBasedFilterUpdating')
telemetryRecorder.recordEvent('insights.searchBasedFilter', 'update', { metadata: { location: 1 } })

View File

@ -284,6 +284,6 @@ class ScheduleRepositoryPermissionsSyncActionContainer extends React.PureCompone
}
private scheduleRepositoryPermissions = async (): Promise<void> => {
await scheduleRepositoryPermissionsSync({ repository: this.props.repo.id }).toPromise()
await scheduleRepositoryPermissionsSync({ repository: this.props.repo.id })
}
}

View File

@ -53,7 +53,7 @@ export const AuthenticatedEditSearchContextPage: React.FunctionComponent<
repositories: SearchContextRepositoryRevisionsInput[]
): Observable<SearchContextFields> => {
if (!id) {
return throwError(new Error('Cannot update search context with undefined ID'))
return throwError(() => new Error('Cannot update search context with undefined ID'))
}
platformContext.telemetryRecorder.recordEvent('searchContext', 'update')
return updateSearchContext({ id, searchContext, repositories }, platformContext)
@ -67,7 +67,9 @@ export const AuthenticatedEditSearchContextPage: React.FunctionComponent<
fetchSearchContextBySpec(spec, platformContext).pipe(
switchMap(searchContext => {
if (!searchContext.viewerCanManage) {
return throwError(new Error('You do not have sufficient permissions to edit this context.'))
return throwError(
() => new Error('You do not have sufficient permissions to edit this context.')
)
}
return of(searchContext)
}),

View File

@ -267,7 +267,7 @@ export const SearchContextForm: React.FunctionComponent<React.PropsWithChildren<
return parseRepositories().pipe(
switchMap(repositoriesOrError => {
if (repositoriesOrError.type === 'errors') {
return throwError(createAggregateError(repositoriesOrError.errors))
return throwError(() => createAggregateError(repositoriesOrError.errors))
}
return of(repositoriesOrError.repositories)
}),
@ -275,7 +275,7 @@ export const SearchContextForm: React.FunctionComponent<React.PropsWithChildren<
)
}
if (queryState.query.trim().length === 0) {
return throwError(new Error('Search query has to be non-empty.'))
return throwError(() => new Error('Search query has to be non-empty.'))
}
return of({ input: { ...partialInput, query: queryState.query }, repositories: [] })
}),

View File

@ -163,7 +163,7 @@ export const ErrorStory: StoryFn = () => (
{webProps => (
<SearchContextPage
{...webProps}
fetchSearchContextBySpec={() => throwError(new Error('Failed to fetch search context'))}
fetchSearchContextBySpec={() => throwError(() => new Error('Failed to fetch search context'))}
platformContext={NOOP_PLATFORM_CONTEXT}
authenticatedUser={mockAuthenticatedUser}
/>

View File

@ -3,7 +3,7 @@ import React, { useCallback, useEffect } from 'react'
import { mdiPlus } from '@mdi/js'
import { Navigate } from 'react-router-dom'
import { merge, of, type Observable } from 'rxjs'
import { catchError, concatMapTo, map, tap } from 'rxjs/operators'
import { catchError, concatMap, map, tap } from 'rxjs/operators'
import { asError, type ErrorLike, isErrorLike } from '@sourcegraph/common'
import { dataOrThrowErrors, gql } from '@sourcegraph/http-client'
@ -63,7 +63,7 @@ const UserCreateSubscriptionNode: React.FunctionComponent<React.PropsWithChildre
submits.pipe(
tap(event => event.preventDefault()),
tap(() => props.telemetryRecorder.recordEvent('admin.productSubscriptions', 'create')),
concatMapTo(
concatMap(() =>
merge(
of('saving' as const),
createProductSubscription({ accountID: props.node.id }).pipe(

View File

@ -1,7 +1,7 @@
import * as React from 'react'
import { type Observable, Subject, Subscription } from 'rxjs'
import { catchError, filter, map, mapTo, startWith, switchMap, tap } from 'rxjs/operators'
import { catchError, filter, map, startWith, switchMap, tap } from 'rxjs/operators'
import { Timestamp } from '@sourcegraph/branded/src/components/Timestamp'
import { asError, type ErrorLike, isErrorLike, logger } from '@sourcegraph/common'
@ -59,7 +59,10 @@ function deleteExternalAccount(externalAccount: Scalars['ID']): Observable<void>
}
`,
{ externalAccount }
).pipe(map(dataOrThrowErrors), mapTo(undefined))
).pipe(
map(dataOrThrowErrors),
map(() => undefined)
)
}
export interface ExternalAccountNodeProps {
@ -93,7 +96,7 @@ export class ExternalAccountNode extends React.PureComponent<ExternalAccountNode
filter(() => window.confirm('Really delete the association with this external account?')),
switchMap(() =>
deleteExternalAccount(this.props.node.id).pipe(
mapTo(null),
map(() => null),
catchError(error => [asError(error)]),
map(deletionOrError => ({ deletionOrError })),
tap(() => {
@ -105,10 +108,10 @@ export class ExternalAccountNode extends React.PureComponent<ExternalAccountNode
)
)
)
.subscribe(
stateUpdate => this.setState(stateUpdate),
error => logger.error(error)
)
.subscribe({
next: stateUpdate => this.setState(stateUpdate),
error: error => logger.error(error),
})
)
}

View File

@ -249,7 +249,7 @@ class ScheduleUserPermissionsSyncActionContainer extends React.PureComponent<Sch
}
private scheduleUserPermissions = async (): Promise<void> => {
await scheduleUserPermissionsSync({ user: this.props.user.id }).toPromise()
await scheduleUserPermissionsSync({ user: this.props.user.id })
}
}

View File

@ -1,5 +1,5 @@
import type { Observable } from 'rxjs'
import { mapTo, map, tap } from 'rxjs/operators'
import { lastValueFrom } from 'rxjs'
import { map, tap } from 'rxjs/operators'
import { resetAllMemoizationCaches } from '@sourcegraph/common'
import { gql, dataOrThrowErrors } from '@sourcegraph/http-client'
@ -11,20 +11,22 @@ import type {
ScheduleUserPermissionsSyncVariables,
} from '../../../../graphql-operations'
export function scheduleUserPermissionsSync(args: { user: Scalars['ID'] }): Observable<void> {
return requestGraphQL<ScheduleUserPermissionsSyncResult, ScheduleUserPermissionsSyncVariables>(
gql`
mutation ScheduleUserPermissionsSync($user: ID!) {
scheduleUserPermissionsSync(user: $user) {
alwaysNil
export function scheduleUserPermissionsSync(args: { user: Scalars['ID'] }): Promise<void> {
return lastValueFrom(
requestGraphQL<ScheduleUserPermissionsSyncResult, ScheduleUserPermissionsSyncVariables>(
gql`
mutation ScheduleUserPermissionsSync($user: ID!) {
scheduleUserPermissionsSync(user: $user) {
alwaysNil
}
}
}
`,
args
).pipe(
map(dataOrThrowErrors),
tap(() => resetAllMemoizationCaches()),
mapTo(undefined)
`,
args
).pipe(
map(dataOrThrowErrors),
tap(() => resetAllMemoizationCaches()),
map(() => undefined)
)
)
}

View File

@ -4,7 +4,7 @@ import { escapeRegExp } from 'lodash'
// eslint-disable-next-line no-restricted-imports
import { type marked, Renderer } from 'marked'
import { type Observable, forkJoin, of } from 'rxjs'
import { startWith, catchError, mapTo, map, switchMap } from 'rxjs/operators'
import { startWith, catchError, map, switchMap } from 'rxjs/operators'
import * as uuid from 'uuid'
import { renderMarkdown, asError, isErrorLike } from '@sourcegraph/common'
@ -263,11 +263,11 @@ export class Notebook {
}
// Identical if/else if branches to make the TS compiler happy
if (block.type === 'query') {
observables.push(block.output.pipe(mapTo(DONE)))
observables.push(block.output.pipe(map(() => DONE)))
} else if (block.type === 'file') {
observables.push(block.output.pipe(mapTo(DONE)))
observables.push(block.output.pipe(map(() => DONE)))
} else if (block.type === 'symbol') {
observables.push(block.output.pipe(mapTo(DONE)))
observables.push(block.output.pipe(map(() => DONE)))
}
}
// We store output observables and join them into a single observable,

View File

@ -1,4 +1,4 @@
import { of } from 'rxjs'
import { lastValueFrom, of } from 'rxjs'
import { describe, expect, it } from 'vitest'
import { SymbolKind } from '../../graphql-operations'
@ -9,47 +9,48 @@ const SOURCEGRAPH_URL = 'https://sourcegraph.com'
describe('serialize', () => {
it('should serialize empty markdown text', async () => {
const serialized = await serializeBlockInput({ type: 'md', input: { text: '' } }, SOURCEGRAPH_URL).toPromise()
const serialized = await lastValueFrom(
serializeBlockInput({ type: 'md', input: { text: '' } }, SOURCEGRAPH_URL)
)
expect(serialized).toStrictEqual('')
})
it('should serialize markdown text', async () => {
const serialized = await serializeBlockInput(
{ type: 'md', input: { text: '# Title' } },
SOURCEGRAPH_URL
).toPromise()
const serialized = await lastValueFrom(
serializeBlockInput({ type: 'md', input: { text: '# Title' } }, SOURCEGRAPH_URL)
)
expect(serialized).toStrictEqual('# Title')
})
it('should serialize empty query', async () => {
const serialized = await serializeBlockInput(
{ type: 'query', input: { query: '' } },
SOURCEGRAPH_URL
).toPromise()
const serialized = await lastValueFrom(
serializeBlockInput({ type: 'query', input: { query: '' } }, SOURCEGRAPH_URL)
)
expect(serialized).toStrictEqual('')
})
it('should serialize a query', async () => {
const serialized = await serializeBlockInput(
{ type: 'query', input: { query: 'repo:a b' } },
SOURCEGRAPH_URL
).toPromise()
const serialized = await lastValueFrom(
serializeBlockInput({ type: 'query', input: { query: 'repo:a b' } }, SOURCEGRAPH_URL)
)
expect(serialized).toStrictEqual('repo:a b')
})
it('should serialize a file without range', async () => {
const serialized = await serializeBlockInput(
{
type: 'file',
input: {
repositoryName: 'github.com/sourcegraph/sourcegraph',
revision: 'feature',
filePath: 'client/web/index.ts',
lineRange: null,
const serialized = await lastValueFrom(
serializeBlockInput(
{
type: 'file',
input: {
repositoryName: 'github.com/sourcegraph/sourcegraph',
revision: 'feature',
filePath: 'client/web/index.ts',
lineRange: null,
},
},
},
SOURCEGRAPH_URL
).toPromise()
SOURCEGRAPH_URL
)
)
expect(serialized).toStrictEqual(
`${SOURCEGRAPH_URL}/github.com/sourcegraph/sourcegraph@feature/-/blob/client/web/index.ts`
@ -57,21 +58,23 @@ describe('serialize', () => {
})
it('should serialize a file with range', async () => {
const serialized = await serializeBlockInput(
{
type: 'file',
input: {
repositoryName: 'github.com/sourcegraph/sourcegraph',
revision: 'feature',
filePath: 'client/web/index.ts',
lineRange: {
startLine: 100,
endLine: 123,
const serialized = await lastValueFrom(
serializeBlockInput(
{
type: 'file',
input: {
repositoryName: 'github.com/sourcegraph/sourcegraph',
revision: 'feature',
filePath: 'client/web/index.ts',
lineRange: {
startLine: 100,
endLine: 123,
},
},
},
},
SOURCEGRAPH_URL
).toPromise()
SOURCEGRAPH_URL
)
)
expect(serialized).toStrictEqual(
`${SOURCEGRAPH_URL}/github.com/sourcegraph/sourcegraph@feature/-/blob/client/web/index.ts?L101-123`
@ -79,32 +82,34 @@ describe('serialize', () => {
})
it('should serialize a symbol block', async () => {
const serialized = await serializeBlockInput(
{
type: 'symbol',
input: {
repositoryName: 'github.com/sourcegraph/sourcegraph',
revision: 'feature',
filePath: 'client/web/index.ts',
symbolName: 'func a',
symbolContainerName: 'class',
symbolKind: SymbolKind.FUNCTION,
lineContext: 3,
},
output: of({
symbolFoundAtLatestRevision: true,
effectiveRevision: 'effective-feature',
symbolRange: {
start: { line: 1, character: 1 },
end: { line: 1, character: 3 },
const serialized = await lastValueFrom(
serializeBlockInput(
{
type: 'symbol',
input: {
repositoryName: 'github.com/sourcegraph/sourcegraph',
revision: 'feature',
filePath: 'client/web/index.ts',
symbolName: 'func a',
symbolContainerName: 'class',
symbolKind: SymbolKind.FUNCTION,
lineContext: 3,
},
highlightSymbolRange: { startLine: 1, startCharacter: 1, endLine: 1, endCharacter: 3 },
highlightLineRange: { startLine: 0, endLine: 6 },
highlightedLines: [],
}),
},
SOURCEGRAPH_URL
).toPromise()
output: of({
symbolFoundAtLatestRevision: true,
effectiveRevision: 'effective-feature',
symbolRange: {
start: { line: 1, character: 1 },
end: { line: 1, character: 3 },
},
highlightSymbolRange: { startLine: 1, startCharacter: 1, endLine: 1, endCharacter: 3 },
highlightLineRange: { startLine: 0, endLine: 6 },
highlightedLines: [],
}),
},
SOURCEGRAPH_URL
)
)
expect(serialized).toStrictEqual(
`${SOURCEGRAPH_URL}/github.com/sourcegraph/sourcegraph@effective-feature/-/blob/client/web/index.ts?L1:1-1:3#symbolName=func+a&symbolContainerName=class&symbolKind=FUNCTION&lineContext=3`

View File

@ -5,7 +5,7 @@ import AlertCircleIcon from 'mdi-react/AlertCircleIcon'
import MapSearchIcon from 'mdi-react/MapSearchIcon'
import { Route, Routes, type NavigateFunction } from 'react-router-dom'
import { combineLatest, merge, type Observable, of, Subject, Subscription } from 'rxjs'
import { catchError, distinctUntilChanged, map, mapTo, startWith, switchMap } from 'rxjs/operators'
import { catchError, distinctUntilChanged, map, startWith, switchMap } from 'rxjs/operators'
import { type ErrorLike, isErrorLike, asError, logger } from '@sourcegraph/common'
import { gql, dataOrThrowErrors } from '@sourcegraph/http-client'
@ -170,7 +170,7 @@ export class OrgArea extends React.Component<OrgAreaProps> {
// Fetch organization.
this.subscriptions.add(
combineLatest([nameChanges, merge(this.refreshRequests.pipe(mapTo(false)), of(true))])
combineLatest([nameChanges, merge(this.refreshRequests.pipe(map(() => false)), of(true))])
.pipe(
switchMap(([name, forceRefresh]) => {
type PartialStateUpdate = Pick<State, 'orgOrError'>

View File

@ -2,7 +2,7 @@ import * as React from 'react'
import { Navigate } from 'react-router-dom'
import { concat, type Observable, Subject, Subscription } from 'rxjs'
import { catchError, concatMap, distinctUntilKeyChanged, map, mapTo, tap, withLatestFrom } from 'rxjs/operators'
import { catchError, concatMap, distinctUntilKeyChanged, map, tap, withLatestFrom } from 'rxjs/operators'
import { asError, type ErrorLike, isErrorLike, logger } from '@sourcegraph/common'
import { dataOrThrowErrors, gql } from '@sourcegraph/http-client'
@ -204,6 +204,9 @@ export const OrgInvitationPageLegacy = withAuthenticatedUser(
}
`,
args
).pipe(map(dataOrThrowErrors), mapTo(undefined))
).pipe(
map(dataOrThrowErrors),
map(() => undefined)
)
}
)

View File

@ -133,20 +133,20 @@ export function removeUserFromOrganization(args: {
* @returns Observable that emits `undefined`, then completes
*/
export function updateOrganization(id: Scalars['ID'], displayName: string): Promise<void> {
return requestGraphQL<UpdateOrganizationResult, UpdateOrganizationVariables>(
gql`
mutation UpdateOrganization($id: ID!, $displayName: String) {
updateOrganization(id: $id, displayName: $displayName) {
id
return lastValueFrom(
requestGraphQL<UpdateOrganizationResult, UpdateOrganizationVariables>(
gql`
mutation UpdateOrganization($id: ID!, $displayName: String) {
updateOrganization(id: $id, displayName: $displayName) {
id
}
}
`,
{
id,
displayName,
}
`,
{
id,
displayName,
}
)
.pipe(
).pipe(
map(({ data, errors }) => {
if (!data || (errors && errors.length > 0)) {
eventLogger.log('UpdateOrgSettingsFailed')
@ -156,7 +156,7 @@ export function updateOrganization(id: Scalars['ID'], displayName: string): Prom
return
})
)
.toPromise()
)
}
export const ORG_CODE_FEATURE_FLAG_EMAIL_INVITE = 'org-email-invites'

View File

@ -237,20 +237,20 @@ function inviteUserToOrganization(
}
function addUserToOrganization(username: string, organization: Scalars['ID']): Promise<void> {
return requestGraphQL<AddUserToOrganizationResult, AddUserToOrganizationVariables>(
gql`
mutation AddUserToOrganization($organization: ID!, $username: String!) {
addUserToOrganization(organization: $organization, username: $username) {
alwaysNil
return lastValueFrom(
requestGraphQL<AddUserToOrganizationResult, AddUserToOrganizationVariables>(
gql`
mutation AddUserToOrganization($organization: ID!, $username: String!) {
addUserToOrganization(organization: $organization, username: $username) {
alwaysNil
}
}
`,
{
username,
organization,
}
`,
{
username,
organization,
}
)
.pipe(
).pipe(
map(({ data, errors }) => {
if (!data?.addUserToOrganization || (errors && errors.length > 0)) {
eventLogger.log('AddOrgMemberFailed')
@ -259,7 +259,7 @@ function addUserToOrganization(username: string, organization: Scalars['ID']): P
eventLogger.log('OrgMemberAdded')
})
)
.toPromise()
)
}
interface InvitedNotificationProps {

View File

@ -1,6 +1,6 @@
import type { ApolloClient, ApolloQueryResult, ObservableQuery } from '@apollo/client'
import { from } from 'rxjs'
import { map, publishReplay, refCount, shareReplay } from 'rxjs/operators'
import { from, ReplaySubject } from 'rxjs'
import { map, share, shareReplay } from 'rxjs/operators'
import { createAggregateError, asError, logger } from '@sourcegraph/common'
import { fromObservableQueryPromise, getDocumentNode } from '@sourcegraph/http-client'
@ -41,8 +41,12 @@ export function createPlatformContext(props: {
map(mapViewerSettingsResult),
shareReplay(1),
map(gqlToCascade),
publishReplay(1),
refCount()
share({
connector: () => new ReplaySubject(1),
resetOnError: false,
resetOnComplete: false,
resetOnRefCountZero: false,
})
),
updateSettings: async (subject, edit) => {
const settingsQueryWatcher = await settingsQueryWatcherPromise

View File

@ -2,6 +2,7 @@ import delay from 'delay'
import expect from 'expect'
import { applyEdits, parse, modify } from 'jsonc-parser'
import { describe, before, beforeEach, after, afterEach, test } from 'mocha'
import { lastValueFrom } from 'rxjs'
import { map } from 'rxjs/operators'
import { logger } from '@sourcegraph/common'
@ -198,10 +199,9 @@ describe.skip('Core functionality regression test suite', () => {
}
}
`
const response = await gqlClientWithToken
.queryGraphQL(currentUsernameQuery)
.pipe(map(dataOrThrowErrors))
.toPromise()
const response = await lastValueFrom(
gqlClientWithToken.queryGraphQL(currentUsernameQuery).pipe(map(dataOrThrowErrors))
)
expect(response).toEqual({ currentUser: { username: testUsername } })
const gqlClientWithInvalidToken = createGraphQLClient({
@ -210,7 +210,7 @@ describe.skip('Core functionality regression test suite', () => {
})
await expect(
gqlClientWithInvalidToken.queryGraphQL(currentUsernameQuery).pipe(map(dataOrThrowErrors)).toPromise()
lastValueFrom(gqlClientWithInvalidToken.queryGraphQL(currentUsernameQuery).pipe(map(dataOrThrowErrors)))
).rejects.toThrowError('401 Unauthorized')
})

View File

@ -1,5 +1,5 @@
import { describe, test } from 'mocha'
import { merge } from 'rxjs'
import { lastValueFrom, merge } from 'rxjs'
import { fromFetch } from 'rxjs/fetch'
import { catchError } from 'rxjs/operators'
@ -17,14 +17,18 @@ describe('Native integrations regression test suite', () => {
'/.assets/extension/css/contentPage.main.bundle.css',
'/.assets/extension/extensionHostFrame.html',
]
await merge(
...assets.map(asset =>
fromFetch(new URL(asset, sourcegraphBaseUrl).href, { selector: response => [checkOk(response)] }).pipe(
catchError(() => {
throw new Error('Error fetching native integration asset: ' + asset)
})
await lastValueFrom(
merge(
...assets.map(asset =>
fromFetch(new URL(asset, sourcegraphBaseUrl).href, {
selector: response => [checkOk(response)],
}).pipe(
catchError(() => {
throw new Error('Error fetching native integration asset: ' + asset)
})
)
)
)
).toPromise()
)
})
})

View File

@ -2,8 +2,8 @@
* Provides convenience functions for interacting with the Sourcegraph API from tests.
*/
import { zip, timer, concat, throwError, defer, type Observable, lastValueFrom } from 'rxjs'
import { map, tap, retryWhen, delayWhen, take, mergeMap } from 'rxjs/operators'
import { zip, timer, concat, throwError, type Observable, lastValueFrom } from 'rxjs'
import { map, tap, retry, mergeMap } from 'rxjs/operators'
import { isErrorLike, createAggregateError, logger } from '@sourcegraph/common'
import {
@ -102,7 +102,7 @@ export async function waitForRepos(
ensureRepos: string[],
options: WaitForRepoOptions = {}
): Promise<void> {
await zip(...ensureRepos.map(repoName => waitForRepo(gqlClient, repoName, options))).toPromise()
await lastValueFrom(zip(...ensureRepos.map(repoName => waitForRepo(gqlClient, repoName, options))))
}
export function waitForRepo(
@ -147,29 +147,26 @@ export function waitForRepo(
}
throw new Error('Repo exists')
}),
retryWhen(errors =>
concat(
errors.pipe(
delayWhen((error, retryCount) => {
if (isErrorLike(error) && error.message === 'Repo exists') {
// Delay retry by 2s.
if (logStatusMessages) {
logger.log(
`Waiting for ${repoName} to be removed (attempt ${
retryCount + 1
} of ${numberRetries})`
)
}
return timer(retryPeriod)
retry({
delay: (error, retryCount) => {
if (retryCount <= numberRetries) {
if (isErrorLike(error) && error.message === 'Repo exists') {
// Delay retry by 2s.
if (logStatusMessages) {
logger.log(
`Waiting for ${repoName} to be removed (attempt ${
retryCount + 1
} of ${numberRetries})`
)
}
// Throw all errors
throw error
}),
take(numberRetries)
),
defer(() => throwError(new Error(`Could not resolve repo ${repoName}: too many retries`)))
)
)
return timer(retryPeriod)
}
// Throw all errors
return throwError(() => error)
}
return throwError(() => new Error(`Could not resolve repo ${repoName}: too many retries`))
},
})
)
: request.pipe(
map(dataOrThrowErrors),
@ -185,29 +182,26 @@ export function waitForRepo(
throw new CloneInProgressError(repoName)
}
}),
retryWhen(errors =>
concat(
errors.pipe(
delayWhen((error, retryCount) => {
if (isCloneInProgressErrorLike(error)) {
// Delay retry by 2s.
if (logStatusMessages) {
logger.log(
`Waiting for ${repoName} to finish cloning (attempt ${
retryCount + 1
} of ${numberRetries})`
)
}
return timer(retryPeriod)
retry({
delay: (error, retryCount) => {
if (retryCount <= numberRetries) {
if (isCloneInProgressErrorLike(error)) {
// Delay retry by 2s.
if (logStatusMessages) {
logger.log(
`Waiting for ${repoName} to finish cloning (attempt ${
retryCount + 1
} of ${numberRetries})`
)
}
// Throw all errors other than ECLONEINPROGRESS
throw error
}),
take(numberRetries)
),
defer(() => throwError(new Error(`Could not resolve repo ${repoName}: too many retries`)))
)
),
return timer(retryPeriod)
}
// Throw all errors other than ECLONEINPROGRESS
return throwError(() => error)
}
return throwError(() => new Error(`Could not resolve repo ${repoName}: too many retries`))
},
}),
map(() => undefined)
)
}
@ -235,8 +229,8 @@ export async function ensureNoTestExternalServices(
}
for (const externalService of externalServices) {
await gqlClient
.mutateGraphQL<DeleteExternalServiceResult, DeleteExternalServiceVariables>(
await lastValueFrom(
gqlClient.mutateGraphQL<DeleteExternalServiceResult, DeleteExternalServiceVariables>(
gql`
mutation DeleteExternalService($externalService: ID!) {
deleteExternalService(externalService: $externalService) {
@ -246,7 +240,7 @@ export async function ensureNoTestExternalServices(
`,
{ externalService: externalService.id }
)
.toPromise()
)
}
}
@ -302,26 +296,27 @@ export async function updateExternalService(
gqlClient: GraphQLClient,
input: UpdateExternalServiceInput
): Promise<void> {
await gqlClient
.mutateGraphQL<UpdateExternalServiceResult, UpdateExternalServiceVariables>(
gql`
mutation UpdateExternalServiceRegression($input: UpdateExternalServiceInput!) {
updateExternalService(input: $input) {
warning
await lastValueFrom(
gqlClient
.mutateGraphQL<UpdateExternalServiceResult, UpdateExternalServiceVariables>(
gql`
mutation UpdateExternalServiceRegression($input: UpdateExternalServiceInput!) {
updateExternalService(input: $input) {
warning
}
}
}
`,
{ input }
)
.pipe(
map(dataOrThrowErrors),
tap(({ updateExternalService: { warning } }) => {
if (warning) {
logger.warn('updateExternalService warning:', warning)
}
})
)
.toPromise()
`,
{ input }
)
.pipe(
map(dataOrThrowErrors),
tap(({ updateExternalService: { warning } }) => {
if (warning) {
logger.warn('updateExternalService warning:', warning)
}
})
)
)
}
export async function ensureTestExternalService(
@ -356,7 +351,7 @@ export async function ensureTestExternalService(
displayName: externalServiceOptions.uniqueDisplayName,
config: JSON.stringify(externalServiceOptions.config),
}
await addExternalService(input, gqlClient).toPromise()
await lastValueFrom(addExternalService(input, gqlClient))
if (externalServiceOptions.waitForRepos && externalServiceOptions.waitForRepos.length > 0) {
await waitForRepos(gqlClient, externalServiceOptions.waitForRepos, waitForReposOptions)
@ -393,17 +388,19 @@ export async function deleteUser(
}
}
await requestGraphQL<DeleteUserResult, DeleteUserVariables>({
request: gql`
mutation DeleteUser($user: ID!, $hard: Boolean) {
deleteUser(user: $user, hard: $hard) {
alwaysNil
await lastValueFrom(
requestGraphQL<DeleteUserResult, DeleteUserVariables>({
request: gql`
mutation DeleteUser($user: ID!, $hard: Boolean) {
deleteUser(user: $user, hard: $hard) {
alwaysNil
}
}
}
`,
variables: { hard: false, user: user.id },
mightContainPrivateInfo: false,
}).toPromise()
`,
variables: { hard: false, user: user.id },
mightContainPrivateInfo: false,
})
)
}
/**
@ -415,8 +412,8 @@ export async function setUserSiteAdmin(
userID: Scalars['ID'],
siteAdmin: boolean
): Promise<void> {
await gqlClient
.mutateGraphQL<SetUserIsSiteAdminResult, SetUserIsSiteAdminVariables>(
await lastValueFrom(
gqlClient.mutateGraphQL<SetUserIsSiteAdminResult, SetUserIsSiteAdminVariables>(
gql`
mutation SetUserIsSiteAdmin($userID: ID!, $siteAdmin: Boolean!) {
setUserIsSiteAdmin(userID: $userID, siteAdmin: $siteAdmin) {
@ -426,12 +423,12 @@ export async function setUserSiteAdmin(
`,
{ userID, siteAdmin }
)
.toPromise()
)
}
export async function setTosAccepted(gqlClient: GraphQLClient, userID: Scalars['ID']): Promise<void> {
await gqlClient
.mutateGraphQL<SetTosAcceptedResult, SetTosAcceptedVariables>(
await lastValueFrom(
gqlClient.mutateGraphQL<SetTosAcceptedResult, SetTosAcceptedVariables>(
gql`
mutation SetTosAccepted($userID: ID!) {
setTosAccepted(userID: $userID) {
@ -441,7 +438,7 @@ export async function setTosAccepted(gqlClient: GraphQLClient, userID: Scalars['
`,
{ userID }
)
.toPromise()
)
}
/**
@ -494,24 +491,26 @@ export function getViewerSettings({
export function deleteOrganization(
{ requestGraphQL }: Pick<PlatformContext, 'requestGraphQL'>,
organization: Scalars['ID']
): Observable<void> {
return requestGraphQL<DeleteOrganizationResult, DeleteOrganizationVariables>({
request: gql`
mutation DeleteOrganization($organization: ID!) {
deleteOrganization(organization: $organization) {
alwaysNil
): Promise<void> {
return lastValueFrom(
requestGraphQL<DeleteOrganizationResult, DeleteOrganizationVariables>({
request: gql`
mutation DeleteOrganization($organization: ID!) {
deleteOrganization(organization: $organization) {
alwaysNil
}
}
}
`,
variables: { organization },
mightContainPrivateInfo: true,
}).pipe(
map(dataOrThrowErrors),
map(data => {
if (!data.deleteOrganization) {
throw createInvalidGraphQLMutationResponseError('DeleteOrganization')
}
})
`,
variables: { organization },
mightContainPrivateInfo: true,
}).pipe(
map(dataOrThrowErrors),
map(data => {
if (!data.deleteOrganization) {
throw createInvalidGraphQLMutationResponseError('DeleteOrganization')
}
})
)
)
}
@ -612,20 +611,22 @@ export function createUser(
{ requestGraphQL }: Pick<PlatformContext, 'requestGraphQL'>,
username: string,
email: string | null
): Observable<CreateUserResult['createUser']> {
return requestGraphQL<CreateUserResult, CreateUserVariables>({
request: gql`
mutation CreateUser($username: String!, $email: String) {
createUser(username: $username, email: $email) {
resetPasswordURL
): Promise<CreateUserResult['createUser']> {
return lastValueFrom(
requestGraphQL<CreateUserResult, CreateUserVariables>({
request: gql`
mutation CreateUser($username: String!, $email: String) {
createUser(username: $username, email: $email) {
resetPasswordURL
}
}
}
`,
variables: { username, email },
mightContainPrivateInfo: true,
}).pipe(
map(dataOrThrowErrors),
map(data => data.createUser)
`,
variables: { username, email },
mightContainPrivateInfo: true,
}).pipe(
map(dataOrThrowErrors),
map(data => data.createUser)
)
)
}
@ -881,17 +882,19 @@ export function updateSiteConfiguration(
{ requestGraphQL }: Pick<PlatformContext, 'requestGraphQL'>,
lastID: number,
input: string
): Observable<boolean> {
return requestGraphQL<UpdateSiteConfigurationResult, UpdateSiteConfigurationVariables>({
request: gql`
mutation UpdateSiteConfiguration($lastID: Int!, $input: String!) {
updateSiteConfiguration(lastID: $lastID, input: $input)
}
`,
variables: { lastID, input },
mightContainPrivateInfo: true,
}).pipe(
map(dataOrThrowErrors),
map(data => data.updateSiteConfiguration)
): Promise<boolean> {
return lastValueFrom(
requestGraphQL<UpdateSiteConfigurationResult, UpdateSiteConfigurationVariables>({
request: gql`
mutation UpdateSiteConfiguration($lastID: Int!, $input: String!) {
updateSiteConfiguration(lastID: $lastID, input: $input)
}
`,
variables: { lastID, input },
mightContainPrivateInfo: true,
}).pipe(
map(dataOrThrowErrors),
map(data => data.updateSiteConfiguration)
)
)
}

View File

@ -89,31 +89,35 @@ async function createTestUser(
{ username, testUserPassword }: { username: string } & Pick<Config, 'testUserPassword'>
): Promise<void> {
// If there's an error, try to create the user
const passwordResetURL = await gqlClient
.mutateGraphQL<CreateUserResult, CreateUserVariables>(
gql`
mutation CreateUser($username: String!, $email: String) {
createUser(username: $username, email: $email) {
resetPasswordURL
const passwordResetURL = await lastValueFrom(
gqlClient
.mutateGraphQL<CreateUserResult, CreateUserVariables>(
gql`
mutation CreateUser($username: String!, $email: String) {
createUser(username: $username, email: $email) {
resetPasswordURL
}
}
}
`,
{ username, email: null }
)
.pipe(
map(dataOrThrowErrors),
catchError(error =>
throwError(
new Error(
`Could not create user ${JSON.stringify(
username
)} (you may need to update the sudo access token used by the test): ${asError(error).message})`
`,
{ username, email: null }
)
.pipe(
map(dataOrThrowErrors),
catchError(error =>
throwError(
() =>
new Error(
`Could not create user ${JSON.stringify(
username
)} (you may need to update the sudo access token used by the test): ${
asError(error).message
})`
)
)
)
),
map(({ createUser }) => createUser.resetPasswordURL)
)
.toPromise()
),
map(({ createUser }) => createUser.resetPasswordURL)
)
)
if (!passwordResetURL) {
throw new Error('passwordResetURL was empty')
}
@ -174,7 +178,7 @@ export async function ensureNewUser(
throw error
}
}
await createUser({ requestGraphQL }, username, email).toPromise()
await createUser({ requestGraphQL }, username, email)
return () => deleteUser({ requestGraphQL }, username, true)
}
@ -192,11 +196,11 @@ export async function ensureNewOrganization(
throw new Error(`More than one organization name exists with name ${variables.name}`)
}
if (matchingOrgs.length === 1) {
await deleteOrganization({ requestGraphQL }, matchingOrgs[0].id).toPromise()
await deleteOrganization({ requestGraphQL }, matchingOrgs[0].id)
}
const createdOrg = await lastValueFrom(createOrganization({ requestGraphQL }, variables))
return {
destroy: () => deleteOrganization({ requestGraphQL }, createdOrg.id).toPromise(),
destroy: () => deleteOrganization({ requestGraphQL }, createdOrg.id),
result: createdOrg,
}
}
@ -245,14 +249,10 @@ export async function editSiteConfig(
newContents = jsonc.applyEdits(newContents, editFunc(newContents))
}
return {
result: await lastValueFrom(updateSiteConfiguration(gqlClient, origConfig.configuration.id, newContents)),
result: await updateSiteConfiguration(gqlClient, origConfig.configuration.id, newContents),
destroy: async () => {
const site = await lastValueFrom(fetchSiteConfiguration(gqlClient))
await updateSiteConfiguration(
gqlClient,
site.configuration.id,
origConfig.configuration.effectiveContents
).toPromise()
await updateSiteConfiguration(gqlClient, site.configuration.id, origConfig.configuration.effectiveContents)
},
}
}

View File

@ -7,7 +7,7 @@ import MapSearchIcon from 'mdi-react/MapSearchIcon'
import { createPortal } from 'react-dom'
import { Navigate, useLocation, useNavigate } from 'react-router-dom'
import type { Observable } from 'rxjs'
import { catchError, map, mapTo, startWith, switchMap } from 'rxjs/operators'
import { catchError, map, startWith, switchMap } from 'rxjs/operators'
import type { Optional } from 'utility-types'
import type { StreamingSearchResultsListProps } from '@sourcegraph/branded'
@ -239,7 +239,7 @@ export const BlobPage: React.FunctionComponent<BlobPageProps> = ({ className, co
useCallback(
(clicks: Observable<void>) =>
clicks.pipe(
mapTo(true),
map(() => true),
startWith(false),
switchMap(disableTimeout =>
fetchBlob({

View File

@ -1,7 +1,7 @@
import { Facet } from '@codemirror/state'
import { EditorView } from '@codemirror/view'
import { concat, from, of } from 'rxjs'
import { timeoutWith } from 'rxjs/operators'
import { from, merge, timer } from 'rxjs'
import { map, takeWhile } from 'rxjs/operators'
import type { LineOrPositionOrRange } from '@sourcegraph/common'
@ -38,8 +38,13 @@ export const pinnedRange = Facet.define<{ from: number; to: number } | null, { f
showTooltip.computeN([self], state => {
const range = state.facet(self)
if (range) {
const tooltip$ = from(getHoverTooltip(state, range.from))
return [tooltip$.pipe(timeoutWith(50, concat(of(new LoadingTooltip(range.from, range.to)), tooltip$)))]
const loadingTooltip = new LoadingTooltip(range.from, range.to)
return [
// Show loading tooltip after 50ms if the hover tooltip is not yet available
merge(from(getHoverTooltip(state, range.from)), timer(50).pipe(map(() => loadingTooltip))).pipe(
takeWhile(tooltip => tooltip !== loadingTooltip, true)
),
]
}
return []
}),

View File

@ -9,8 +9,8 @@ import {
type TransactionSpec,
} from '@codemirror/state'
import { Decoration, EditorView, keymap } from '@codemirror/view'
import { concat, from, of } from 'rxjs'
import { timeoutWith } from 'rxjs/operators'
import { from, merge, timer } from 'rxjs'
import { map, takeWhile } from 'rxjs/operators'
import type { LineOrPositionOrRange } from '@sourcegraph/common'
@ -239,15 +239,14 @@ const selectedToken = StateField.define<{
return true
}
const tooltip$ = from(getHoverTooltip(view.state, selected.range.from))
const loadingTooltip = new LoadingTooltip(selected.range.from, selected.range.to)
showTokenTooltip(
view,
tooltip$.pipe(
timeoutWith(
50,
concat(of(new LoadingTooltip(selected.range.from, selected.range.to)), tooltip$)
)
)
// Show loading tooltip after 50ms, if the request is still pending
merge(
from(getHoverTooltip(view.state, selected.range.from)),
timer(50).pipe(map(() => loadingTooltip))
).pipe(takeWhile(tooltip => tooltip !== loadingTooltip, true))
)
return true
},

View File

@ -5,7 +5,7 @@ import { VisuallyHidden } from '@reach/visually-hidden'
import classNames from 'classnames'
import { useLocation } from 'react-router-dom'
import { Subject, Subscription } from 'rxjs'
import { catchError, mapTo, switchMap } from 'rxjs/operators'
import { catchError, map, switchMap } from 'rxjs/operators'
import { useCallbackRef } from 'use-callback-ref'
import { logger } from '@sourcegraph/common'
@ -58,7 +58,7 @@ class SavedSearchNode extends React.PureComponent<NodeProps, NodeState> {
.pipe(
switchMap(search =>
deleteSavedSearch(search.id).pipe(
mapTo(undefined),
map(() => undefined),
catchError(error => {
logger.error(error)
return []

View File

@ -2,17 +2,7 @@ import React, { type FC } from 'react'
import { useParams } from 'react-router-dom'
import { concat, of, Subject, Subscription } from 'rxjs'
import {
catchError,
delay,
distinctUntilChanged,
map,
mapTo,
mergeMap,
startWith,
switchMap,
tap,
} from 'rxjs/operators'
import { catchError, delay, distinctUntilChanged, map, mergeMap, startWith, switchMap, tap } from 'rxjs/operators'
import { asError, type ErrorLike, isErrorLike } from '@sourcegraph/common'
import { Alert, LoadingSpinner } from '@sourcegraph/wildcard'
@ -89,7 +79,7 @@ class InnerSavedSearchUpdateForm extends React.Component<Props, State> {
this.props.namespace.__typename === 'User' ? this.props.namespace.id : null,
this.props.namespace.__typename === 'Org' ? this.props.namespace.id : null
).pipe(
mapTo(null),
map(() => null),
tap(() => eventLogger.log('SavedSearchUpdated')),
mergeMap(() =>
concat(

View File

@ -1,5 +1,5 @@
import { type Location, createPath } from 'react-router-dom'
import { Subscription, Subject } from 'rxjs'
import { Subscription, Subject, lastValueFrom } from 'rxjs'
import { tap, last } from 'rxjs/operators'
import { afterEach, beforeEach, describe, expect, it, test } from 'vitest'
@ -198,11 +198,11 @@ describe('updateQueryStateFromURL', () => {
const { wait, done } = createBarrier()
const [locationSubject, location] = createHistoryObservable('q=context:me+test')
getQueryStateFromLocation({
location: locationSubject,
isSearchContextAvailable,
})
.pipe(
lastValueFrom(
getQueryStateFromLocation({
location: locationSubject,
isSearchContextAvailable,
}).pipe(
last(),
tap(({ searchContextSpec, query }) => {
expect(searchContextSpec?.spec).toEqual('me')
@ -210,8 +210,7 @@ describe('updateQueryStateFromURL', () => {
done()
})
)
.toPromise()
.catch(logger.error)
).catch(logger.error)
locationSubject.next(location)
locationSubject.complete()

View File

@ -5,7 +5,7 @@ import '@sourcegraph/shared/src/testing/mockReactVisibilitySensor'
import { cleanup, fireEvent, render, screen } from '@testing-library/react'
import userEvent from '@testing-library/user-event'
import { BrowserRouter } from 'react-router-dom'
import { EMPTY, NEVER, of } from 'rxjs'
import { EMPTY, lastValueFrom, NEVER, of } from 'rxjs'
import { spy, assert } from 'sinon'
import { beforeEach, describe, expect, it, vi } from 'vitest'
@ -98,7 +98,7 @@ describe('StreamingSearchResults', () => {
assert.calledOnce(searchSpy)
const call = searchSpy.getCall(0)
// We have to extract the query from the observable since we can't directly compare observables
const receivedQuery = await call.args[0].toPromise()
const receivedQuery = await lastValueFrom(call.args[0])
const receivedOptions = call.args[1]
expect(receivedQuery).toEqual('r:golang/oauth2 test f:travis')

View File

@ -1,4 +1,4 @@
import { of } from 'rxjs'
import { of, lastValueFrom } from 'rxjs'
import {
type ContentMatch,
@ -249,9 +249,7 @@ export const downloadSearchResults = (
})
: of(results)
// Once we update to RxJS 7, we need to change `toPromise` to `lastValueFrom`.
// See https://rxjs.dev/deprecations/to-promise
return resultsObservable.toPromise().then(results => {
return lastValueFrom(resultsObservable, { defaultValue: undefined }).then(results => {
if (results?.state === 'error') {
const error = results.progress.skipped.find(skipped => skipped.reason === 'error')
if (error) {

View File

@ -2,7 +2,8 @@ import React, { useCallback, useState } from 'react'
import classNames from 'classnames'
import { parseISO } from 'date-fns'
import { map, mapTo } from 'rxjs/operators'
import { lastValueFrom } from 'rxjs'
import { map } from 'rxjs/operators'
import { Timestamp } from '@sourcegraph/branded/src/components/Timestamp'
import { asError, isErrorLike } from '@sourcegraph/common'
@ -41,18 +42,21 @@ export const accessTokenFragment = gql`
`
function deleteAccessToken(tokenID: Scalars['ID']): Promise<void> {
return requestGraphQL<DeleteAccessTokenResult, DeleteAccessTokenVariables>(
gql`
mutation DeleteAccessToken($tokenID: ID!) {
deleteAccessToken(byID: $tokenID) {
alwaysNil
return lastValueFrom(
requestGraphQL<DeleteAccessTokenResult, DeleteAccessTokenVariables>(
gql`
mutation DeleteAccessToken($tokenID: ID!) {
deleteAccessToken(byID: $tokenID) {
alwaysNil
}
}
}
`,
{ tokenID }
`,
{ tokenID }
).pipe(
map(dataOrThrowErrors),
map(() => undefined)
)
)
.pipe(map(dataOrThrowErrors), mapTo(undefined))
.toPromise()
}
export interface AccessTokenNodeProps {

View File

@ -4,8 +4,8 @@ import type { FC } from 'react'
import { type ApolloClient, useApolloClient } from '@apollo/client'
import classNames from 'classnames'
import * as jsonc from 'jsonc-parser'
import { lastValueFrom, Subject, Subscription } from 'rxjs'
import { delay, mergeMap, retryWhen, tap, timeout } from 'rxjs/operators'
import { lastValueFrom, timer, Subject, Subscription } from 'rxjs'
import { delay, mergeMap, retry, tap, timeout } from 'rxjs/operators'
import { logger } from '@sourcegraph/common'
import type { SiteConfiguration } from '@sourcegraph/shared/src/schema/site.schema'
@ -276,12 +276,12 @@ class SiteAdminConfigurationContent extends React.Component<Props, State> {
mergeMap(() =>
// wait for server to restart
fetchSite().pipe(
retryWhen(errors =>
errors.pipe(
tap(() => this.forceUpdate()),
delay(500)
)
),
retry({
delay: () => {
this.forceUpdate()
return timer(500)
},
}),
timeout(10000)
)
),

View File

@ -3,7 +3,7 @@ import React, { useCallback, useMemo } from 'react'
import { mdiAlertCircle, mdiAlert, mdiArrowLeftBold, mdiArrowRightBold } from '@mdi/js'
import classNames from 'classnames'
import { type Observable, of, timer } from 'rxjs'
import { catchError, concatMap, delay, map, repeatWhen, takeWhile } from 'rxjs/operators'
import { catchError, concatMap, map, repeat, takeWhile } from 'rxjs/operators'
import { parse as _parseVersion, type SemVer } from 'semver'
import { Timestamp } from '@sourcegraph/branded/src/components/Timestamp'
@ -93,7 +93,7 @@ export const SiteAdminMigrationsPage: React.FunctionComponent<
concatMap(() =>
fetchAllMigrations().pipe(
catchError((error): [ErrorLike] => [asError(error)]),
repeatWhen(observable => observable.pipe(delay(REFRESH_INTERVAL_MS)))
repeat({ delay: REFRESH_INTERVAL_MS })
)
),
takeWhile(() => true, true)

View File

@ -132,7 +132,6 @@ export function useUserListActions(onEnd: (error?: any) => void): UseUserListAct
([user]: SiteUser[]) => {
if (confirm('Are you sure you want to promote the selected user to site admin?')) {
setUserIsSiteAdmin(user.id, true)
.toPromise()
.then(
createOnSuccess(
<Text as="span">
@ -186,7 +185,6 @@ export function useUserListActions(onEnd: (error?: any) => void): UseUserListAct
([user]: SiteUser[]) => {
if (confirm('Are you sure you want to revoke the selected user from site admin?')) {
setUserIsSiteAdmin(user.id, false)
.toPromise()
.then(
createOnSuccess(
<Text as="span">

View File

@ -1,7 +1,7 @@
import type { QueryResult } from '@apollo/client'
import { parse as parseJSONC } from 'jsonc-parser'
import type { Observable } from 'rxjs'
import { map, mapTo, tap } from 'rxjs/operators'
import { lastValueFrom, type Observable } from 'rxjs'
import { map, tap } from 'rxjs/operators'
import { resetAllMemoizationCaches } from '@sourcegraph/common'
import { createInvalidGraphQLMutationResponseError, dataOrThrowErrors, gql, useQuery } from '@sourcegraph/http-client'
@ -317,20 +317,22 @@ export const CHECK_MIRROR_REPOSITORY_CONNECTION = gql`
}
`
export function scheduleRepositoryPermissionsSync(args: { repository: Scalars['ID'] }): Observable<void> {
return requestGraphQL<ScheduleRepositoryPermissionsSyncResult, ScheduleRepositoryPermissionsSyncVariables>(
gql`
mutation ScheduleRepositoryPermissionsSync($repository: ID!) {
scheduleRepositoryPermissionsSync(repository: $repository) {
alwaysNil
export function scheduleRepositoryPermissionsSync(args: { repository: Scalars['ID'] }): Promise<void> {
return lastValueFrom(
requestGraphQL<ScheduleRepositoryPermissionsSyncResult, ScheduleRepositoryPermissionsSyncVariables>(
gql`
mutation ScheduleRepositoryPermissionsSync($repository: ID!) {
scheduleRepositoryPermissionsSync(repository: $repository) {
alwaysNil
}
}
}
`,
args
).pipe(
map(dataOrThrowErrors),
tap(() => resetAllMemoizationCaches()),
mapTo(undefined)
`,
args
).pipe(
map(dataOrThrowErrors),
tap(() => resetAllMemoizationCaches()),
map(() => undefined)
)
)
}
@ -520,19 +522,21 @@ export function reloadSite(): Observable<void> {
)
}
export function setUserIsSiteAdmin(userID: Scalars['ID'], siteAdmin: boolean): Observable<void> {
return requestGraphQL<SetUserIsSiteAdminResult, SetUserIsSiteAdminVariables>(
gql`
mutation SetUserIsSiteAdmin($userID: ID!, $siteAdmin: Boolean!) {
setUserIsSiteAdmin(userID: $userID, siteAdmin: $siteAdmin) {
alwaysNil
export function setUserIsSiteAdmin(userID: Scalars['ID'], siteAdmin: boolean): Promise<void> {
return lastValueFrom(
requestGraphQL<SetUserIsSiteAdminResult, SetUserIsSiteAdminVariables>(
gql`
mutation SetUserIsSiteAdmin($userID: ID!, $siteAdmin: Boolean!) {
setUserIsSiteAdmin(userID: $userID, siteAdmin: $siteAdmin) {
alwaysNil
}
}
}
`,
{ userID, siteAdmin }
).pipe(
map(dataOrThrowErrors),
map(() => undefined)
`,
{ userID, siteAdmin }
).pipe(
map(dataOrThrowErrors),
map(() => undefined)
)
)
}
@ -572,17 +576,17 @@ export function createUser(username: string, email: string | undefined): Observa
}
export function deleteOrganization(organization: Scalars['ID']): Promise<void> {
return requestGraphQL<DeleteOrganizationResult, DeleteOrganizationVariables>(
gql`
mutation DeleteOrganization($organization: ID!) {
deleteOrganization(organization: $organization) {
alwaysNil
return lastValueFrom(
requestGraphQL<DeleteOrganizationResult, DeleteOrganizationVariables>(
gql`
mutation DeleteOrganization($organization: ID!) {
deleteOrganization(organization: $organization) {
alwaysNil
}
}
}
`,
{ organization }
)
.pipe(
`,
{ organization }
).pipe(
map(dataOrThrowErrors),
map(data => {
if (!data.deleteOrganization) {
@ -590,7 +594,7 @@ export function deleteOrganization(organization: Scalars['ID']): Promise<void> {
}
})
)
.toPromise()
)
}
export const SITE_UPDATE_CHECK = gql`

View File

@ -1,5 +1,5 @@
import { EMPTY, fromEvent, merge, type Observable } from 'rxjs'
import { catchError, map, publishReplay, refCount, take } from 'rxjs/operators'
import { EMPTY, fromEvent, merge, ReplaySubject, type Observable } from 'rxjs'
import { catchError, map, share, take } from 'rxjs/operators'
import * as uuid from 'uuid'
import { isErrorLike, isFirefox, logger } from '@sourcegraph/common'
@ -66,8 +66,12 @@ const browserExtensionMessageReceived: Observable<{ platform?: string; version?:
)
).pipe(
// Replay the same latest value for every subscriber
publishReplay(1),
refCount()
share({
connector: () => new ReplaySubject(1),
resetOnError: false,
resetOnComplete: false,
resetOnRefCountZero: false,
})
)
export class EventLogger implements TelemetryService, SharedEventLogger {

View File

@ -1,6 +1,7 @@
import React, { useCallback, useState } from 'react'
import { useNavigate } from 'react-router-dom'
import { lastValueFrom } from 'rxjs'
import { gql, useMutation } from '@sourcegraph/http-client'
import { Container, Button, Alert, Form } from '@sourcegraph/wildcard'
@ -45,9 +46,7 @@ export const EditUserProfileForm: React.FunctionComponent<React.PropsWithChildre
// In case the edited user is the current user, immediately reflect the changes in the
// UI.
// TODO: Migrate this to use the Apollo cache
refreshAuthenticatedUser()
.toPromise()
.finally(() => {})
lastValueFrom(refreshAuthenticatedUser(), { defaultValue: undefined }).finally(() => {})
},
onError: () => eventLogger.log('UpdateUserFailed'),
})