Skip to content

Commit

Permalink
fix: retry all ProviderErrors except BadRequest and NotImplemented
Browse files Browse the repository at this point in the history
BaseWallet was retrying all errors, which could potentially hide
bugs by retrying something that will never recover

BREAKING CHANGE: BaseWallet observables error instead of emitting fatalError$
- remove ObservableError.fatalError$
- 'poll' util observable errors instead of calling onFatalError
- remove PollProps.onFatalError
- 'poll' no longer checks for InvalidStringError, it's up to consumer
  • Loading branch information
mkazlauskas committed Dec 20, 2024
1 parent 9bad2df commit bf4a8b9
Show file tree
Hide file tree
Showing 21 changed files with 188 additions and 314 deletions.
100 changes: 38 additions & 62 deletions packages/util-rxjs/src/poll.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
import { InvalidStringError, strictEquals } from '@cardano-sdk/util';
import { Logger } from 'ts-log';
import {
NEVER,
Observable,
Subject,
catchError,
concat,
defer,
distinctUntilChanged,
from,
merge,
mergeMap,
of,
switchMap,
takeUntil,
throwError
} from 'rxjs';
import { RetryBackoffConfig, retryBackoff } from 'backoff-rxjs';
import { strictEquals } from '@cardano-sdk/util';

const POLL_UNTIL_RETRY = Symbol('POLL_UNTIL_RETRY');

export interface PollProps<T> {
sample: () => Promise<T>;
retryBackoffConfig: RetryBackoffConfig;
onFatalError?: (value: unknown) => void;
trigger$?: Observable<unknown>;
equals?: (t1: T, t2: T) => boolean;
combinator?: typeof switchMap;
Expand All @@ -33,71 +31,49 @@ export interface PollProps<T> {
export const poll = <T>({
sample,
retryBackoffConfig,
onFatalError,
trigger$ = of(true),
equals = strictEquals,
combinator = switchMap,
cancel$ = NEVER,
pollUntil = () => true,
logger
}: PollProps<T>) =>
new Observable<T>((subscriber) => {
const cancelOnFatalError$ = new Subject<boolean>();
const internalCancel$ = merge(cancel$, cancelOnFatalError$);
const sub = trigger$
.pipe(
combinator(() =>
defer(() =>
from(sample()).pipe(
mergeMap((v) =>
pollUntil(v)
? of(v)
: // Emit value, but also throw error to force retryBackoff to kick in
concat(
of(v),
throwError(() => new Error('polling'))
)
)
)
).pipe(
retryBackoff({
...retryBackoffConfig,
shouldRetry: (error) => {
logger.error(error);

if (retryBackoffConfig.shouldRetry) {
const shouldRetry = retryBackoffConfig.shouldRetry(error);
logger.debug(`Should retry: ${shouldRetry}`);

if (!shouldRetry) {
return false;
}
}
trigger$.pipe(
combinator(() =>
defer(() =>
from(sample()).pipe(
mergeMap((v) =>
pollUntil(v)
? of(v)
: // Emit value, but also throw error to force retryBackoff to kick in
concat(
of(v),
throwError(() => POLL_UNTIL_RETRY)
)
)
)
).pipe(
retryBackoff({
...retryBackoffConfig,
shouldRetry: (error) => {
if (error === POLL_UNTIL_RETRY) {
logger.warn('"pollUntil" condition not met, will retry');
return true;
}

if (error instanceof InvalidStringError) {
onFatalError?.(error);
cancelOnFatalError$.next(true);
return false;
}
logger.error(error);

return true;
}
}),
catchError((error) => {
onFatalError?.(error);
if (retryBackoffConfig.shouldRetry) {
const shouldRetry = retryBackoffConfig.shouldRetry(error);
logger.debug(`Should retry: ${shouldRetry}`);
return shouldRetry;
}

// Re-throw the error to propagate it to the subscriber and complete the observable
return throwError(() => error);
})
)
),
distinctUntilChanged(equals),
takeUntil(internalCancel$)
return true;
}
})
)
.subscribe(subscriber);

return () => {
sub.unsubscribe();
cancelOnFatalError$.complete();
};
});
),
distinctUntilChanged(equals),
takeUntil(cancel$)
);
17 changes: 7 additions & 10 deletions packages/util-rxjs/test/poll.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe('poll', () => {
expect(resolvedValue).toBeTruthy();
});

it('does not retry, when sample rejects with InvalidStringError', async () => {
it('does not retry, when shouldRetry returns false', async () => {
const testValue = { test: 'value' };
const testError = new InvalidStringError('Test invalid string error');
const sample = jest
Expand All @@ -69,23 +69,23 @@ describe('poll', () => {
.mockResolvedValueOnce(testValue)
.mockRejectedValueOnce(testError)
.mockResolvedValueOnce(testValue);
const onFatalError = jest.fn();
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, shouldRetry: () => true };
const retryBackoffConfig: RetryBackoffConfig = {
initialInterval: 1,
shouldRetry: (error) => !(error instanceof InvalidStringError)
};
const values$ = poll({
logger,
onFatalError,
retryBackoffConfig,
sample
});
await expect(firstValueFrom(values$)).resolves.toBe(testValue);
await expect(firstValueFrom(values$)).rejects.toThrow(EmptyError);
await expect(firstValueFrom(values$)).rejects.toThrow(testError);
expect(sample).toBeCalledTimes(3);
expect(onFatalError).toBeCalledWith(testError);
expect(logger.messages).toStrictEqual([
{ level: 'error', message: [new Error(testErrorStr)] },
{ level: 'debug', message: ['Should retry: true'] },
{ level: 'error', message: [testError] },
{ level: 'debug', message: ['Should retry: true'] }
{ level: 'debug', message: ['Should retry: false'] }
]);
});

Expand Down Expand Up @@ -116,19 +116,16 @@ describe('poll', () => {
const sample = jest.fn().mockRejectedValue(testError);
const maxRetries = 3;
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, maxRetries };
const onFatalError = jest.fn();

const values$ = poll({
logger,
onFatalError,
retryBackoffConfig,
sample
});

await expect(firstValueFrom(values$)).rejects.toThrow(testError);

expect(sample).toBeCalledTimes(maxRetries + 1);
expect(onFatalError).toBeCalledWith(expect.any(Error));
expect(logger.messages).toStrictEqual([
{ level: 'error', message: [testError] },
{ level: 'error', message: [testError] },
Expand Down
33 changes: 9 additions & 24 deletions packages/wallet/src/Wallets/BaseWallet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ import {
createUtxoTracker,
createWalletUtil,
currentEpochTracker,
distinctEraSummaries
distinctEraSummaries,
pollProvider
} from '../services';
import { AddressType, Bip32Account, GroupedAddress, WitnessedTx, Witnesser, util } from '@cardano-sdk/key-management';
import {
Expand All @@ -71,7 +72,7 @@ import {
TxSubmitProvider,
UtxoProvider
} from '@cardano-sdk/core';
import { BehaviorObservable, TrackerSubject, poll } from '@cardano-sdk/util-rxjs';
import { BehaviorObservable, TrackerSubject } from '@cardano-sdk/util-rxjs';
import {
BehaviorSubject,
EMPTY,
Expand Down Expand Up @@ -282,7 +283,6 @@ export class BaseWallet implements ObservableWallet {
readonly protocolParameters$: TrackerSubject<Cardano.ProtocolParameters>;
readonly genesisParameters$: TrackerSubject<Cardano.CompactGenesis>;
readonly assetInfo$: TrackerSubject<Assets>;
readonly fatalError$: Subject<unknown>;
readonly syncStatus: SyncStatus;
readonly name: string;
readonly util: WalletUtil;
Expand Down Expand Up @@ -357,10 +357,6 @@ export class BaseWallet implements ObservableWallet {

this.witnesser = witnesser;

this.fatalError$ = new Subject();

const onFatalError = this.fatalError$.next.bind(this.fatalError$);

this.name = name;
const cancel$ = connectionStatusTracker$.pipe(
tap((status) => (status === ConnectionStatus.up ? 'Connection UP' : 'Connection DOWN')),
Expand All @@ -369,10 +365,9 @@ export class BaseWallet implements ObservableWallet {

if (isBip32PublicCredentialsManager(this.#publicCredentialsManager)) {
this.#addressTracker = createAddressTracker({
addressDiscovery$: poll({
addressDiscovery$: pollProvider({
cancel$,
logger: contextLogger(this.#logger, 'addressDiscovery$'),
onFatalError,
retryBackoffConfig,
sample: () => {
const credManager = this.#publicCredentialsManager as Bip32PublicCredentialsManager;
Expand Down Expand Up @@ -403,10 +398,9 @@ export class BaseWallet implements ObservableWallet {
logger: contextLogger(this.#logger, 'tip$'),
maxPollInterval: maxInterval,
minPollInterval: pollInterval,
provider$: poll({
provider$: pollProvider({
cancel$,
logger: contextLogger(this.#logger, 'tip$'),
onFatalError,
retryBackoffConfig,
sample: this.networkInfoProvider.ledgerTip
}),
Expand All @@ -426,11 +420,10 @@ export class BaseWallet implements ObservableWallet {
// Era summaries
const eraSummariesTrigger = new BehaviorSubject<void>(void 0);
this.eraSummaries$ = new PersistentDocumentTrackerSubject(
poll({
pollProvider({
cancel$,
equals: deepEquals,
logger: contextLogger(this.#logger, 'eraSummaries$'),
onFatalError,
retryBackoffConfig,
sample: this.networkInfoProvider.eraSummaries,
trigger$: eraSummariesTrigger.pipe(tap(() => 'Trigger request era summaries'))
Expand All @@ -450,23 +443,21 @@ export class BaseWallet implements ObservableWallet {
tap((epoch) => this.#logger.debug(`Current epoch is ${epoch}`))
);
this.protocolParameters$ = new PersistentDocumentTrackerSubject(
poll({
pollProvider({
cancel$,
equals: isEqual,
logger: contextLogger(this.#logger, 'protocolParameters$'),
onFatalError,
retryBackoffConfig,
sample: this.networkInfoProvider.protocolParameters,
trigger$: epoch$
}),
stores.protocolParameters
);
this.genesisParameters$ = new PersistentDocumentTrackerSubject(
poll({
pollProvider({
cancel$,
equals: isEqual,
logger: contextLogger(this.#logger, 'genesisParameters$'),
onFatalError,
retryBackoffConfig,
sample: this.networkInfoProvider.genesisParameters,
trigger$: epoch$
Expand All @@ -487,7 +478,6 @@ export class BaseWallet implements ObservableWallet {
inFlightTransactionsStore: stores.inFlightTransactions,
logger: contextLogger(this.#logger, 'transactions'),
newTransactions: this.#newTransactions,
onFatalError,
retryBackoffConfig,
signedTransactionsStore: stores.signedTransactions,
tip$: this.tip$,
Expand Down Expand Up @@ -521,7 +511,6 @@ export class BaseWallet implements ObservableWallet {
addresses$,
history$: this.transactions.history$,
logger: contextLogger(this.#logger, 'utxo'),
onFatalError,
retryBackoffConfig,
stores,
transactionsInFlight$: this.transactions.outgoing.inFlight$,
Expand All @@ -546,7 +535,6 @@ export class BaseWallet implements ObservableWallet {
eraSummaries$,
knownAddresses$: this.addresses$,
logger: contextLogger(this.#logger, 'delegation'),
onFatalError,
retryBackoffConfig,
rewardAccountAddresses$: this.addresses$.pipe(
map((addresses) => uniq(addresses.map((groupedAddress) => groupedAddress.rewardAccount)))
Expand Down Expand Up @@ -592,7 +580,6 @@ export class BaseWallet implements ObservableWallet {
balanceTracker: this.balance,
logger: contextLogger(this.#logger, 'assets$'),
maxAssetInfoCacheAge,
onFatalError,
retryBackoffConfig,
transactionsTracker: this.transactions
}),
Expand All @@ -602,11 +589,10 @@ export class BaseWallet implements ObservableWallet {
this.handles$ = this.handleProvider
? this.initializeHandles(
new PersistentDocumentTrackerSubject(
poll({
pollProvider({
cancel$,
equals: isEqual,
logger: contextLogger(this.#logger, 'handles$'),
onFatalError,
retryBackoffConfig,
sample: () => this.handleProvider.getPolicyIds()
}),
Expand Down Expand Up @@ -798,7 +784,6 @@ export class BaseWallet implements ObservableWallet {
this.currentEpoch$.complete();
this.delegation.shutdown();
this.assetInfo$.complete();
this.fatalError$.complete();
this.syncStatus.shutdown();
this.#newTransactions.failedToSubmit$.complete();
this.#newTransactions.pending$.complete();
Expand Down
Loading

0 comments on commit bf4a8b9

Please sign in to comment.