Skip to content

Commit

Permalink
add retry logic to maintenance [no ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
YaroShkvorets committed Feb 12, 2025
1 parent 3c33f49 commit 2b1b015
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 113 deletions.
56 changes: 56 additions & 0 deletions src/utils/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
const DEFAULT_BATCH_SIZE = 30;
const DEFAULT_RETRY_SLEEP_MS = 1000;
const DEFAULT_MAX_RETRY_ATTEMPTS = 3;

// calls processor function on each item of items, in parallel, up to batchSize items at a time
export async function processQueue<T, S>(
items: T[],
processor: (item: T) => Promise<S>,
batchSize: number = DEFAULT_BATCH_SIZE,
): Promise<S[]> {
console.log(`Processing queue with ${items.length} items`);
const queue = [...items];
const inProgress = new Map<Promise<S>, number>();
const results: S[] = new Array(items.length);
let nextIndex = 0;

while (queue.length > 0 || inProgress.size > 0) {
while (inProgress.size < batchSize && queue.length > 0) {
const item = queue.shift()!;
const index = nextIndex++;
const promise = processor(item).then((result) => {
results[index] = result;
inProgress.delete(promise);
return result;
});
inProgress.set(promise, index);
}
if (inProgress.size > 0) {
await Promise.race(inProgress.keys());
}
}

return results;
}

// Helper function to implement retry logic
export async function withRetry<T>(
operation: () => Promise<T>,
maxAttempts: number = DEFAULT_MAX_RETRY_ATTEMPTS,
delayMs: number = DEFAULT_RETRY_SLEEP_MS,
): Promise<T> {
let lastError: any;

for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
if (attempt < maxAttempts) {
await new Promise((resolve) => setTimeout(resolve, delayMs));
console.log(`Retrying... (attempt ${attempt + 1}/${maxAttempts})`);
}
}
}
throw lastError;
}
1 change: 0 additions & 1 deletion src/validate_firehose.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ async function validateSingleEndpoint(
network: Network,
endpoint: string,
): Promise<void> {
console.log(` ${network.id} @ ${endpoint} `);
try {
const command = `grpcurl -H "X-Api-Key: ${process.env.SF_API_KEY}" ${endpoint} sf.firehose.v2.EndpointInfo/Info`;
const { stdout } = await execAsync(command);
Expand Down
193 changes: 81 additions & 112 deletions src/validate_urls.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,12 @@
import { printErrorsAndWarnings } from "./print";
import { Network } from "./types/registry";
import { applyEnvVars } from "./utils/env";
import { processQueue, withRetry } from "./utils/retry";
import { loadNetworks } from "./utils/fs";

const ERRORS: string[] = [];
const WARNINGS: string[] = [];
const TIMEOUT = 10000;
const FETCH_BATCH_SIZE = 30;

// calls processor function on each item of items, in parallel, up to batchSize items at a time
async function processQueue<T, S>(
items: T[],
processor: (item: T) => Promise<S>,
batchSize: number = FETCH_BATCH_SIZE,
): Promise<S[]> {
console.log(`Processing queue with ${items.length} items`);
const queue = [...items];
const inProgress = new Map<Promise<S>, number>();
const results: S[] = new Array(items.length);
let nextIndex = 0;

while (queue.length > 0 || inProgress.size > 0) {
while (inProgress.size < batchSize && queue.length > 0) {
const item = queue.shift()!;
const index = nextIndex++;
const promise = processor(item).then((result) => {
results[index] = result;
inProgress.delete(promise);
return result;
});
inProgress.set(promise, index);
}
if (inProgress.size > 0) {
await Promise.race(inProgress.keys());
}
}

return results;
}
const FETCH_TIMEOUT_MS = 10000;

async function testURL({
url,
Expand All @@ -47,29 +16,28 @@ async function testURL({
networkId: string;
}): Promise<boolean> {
try {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), TIMEOUT);
const response = await fetch(url, {
method: "HEAD",
signal: controller.signal,
});
await withRetry(async () => {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);
const response = await fetch(url, {
method: "HEAD",
signal: controller.signal,
});

clearTimeout(timeout);
clearTimeout(timeout);

if (!response.ok) {
const err = `${networkId} - ${url} returned ${response.status}`;
//acceptable error codes
if ([308, 307, 403].includes(response.status)) {
console.warn(err);
return true;
if (!response.ok) {
const err = `${networkId} - ${url} returned ${response.status}`;
//acceptable error codes
if ([308, 307, 403].includes(response.status)) {
console.warn(err);
return;
}
throw new Error(err);
}
console.error(err);
WARNINGS.push(err);
return false;
}
console.log(` ${networkId} - URL is valid and accessible: ${url}`);
});
} catch (e) {
// we only care about thrown connection errors
// Only add warning/error after all retries have failed
console.error(` ${networkId} - exception at ${url}: ${e.message}`);
ERRORS.push(`\`${networkId}\` - unreachable URL: ${url}`);
return false;
Expand All @@ -87,24 +55,24 @@ async function testAPI({
networkId: string;
}): Promise<boolean> {
try {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), TIMEOUT);
const response = await fetch(url, {
method: "HEAD",
signal: controller.signal,
});
await withRetry(async () => {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);
const response = await fetch(url, {
method: "HEAD",
signal: controller.signal,
});

clearTimeout(timeout);
clearTimeout(timeout);

if (!response.ok) {
console.log(
` ${networkId} - URL returned an error, which is probably fine: ${url} - ${response.status}`,
);
} else {
console.log(` ${networkId} - URL is valid and accessible: ${url}`);
}
if (!response.ok) {
console.log(
` ${networkId} - URL returned an error, which is probably fine: ${url} - ${response.status}`,
);
}
});
} catch (e) {
// we only care about thrown connection errors for now
// Only add warning after all retries have failed
console.error(` ${networkId} - exception at ${url}: ${e.message}`);
WARNINGS.push(`\`${networkId}\` - unreachable API: ${url}`);
return false;
Expand All @@ -120,56 +88,57 @@ async function testRpc({
url: string;
}): Promise<boolean> {
try {
const urlExpanded = applyEnvVars(url);
if (urlExpanded === "") {
return false;
}
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), TIMEOUT);
const response = await fetch(urlExpanded, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "eth_getBlockByNumber",
params: [`0x${network.genesis?.height.toString(16)}`, false],
}),
signal: controller.signal,
});

clearTimeout(timeout);
await withRetry(async () => {
const urlExpanded = applyEnvVars(url);
if (urlExpanded === "") {
return false;
}
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);
const response = await fetch(urlExpanded, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "eth_getBlockByNumber",
params: [`0x${network.genesis?.height.toString(16)}`, false],
}),
signal: controller.signal,
});
clearTimeout(timeout);

if (!response.ok) {
const err = `\`${network.id}\` - bad response from RPC endpoint: ${url}`;
WARNINGS.push(err);
console.error(err + " - " + response.statusText);
return false;
}
if (!response.ok) {
throw new Error(`bad status: ${response.status}`);
}

const data = await response.json();
if (data.error || !data.result) {
const err = `\`${network.id}\` - non-archive RPC endpoint: ${url}`;
WARNINGS.push(err);
console.warn(err);
return false;
}
const data = await response.json();
if (data.error) {
throw new Error("bad response");
}
if (!data.result) {
throw new Error("empty response");
}

const genesisHash = data.result.hash;
if (genesisHash.toLowerCase() !== network.genesis?.hash.toLowerCase()) {
const err = `\`${network.id}\` - mismatched genesis hash at RPC endpoint: ${url}`;
ERRORS.push(err);
console.error(err);
return false;
}
console.log(` ${network.id}: genesis validated at ${url}: ${genesisHash}`);
const genesisHash = data.result.hash;
if (genesisHash?.toLowerCase() !== network.genesis?.hash.toLowerCase()) {
throw new Error("mismatched genesis hash");
}
});
} catch (e) {
if (e instanceof Error && e.name === "AbortError") {
WARNINGS.push(`\`${network.id}\` - RPC request timed out: ${url}`);
} else {
WARNINGS.push(`\`${network.id}\` - unreachable RPC: ${url}`);
let errorMessage = "unknown error";
if (e instanceof Error) {
if (e.message.includes("Unable to connect")) {
errorMessage = "unreachable host";
} else if (e.name === "AbortError") {
errorMessage = "request timeout";
} else {
errorMessage = e.message;
}
}
console.error(`\`${network.id}\` - exception at ${url}: ${e.message}`);
const err = `\`${network.id}\` - ${errorMessage} at RPC endpoint: ${url}`;
WARNINGS.push(err);
console.error(err);
return false;
}
return true;
Expand Down

0 comments on commit 2b1b015

Please sign in to comment.