Skip to content

Commit

Permalink
feat: coldObservableProvider now stops after maxRetries (defaults to …
Browse files Browse the repository at this point in the history
…10) and logs errors
  • Loading branch information
AngelCastilloB committed Dec 5, 2024
1 parent 96e2136 commit a134885
Show file tree
Hide file tree
Showing 20 changed files with 133 additions and 18 deletions.
1 change: 1 addition & 0 deletions packages/tx-construction/src/tx-builder/TxBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ export class GenericTxBuilder implements TxBuilder {
}

const rewardAccounts$ = coldObservableProvider({
logger: contextLogger(this.#logger, 'getOrCreateRewardAccounts'),
pollUntil: (rewardAccounts) =>
allRewardAccounts.every((newAccount) => rewardAccounts.some((acct) => acct.address === newAccount)),
provider: this.#dependencies.txBuilderProviders.rewardAccounts,
Expand Down
3 changes: 2 additions & 1 deletion packages/util-rxjs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
"dependencies": {
"@cardano-sdk/util": "workspace:~",
"backoff-rxjs": "^7.0.0",
"rxjs": "^7.4.0"
"rxjs": "^7.4.0",
"ts-log": "^2.2.7"
},
"devDependencies": {
"@cardano-sdk/util-dev": "workspace:~",
Expand Down
30 changes: 26 additions & 4 deletions packages/util-rxjs/src/coldObservableProvider.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { InvalidStringError, strictEquals } from '@cardano-sdk/util';
import {
EMPTY,
NEVER,
Observable,
Subject,
catchError,
concat,
defer,
distinctUntilChanged,
Expand All @@ -14,6 +15,8 @@ import {
takeUntil,
throwError
} from 'rxjs';
import { InvalidStringError, strictEquals } from '@cardano-sdk/util';
import { Logger } from 'ts-log';
import { RetryBackoffConfig, retryBackoff } from 'backoff-rxjs';

export interface ColdObservableProviderProps<T> {
Expand All @@ -25,6 +28,7 @@ export interface ColdObservableProviderProps<T> {
combinator?: typeof switchMap;
cancel$?: Observable<unknown>;
pollUntil?: (v: T) => boolean;
logger: Logger;
}

export const coldObservableProvider = <T>({
Expand All @@ -35,7 +39,8 @@ export const coldObservableProvider = <T>({
equals = strictEquals,
combinator = switchMap,
cancel$ = NEVER,
pollUntil = () => true
pollUntil = () => true,
logger
}: ColdObservableProviderProps<T>) =>
new Observable<T>((subscriber) => {
const cancelOnFatalError$ = new Subject<boolean>();
Expand All @@ -48,7 +53,7 @@ export const coldObservableProvider = <T>({
mergeMap((v) =>
pollUntil(v)
? of(v)
: // emit value, but also throw error to force retryBackoff to kick in
: // Emit value, but also throw error to force retryBackoff to kick in
concat(
of(v),
throwError(() => new Error('polling'))
Expand All @@ -58,16 +63,33 @@ export const coldObservableProvider = <T>({
).pipe(
retryBackoff({
...retryBackoffConfig,
maxRetries: retryBackoffConfig.maxRetries ?? 10,
shouldRetry: (error) => {
if (retryBackoffConfig.shouldRetry && !retryBackoffConfig.shouldRetry(error)) return false;
logger.error(error);

if (retryBackoffConfig.shouldRetry) {
const shouldRetry = retryBackoffConfig.shouldRetry(error);
logger.debug(`Should retry: ${shouldRetry}`);

if (!shouldRetry) {
return false;
}
}

if (error instanceof InvalidStringError) {
onFatalError?.(error);
cancelOnFatalError$.next(true);
return false;
}

return true;
}
}),
catchError((error) => {
onFatalError?.(error);
cancelOnFatalError$.next(true);

return EMPTY;
})
)
),
Expand Down
49 changes: 47 additions & 2 deletions packages/util-rxjs/test/coldObservableProvider.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BehaviorSubject, EmptyError, Subject, firstValueFrom, lastValueFrom, tap } from 'rxjs';
import { InvalidStringError } from '@cardano-sdk/util';
import { Logger } from 'ts-log';
import { RetryBackoffConfig, retryBackoff } from 'backoff-rxjs';
import { coldObservableProvider } from '../src';

Expand All @@ -8,11 +9,24 @@ jest.mock('backoff-rxjs', () => ({
retryBackoff: jest.fn().mockImplementation((...args) => jest.requireActual('backoff-rxjs').retryBackoff(...args))
}));

const createMockLogger = (): Logger => ({
debug: jest.fn(),
error: jest.fn(),
info: jest.fn(),
trace: jest.fn(),
warn: jest.fn()
});

describe('coldObservableProvider', () => {
it('returns an observable that calls underlying provider on each subscription and uses retryBackoff', async () => {
const underlyingProvider = jest.fn().mockResolvedValue(true);
const backoffConfig: RetryBackoffConfig = { initialInterval: 1 };
const provider$ = coldObservableProvider({ provider: underlyingProvider, retryBackoffConfig: backoffConfig });
const logger = createMockLogger();
const provider$ = coldObservableProvider({
logger,
provider: underlyingProvider,
retryBackoffConfig: backoffConfig
});
expect(await firstValueFrom(provider$)).toBe(true);
expect(await firstValueFrom(provider$)).toBe(true);
expect(underlyingProvider).toBeCalledTimes(2);
Expand All @@ -24,8 +38,10 @@ describe('coldObservableProvider', () => {
const underlyingProvider = () => firstValueFrom(fakeProviderSubject);
const backoffConfig: RetryBackoffConfig = { initialInterval: 1 };
const cancel$ = new BehaviorSubject<boolean>(true);
const logger = createMockLogger();
const provider$ = coldObservableProvider({
cancel$,
logger,
provider: underlyingProvider,
retryBackoffConfig: backoffConfig
});
Expand All @@ -39,9 +55,10 @@ describe('coldObservableProvider', () => {
});

it('retries using retryBackoff, when underlying provider rejects', async () => {
const logger = createMockLogger();
const underlyingProvider = jest.fn().mockRejectedValueOnce(false).mockResolvedValue(true);
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1 };
const provider$ = coldObservableProvider({ provider: underlyingProvider, retryBackoffConfig });
const provider$ = coldObservableProvider({ logger, provider: underlyingProvider, retryBackoffConfig });
const resolvedValue = await firstValueFrom(provider$);
expect(underlyingProvider).toBeCalledTimes(2);
expect(resolvedValue).toBeTruthy();
Expand All @@ -50,6 +67,7 @@ describe('coldObservableProvider', () => {
it('does not retry, when underlying provider rejects with InvalidStringError', async () => {
const testValue = { test: 'value' };
const testError = new InvalidStringError('Test invalid string error');
const logger = createMockLogger();
const underlyingProvider = jest
.fn()
.mockRejectedValueOnce(new Error('Test error'))
Expand All @@ -59,6 +77,7 @@ describe('coldObservableProvider', () => {
const onFatalError = jest.fn();
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, shouldRetry: () => true };
const provider$ = coldObservableProvider({
logger,
onFatalError,
provider: underlyingProvider,
retryBackoffConfig
Expand All @@ -67,6 +86,7 @@ describe('coldObservableProvider', () => {
await expect(firstValueFrom(provider$)).rejects.toThrow(EmptyError);
expect(underlyingProvider).toBeCalledTimes(3);
expect(onFatalError).toBeCalledWith(testError);
expect(logger.error).toBeCalledWith(testError);
});

it('polls the provider until the pollUntil condition is satisfied', async () => {
Expand All @@ -77,8 +97,10 @@ describe('coldObservableProvider', () => {
.mockResolvedValueOnce('c')
.mockResolvedValue('Never reached');
const backoffConfig: RetryBackoffConfig = { initialInterval: 1 };
const logger = createMockLogger();

const provider$ = coldObservableProvider({
logger,
pollUntil: (v) => v === 'c',
provider: underlyingProvider,
retryBackoffConfig: backoffConfig
Expand All @@ -89,4 +111,27 @@ describe('coldObservableProvider', () => {
expect(providerValues).toEqual(['a', 'b', 'c']);
expect(underlyingProvider).toBeCalledTimes(3);
});

it('stops retrying after maxRetries attempts and handles the error in catchError', async () => {
const testError = new Error('Test error');
const underlyingProvider = jest.fn().mockRejectedValue(testError);
const maxRetries = 3;
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, maxRetries };
const onFatalError = jest.fn();
const logger = createMockLogger();

const provider$ = coldObservableProvider({
logger,
onFatalError,
provider: underlyingProvider,
retryBackoffConfig
});

await expect(firstValueFrom(provider$)).resolves.toBeUndefined();

expect(underlyingProvider).toBeCalledTimes(maxRetries + 1);
expect(onFatalError).toBeCalledWith(expect.any(Error));
expect(logger.error).toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(testError);
});
});
6 changes: 6 additions & 0 deletions packages/wallet/src/Wallets/BaseWallet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ export class BaseWallet implements ObservableWallet {
this.#addressTracker = createAddressTracker({
addressDiscovery$: coldObservableProvider({
cancel$,
logger: this.#logger,
onFatalError,
provider: () => {
const credManager = this.#publicCredentialsManager as Bip32PublicCredentialsManager;
Expand Down Expand Up @@ -403,6 +404,7 @@ export class BaseWallet implements ObservableWallet {
minPollInterval: pollInterval,
provider$: coldObservableProvider({
cancel$,
logger: this.#logger,
onFatalError,
provider: this.networkInfoProvider.ledgerTip,
retryBackoffConfig
Expand All @@ -426,6 +428,7 @@ export class BaseWallet implements ObservableWallet {
coldObservableProvider({
cancel$,
equals: deepEquals,
logger: this.#logger,
onFatalError,
provider: this.networkInfoProvider.eraSummaries,
retryBackoffConfig,
Expand All @@ -449,6 +452,7 @@ export class BaseWallet implements ObservableWallet {
coldObservableProvider({
cancel$,
equals: isEqual,
logger: this.#logger,
onFatalError,
provider: this.networkInfoProvider.protocolParameters,
retryBackoffConfig,
Expand All @@ -460,6 +464,7 @@ export class BaseWallet implements ObservableWallet {
coldObservableProvider({
cancel$,
equals: isEqual,
logger: this.#logger,
onFatalError,
provider: this.networkInfoProvider.genesisParameters,
retryBackoffConfig,
Expand Down Expand Up @@ -593,6 +598,7 @@ export class BaseWallet implements ObservableWallet {
coldObservableProvider({
cancel$,
equals: isEqual,
logger: this.#logger,
onFatalError,
provider: () => this.handleProvider.getPolicyIds(),
retryBackoffConfig
Expand Down
3 changes: 3 additions & 0 deletions packages/wallet/src/services/AssetsTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ export const createAssetService =
assetCache$: Observable<Assets>,
totalBalance$: Observable<Cardano.Value>,
retryBackoffConfig: RetryBackoffConfig,
logger: Logger,
onFatalError?: (value: unknown) => void,
maxAssetInfoCacheAge: Milliseconds = ONE_WEEK
// eslint-disable-next-line max-params
Expand All @@ -139,6 +140,7 @@ export const createAssetService =
concatAndCombineLatest(
chunk(assetIds, ASSET_INFO_FETCH_CHUNK_SIZE).map((assetIdsChunk) =>
coldObservableProvider({
logger,
onFatalError,
pollUntil: isEveryAssetInfoComplete,
provider: () =>
Expand Down Expand Up @@ -189,6 +191,7 @@ export const createAssetsTracker = (
assetsCache$,
total$,
retryBackoffConfig,
logger,
onFatalError,
maxAssetInfoCacheAge
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ export const createBlockEpochProvider =
(
chainHistoryProvider: ChainHistoryProvider,
retryBackoffConfig: RetryBackoffConfig,
logger: Logger,
onFatalError?: (value: unknown) => void
) =>
(ids: Cardano.BlockId[]) =>
coldObservableProvider({
logger,
onFatalError,
provider: () => chainHistoryProvider.blocksByHashes({ ids }),
retryBackoffConfig
Expand Down Expand Up @@ -131,6 +133,7 @@ export const createDelegationTracker = ({
stakePoolProvider,
stores.stakePools,
retryBackoffConfig,
logger,
onFatalError
),
rewardsHistoryProvider = createRewardsHistoryProvider(rewardsTracker, retryBackoffConfig),
Expand All @@ -139,6 +142,7 @@ export const createDelegationTracker = ({
transactionsTracker.outgoing.onChain$,
rewardsTracker,
retryBackoffConfig,
logger,
onFatalError
),
slotEpochCalc$ = eraSummaries$.pipe(map((eraSummaries) => createSlotEpochCalc(eraSummaries)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
tap
} from 'rxjs';
import { KeyValueStore } from '../../persistence';
import { Logger } from 'ts-log';
import { OutgoingOnChainTx, TxInFlight } from '../types';
import { PAGE_SIZE } from '../TransactionsTracker';
import { RetryBackoffConfig } from 'backoff-rxjs';
Expand Down Expand Up @@ -56,6 +57,7 @@ export const createQueryStakePoolsProvider =
stakePoolProvider: TrackedStakePoolProvider,
store: KeyValueStore<Cardano.PoolId, Cardano.StakePool>,
retryBackoffConfig: RetryBackoffConfig,
logger: Logger,
onFatalError?: (value: unknown) => void
) =>
(poolIds: Cardano.PoolId[]) => {
Expand All @@ -66,6 +68,7 @@ export const createQueryStakePoolsProvider =
return merge(
store.getValues(poolIds),
coldObservableProvider({
logger,
onFatalError,
provider: () => allStakePoolsByPoolIds(stakePoolProvider, { poolIds }),
retryBackoffConfig
Expand Down Expand Up @@ -109,13 +112,16 @@ export const createRewardsProvider =
txOnChain$: Observable<OutgoingOnChainTx>,
rewardsProvider: RewardsProvider,
retryBackoffConfig: RetryBackoffConfig,
logger: Logger,
onFatalError?: (value: unknown) => void
// eslint-disable-next-line max-params
) =>
(rewardAccounts: Cardano.RewardAccount[], equals = isEqual): Observable<Cardano.Lovelace[]> =>
combineLatest(
rewardAccounts.map((rewardAccount) =>
coldObservableProvider({
equals,
logger,
onFatalError,
provider: () => rewardsProvider.rewardAccountBalance({ rewardAccount }),
retryBackoffConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ export const createRewardsHistoryProvider =
(
rewardAccounts: Cardano.RewardAccount[],
lowerBound: Cardano.EpochNo | null,
logger: Logger,
onFatalError?: (value: unknown) => void
): Observable<Map<Cardano.RewardAccount, Reward[]>> => {
if (lowerBound) {
return coldObservableProvider({
logger,
onFatalError,
provider: () =>
rewardsProvider.rewardsHistory({
Expand Down Expand Up @@ -76,7 +78,7 @@ export const createRewardsHistoryTracker = (
firstDelegationEpoch$(transactions$, rewardAccounts).pipe(
tap((firstEpoch) => logger.debug(`Fetching history rewards since epoch ${firstEpoch}`)),
switchMap((firstEpoch) =>
rewardsHistoryProvider(rewardAccounts, Cardano.EpochNo(firstEpoch!), onFatalError)
rewardsHistoryProvider(rewardAccounts, Cardano.EpochNo(firstEpoch!), logger, onFatalError)
),
tap((allRewards) =>
rewardsHistoryStore.setAll([...allRewards.entries()].map(([key, value]) => ({ key, value })))
Expand Down
3 changes: 2 additions & 1 deletion packages/wallet/src/services/DrepInfoTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ type DrepInfoObservableProps = {

/** Use DRepProvider to fetch DRepInfos with retry backoff logic */
export const createDrepInfoColdObservable =
({ drepProvider, retryBackoffConfig, refetchTrigger$ }: DrepInfoObservableProps) =>
({ drepProvider, retryBackoffConfig, refetchTrigger$, logger }: DrepInfoObservableProps) =>
(drepIds: Cardano.DRepID[]) =>
coldObservableProvider<DRepInfo[]>({
logger,
provider: () => drepProvider.getDRepsInfo({ ids: drepIds }),
retryBackoffConfig,
trigger$: merge(of(true), refetchTrigger$)
Expand Down
Loading

0 comments on commit a134885

Please sign in to comment.