Skip to content

Commit

Permalink
feat: asset tracker now uses local cache before fetching asset metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
AngelCastilloB committed Dec 2, 2024
1 parent 1db47b9 commit 7056add
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 35 deletions.
1 change: 1 addition & 0 deletions packages/core/src/Asset/types/AssetInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ export interface AssetInfo {
tokenMetadata?: TokenMetadata | null;
/** CIP-0025. `undefined` if not loaded, `null` if no metadata found */
nftMetadata?: NftMetadata | null;
lastFetchedAt?: Date | null;
}
11 changes: 11 additions & 0 deletions packages/wallet/src/Wallets/BaseWallet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
DelegationTracker,
DynamicChangeAddressResolver,
FailedTx,
Milliseconds,
OutgoingTx,
PersistentDocumentTrackerSubject,
PollingConfig,
Expand Down Expand Up @@ -76,6 +77,8 @@ import {
Subject,
Subscription,
catchError,
defaultIfEmpty,
defer,
distinctUntilChanged,
filter,
firstValueFrom,
Expand Down Expand Up @@ -113,6 +116,7 @@ export interface BaseWalletProps {
readonly name: string;
readonly polling?: PollingConfig;
readonly retryBackoffConfig?: RetryBackoffConfig;
readonly maxAssetInfoCacheAge?: Milliseconds;
}

export enum PublicCredentialsManagerType {
Expand Down Expand Up @@ -289,6 +293,7 @@ export class BaseWallet implements ObservableWallet {
constructor(
{
name,
maxAssetInfoCacheAge,
polling: {
interval: pollInterval = DEFAULT_POLLING_CONFIG.pollInterval,
maxInterval = pollInterval * DEFAULT_POLLING_CONFIG.maxIntervalMultiplier,
Expand Down Expand Up @@ -547,10 +552,16 @@ export class BaseWallet implements ObservableWallet {
: new TrackerSubject(of(new Array<PubStakeKeyAndStatus>()));

this.balance = createBalanceTracker(this.protocolParameters$, this.utxo, this.delegation);

const assetsCache$ = defer(() => stores.assets.get().pipe(defaultIfEmpty(new Map())));

this.assetInfo$ = new PersistentDocumentTrackerSubject(
createAssetsTracker({
assetProvider: this.assetProvider,
assetsCache$,
balanceTracker: this.balance,
logger: contextLogger(this.#logger, 'assets$'),
maxAssetInfoCacheAge,
onFatalError,
retryBackoffConfig,
transactionsTracker: this.transactions
Expand Down
144 changes: 126 additions & 18 deletions packages/wallet/src/services/AssetsTracker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Asset, Cardano } from '@cardano-sdk/core';
import { Assets } from '../types';
import { BalanceTracker, Milliseconds, TransactionsTracker } from './types';
import { Logger } from 'ts-log';
import {
Observable,
Expand All @@ -17,7 +19,6 @@ import {
} from 'rxjs';
import { RetryBackoffConfig } from 'backoff-rxjs';
import { TrackedAssetProvider } from './ProviderTracker';
import { TransactionsTracker } from './types';
import { coldObservableProvider, concatAndCombineLatest } from '@cardano-sdk/util-rxjs';
import { deepEquals, isNotNil } from '@cardano-sdk/util';
import { newTransactions$ } from './TransactionsTracker';
Expand All @@ -35,36 +36,123 @@ const bufferTick =
source$.pipe(connect((shared$) => shared$.pipe(buffer(shared$.pipe(debounceTime(1))))));

// Number of asset ids requested from the provider per getAssets call.
const ASSET_INFO_FETCH_CHUNK_SIZE = 100;
// Default for maxAssetInfoCacheAge: one week, in milliseconds.
const ONE_WEEK = 7 * 24 * 60 * 60 * 1000;

/** Whether the given asset id is present in the (possibly absent) asset map of a balance value. */
const isInBalance = (assetId: Cardano.AssetId, balance: Cardano.Value): boolean => {
  const assets = balance.assets;
  return assets != null && assets.has(assetId);
};

/**
 * Splits a list of asset IDs into cached and uncached groups based on their presence in the cache,
 * their freshness, and their balance status:
 *
 * 1. Assets not in Balance:
 *    - Always use the cached version if present in the cache, ignoring freshness.
 * 2. Assets in Balance:
 *    - Use the cached version only if it exists and its `lastFetchedAt` timestamp is within `maxAssetInfoCacheAge`.
 * 3. Uncached Assets:
 *    - If an asset is not in the cache or does not meet the above criteria, mark it as uncached.
 *
 * @param cache Previously fetched asset info, keyed by asset id.
 * @param balance Total wallet balance; assets present here are subject to the freshness check.
 * @param assetIds Asset ids to classify.
 * @param maxAssetInfoCacheAge Maximum age (milliseconds) a cached entry of an in-balance asset may have.
 * @returns The entries that can be served from cache, and the ids that must be (re-)fetched.
 */
const splitCachedAndUncachedAssets = (
  cache: Assets,
  balance: Cardano.Value,
  assetIds: Cardano.AssetId[],
  maxAssetInfoCacheAge: Milliseconds
): { cachedAssets: Assets; uncachedAssetIds: Cardano.AssetId[] } => {
  const cachedAssets: Assets = new Map();
  const uncachedAssetIds: Cardano.AssetId[] = [];
  // Entries last fetched before this instant are considered stale (age is configurable, not hard-coded to a week).
  const staleThreshold = new Date(Date.now() - maxAssetInfoCacheAge);

  for (const id of assetIds) {
    const cachedAssetInfo = cache.get(id);

    if (!cachedAssetInfo) {
      uncachedAssetIds.push(id);
      continue;
    }

    const { lastFetchedAt } = cachedAssetInfo;

    if (!isInBalance(id, balance)) {
      // If the asset is NOT in balance, always use the cached version if present in cache
      cachedAssets.set(id, cachedAssetInfo);
    } else if (lastFetchedAt && new Date(lastFetchedAt) >= staleThreshold) {
      // If the asset is in balance, use the cache only if the entry is fresher than maxAssetInfoCacheAge
      cachedAssets.set(id, cachedAssetInfo);
    } else {
      uncachedAssetIds.push(id);
    }
  }

  return { cachedAssets, uncachedAssetIds };
};

export const createAssetService =
(
assetProvider: TrackedAssetProvider,
assetCache$: Observable<Assets>,
totalBalance$: Observable<Cardano.Value>,
retryBackoffConfig: RetryBackoffConfig,
onFatalError?: (value: unknown) => void
onFatalError?: (value: unknown) => void,
maxAssetInfoCacheAge: Milliseconds = ONE_WEEK
// eslint-disable-next-line max-params
) =>
(assetIds: Cardano.AssetId[]) =>
concatAndCombineLatest(
chunk(assetIds, ASSET_INFO_FETCH_CHUNK_SIZE).map((assetIdsChunk) =>
coldObservableProvider({
onFatalError,
pollUntil: isEveryAssetInfoComplete,
provider: () =>
assetProvider.getAssets({
assetIds: assetIdsChunk,
extraData: { nftMetadata: true, tokenMetadata: true }
}),
retryBackoffConfig,
trigger$: of(true) // fetch only once
})
assetCache$.pipe(
switchMap((cache) =>
totalBalance$.pipe(
take(1),
switchMap((totalValue) => {
const { cachedAssets, uncachedAssetIds } = splitCachedAndUncachedAssets(
cache,
totalValue,
assetIds,
maxAssetInfoCacheAge
);

if (uncachedAssetIds.length === 0) {
return of([...cachedAssets.values()]);
}

return concatAndCombineLatest(
chunk(uncachedAssetIds, ASSET_INFO_FETCH_CHUNK_SIZE).map((assetIdsChunk) =>
coldObservableProvider({
onFatalError,
pollUntil: isEveryAssetInfoComplete,
provider: () =>
assetProvider.getAssets({
assetIds: assetIdsChunk,
extraData: { nftMetadata: true, tokenMetadata: true }
}),
retryBackoffConfig,
trigger$: of(true) // fetch only once
})
)
).pipe(
map((fetchedAssets) => {
// Update lastFetchedAt for all fetched assets and include a random delta to avoid having them expire at the same time.
for (const asset of fetchedAssets.flat()) {
const randomDelta = Math.floor(Math.random() * 2 * 24 * 60 * 60 * 1000); // Random time within 2 days
cachedAssets.set(asset.assetId, { ...asset, lastFetchedAt: new Date(Date.now() + randomDelta) });
}

return [...cachedAssets.values()];
})
);
})
)
)
).pipe(map((arr) => arr.flat())); // concat the chunk results
);

export type AssetService = ReturnType<typeof createAssetService>;

/** Dependencies for {@link createAssetsTracker}. */
export interface AssetsTrackerProps {
  /** Source of wallet transaction history, used to discover asset ids to track. */
  transactionsTracker: TransactionsTracker;
  /** Tracked provider used to fetch asset metadata for cache misses. */
  assetProvider: TrackedAssetProvider;
  /** Retry/backoff configuration applied to provider calls. */
  retryBackoffConfig: RetryBackoffConfig;
  /** Logger for tracker diagnostics. */
  logger: Logger;
  /** Previously persisted asset info, consulted before hitting the provider. */
  assetsCache$: Observable<Assets>;
  /** Balance tracker; assets currently in balance get stricter cache-freshness rules. */
  balanceTracker: BalanceTracker;
  /** Invoked when a provider fails irrecoverably. */
  onFatalError?: (value: unknown) => void;
  /** Max age (ms) a cached entry for an in-balance asset stays valid; defaults to one week. */
  maxAssetInfoCacheAge?: Milliseconds;
}

interface AssetsTrackerInternals {
Expand All @@ -76,8 +164,28 @@ const uniqueAssetIds = ({ body: { outputs } }: Cardano.OnChainTx) =>
const flatUniqueAssetIds = (txes: Cardano.OnChainTx[]) => uniq(txes.flatMap(uniqueAssetIds));

export const createAssetsTracker = (
{ assetProvider, transactionsTracker: { history$ }, retryBackoffConfig, logger, onFatalError }: AssetsTrackerProps,
{ assetService = createAssetService(assetProvider, retryBackoffConfig, onFatalError) }: AssetsTrackerInternals = {}
{
assetProvider,
assetsCache$,
transactionsTracker: { history$ },
balanceTracker: {
utxo: { total$ }
},
retryBackoffConfig,
logger,
onFatalError,
maxAssetInfoCacheAge
}: AssetsTrackerProps,
{
assetService = createAssetService(
assetProvider,
assetsCache$,
total$,
retryBackoffConfig,
onFatalError,
maxAssetInfoCacheAge
)
}: AssetsTrackerInternals = {}
) =>
new Observable<Map<Cardano.AssetId, Asset.AssetInfo>>((subscriber) => {
let fetchedAssetInfoMap = new Map<Cardano.AssetId, Asset.AssetInfo>();
Expand Down
22 changes: 13 additions & 9 deletions packages/wallet/test/PersonalWallet/load.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,7 @@ import {
} from '../../src';
import { AddressType, AsyncKeyAgent, Bip32Account, GroupedAddress, util } from '@cardano-sdk/key-management';
import {
AssetId,
createStubStakePoolProvider,
generateRandomBigInt,
generateRandomHexString,
mockProviders as mocks,
somePartialStakePools
} from '@cardano-sdk/util-dev';
import {
Asset,
Cardano,
ChainHistoryProvider,
HandleProvider,
Expand All @@ -28,6 +21,14 @@ import {
UtxoProvider,
coalesceValueQuantities
} from '@cardano-sdk/core';
import {
AssetId,
createStubStakePoolProvider,
generateRandomBigInt,
generateRandomHexString,
mockProviders as mocks,
somePartialStakePools
} from '@cardano-sdk/util-dev';
import { InvalidConfigurationError } from '@cardano-sdk/tx-construction';
import { InvalidStringError } from '@cardano-sdk/util';
import { ReplaySubject, firstValueFrom } from 'rxjs';
Expand Down Expand Up @@ -140,6 +141,9 @@ const createWallet = async (props: CreateWalletProps) => {
);
};

/** Strips the volatile `lastFetchedAt` field from each entry so asset info maps compare deterministically. */
const removeLastFetchedAt = (assetInfos: Map<Cardano.AssetId, Asset.AssetInfo>) => {
  const sanitized = new Map<Cardano.AssetId, Asset.AssetInfo>();
  for (const [assetId, info] of assetInfos) {
    sanitized.set(assetId, { ...info, lastFetchedAt: undefined });
  }
  return sanitized;
};

const assertWalletProperties = async (
wallet: BaseWallet,
expectedDelegateeId: Cardano.PoolId | undefined,
Expand Down Expand Up @@ -186,7 +190,7 @@ const assertWalletProperties = async (
expect(rewardAccounts[0].delegatee?.nextNextEpoch?.id).toEqual(expectedDelegateeId);
expect(rewardAccounts[0].rewardBalance).toBe(mocks.rewardAccountBalance);
// assets$
expect(await firstValueFrom(wallet.assetInfo$)).toEqual(
expect(removeLastFetchedAt(await firstValueFrom(wallet.assetInfo$))).toEqual(
new Map([
[AssetId.TSLA, mocks.asset],
[handleAssetId, handleAssetInfo]
Expand Down
Loading

0 comments on commit 7056add

Please sign in to comment.