Skip to content

Commit

Permalink
Merge pull request #9 from VivekRajagopal/feat/gae-js-tasks/allow-app…
Browse files Browse the repository at this point in the history
…-engine-host-override

feat(tasks): allow app engine host override
  • Loading branch information
mbyrne00 authored Oct 17, 2023
2 parents b7f6bab + 2b65d63 commit 4f31c51
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 36 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/gae-js-tasks/src/tasks/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./task-queue-service";
export * from "./tasks-provider";
export * from "./types";
export { localTasksServiceAccountEmailKey } from "./local-tasks";
42 changes: 32 additions & 10 deletions packages/gae-js-tasks/src/tasks/local-tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,31 @@ import { CreateTaskRequest } from "./types";
const taskNames = new Set<string>();
const logger = createLogger("LocalTasks");

export const localTasksServiceAccountEmailKey = "x-local-tasks-service-account-email";
export const createLocalTask = async (targetHost: string, createTaskRequest: CreateTaskRequest) => {
const { parent, task } = createTaskRequest;
if (!parent || !task) throw new BadRequestError("parent and task must be supplied");

const { appEngineHttpRequest } = task;
if (!appEngineHttpRequest) throw new BadRequestError("Only supports app engine task requests");
const { appEngineHttpRequest, httpRequest } = task;
if (!appEngineHttpRequest && !httpRequest)
throw new BadRequestError("appEngineHttpRequest or httpRequest must be defined");

const { relativeUri } = appEngineHttpRequest;
if (!relativeUri) throw new BadRequestError("relativeUri must be supplied");
const getEndpoint = () => {
if (appEngineHttpRequest) {
return `${targetHost}${appEngineHttpRequest.relativeUri}`;
}

if (httpRequest?.url) {
const url = new URL(httpRequest.url);
return `${targetHost}${url.pathname}`;
}
};

const endpoint = getEndpoint();

if (!endpoint) {
throw new BadRequestError("endpoint could not be resolved");
}

if (task.name) {
if (taskNames.has(task.name)) {
Expand All @@ -26,11 +42,16 @@ export const createLocalTask = async (targetHost: string, createTaskRequest: Cre
taskNames.add(task.name);
}

const endpoint = `${targetHost}${appEngineHttpRequest.relativeUri}`;
const delayMs = task.scheduleTime?.seconds ? Number(task.scheduleTime?.seconds) * 1000 - new Date().getTime() : 0;
const body = appEngineHttpRequest.body
? Buffer.from(appEngineHttpRequest.body as string, "base64").toString("ascii")
: undefined;
const getBody = () => {
if (appEngineHttpRequest) {
return Buffer.from(appEngineHttpRequest.body as string, "base64").toString("ascii");
}

if (httpRequest) {
return Buffer.from(httpRequest.body as string, "base64").toString("ascii");
}
};

// Intentionally don't return this promise because we want the task to be executed
// asynchronously - i.e. a tiny bit like a task queue would work. Otherwise, if the caller
Expand All @@ -40,10 +61,11 @@ export const createLocalTask = async (targetHost: string, createTaskRequest: Cre
.then(() => {
return fetch(endpoint, {
method: "POST",
body,
body: getBody(),
headers: {
"content-type": "application/json",
"x-appengine-taskname": relativeUri,
"x-appengine-taskname": endpoint,
[localTasksServiceAccountEmailKey]: httpRequest?.oidcToken?.serviceAccountEmail || "",
},
});
})
Expand Down
32 changes: 32 additions & 0 deletions packages/gae-js-tasks/src/tasks/task-queue-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,31 @@ describe("TaskQueueService", () => {
})
);

it(
"creates task params for host override routing",
withEnvVars({ [ENV_VAR_RUNTIME_ENVIRONMENT]: "appengine" }, async () => {
tasksProvider.init();
taskQueueService = new TaskQueueService({
appEngineHost: "https://my-host.com",
oidcServiceAccountEmail: "sacount@gnet.com",
});

await taskQueueService.enqueue("test-task", { data: { key: "value1" } });

expectTaskParams({
httpRequest: {
url: "https://my-host.com/tasks/test-task",
httpMethod: "POST",
headers: {
"Content-Type": "application/json",
},
body: Buffer.from(JSON.stringify({ key: "value1" })).toString("base64"),
oidcToken: { serviceAccountEmail: "sacount@gnet.com" },
},
});
})
);

it(
"creates task params for throttling",
withEnvVars({ [ENV_VAR_RUNTIME_ENVIRONMENT]: "appengine" }, async () => {
Expand Down Expand Up @@ -187,6 +212,13 @@ describe("TaskQueueService", () => {
await waitUntil(() => scope.isDone());
});

it("posts to local task service given appEngineHost override", async () => {
const scope = nock("http://127.0.0.1").post("/tasks/local-task").reply(204);
taskQueueService = new TaskQueueService({ appEngineHost: "https://my-host.com" });
await taskQueueService.enqueue("/local-task");
await waitUntil(() => scope.isDone());
});

interface TestPayload {
some: string;
}
Expand Down
52 changes: 38 additions & 14 deletions packages/gae-js-tasks/src/tasks/task-queue-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { tasksProvider } from "./tasks-provider";
import { createLocalTask } from "./local-tasks";
import { isGoogleGaxError } from "../utils/errors";
import { google } from "@google-cloud/tasks/build/protos/protos";

export class TaskQueueService {
private logger = createLogger("taskQueueService");
Expand Down Expand Up @@ -85,14 +86,7 @@ export class TaskQueueService {
return {
parent: queuePath,
task: {
appEngineHttpRequest: {
relativeUri: `${this.fullTaskPath(path)}`,
headers: {
"Content-Type": "application/json",
},
body: requestPayload,
...this.taskRouting(),
},
...this.taskRouting(path, requestPayload),
...this.taskSchedule(inSeconds),
...this.taskThrottle(queuePath, throttle),
},
Expand All @@ -109,17 +103,47 @@ export class TaskQueueService {
: {};
}

private taskRouting() {
const { tasksRoutingService, tasksRoutingVersion } = this.options;
private taskRouting(path: string, requestPayload?: string) {
const { tasksRoutingService, tasksRoutingVersion, appEngineHost, oidcServiceAccountEmail } = this.options;

if (appEngineHost) {
const httpRequest: google.cloud.tasks.v2.IHttpRequest = {
url: `${appEngineHost}${this.fullTaskPath(path)}`,
httpMethod: "POST",
body: requestPayload,
headers: {
"Content-Type": "application/json",
},
...(oidcServiceAccountEmail ? { oidcToken: { serviceAccountEmail: oidcServiceAccountEmail } } : {}),
};
return { httpRequest };
}

if (tasksRoutingVersion || tasksRoutingService) {
return {
appEngineRouting: {
...(tasksRoutingService ? { service: tasksRoutingService } : {}),
...(tasksRoutingVersion ? { version: tasksRoutingVersion } : {}),
appEngineHttpRequest: {
relativeUri: `${this.fullTaskPath(path)}`,
headers: {
"Content-Type": "application/json",
},
body: requestPayload,
appEngineRouting: {
...(tasksRoutingService ? { service: tasksRoutingService } : {}),
...(tasksRoutingVersion ? { version: tasksRoutingVersion } : {}),
},
},
};
}
return {};

return {
appEngineHttpRequest: {
relativeUri: `${this.fullTaskPath(path)}`,
headers: {
"Content-Type": "application/json",
},
body: requestPayload,
},
};
}

private taskThrottle(queuePath: string, options?: TaskThrottle) {
Expand Down
43 changes: 33 additions & 10 deletions packages/gae-js-tasks/src/tasks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,37 @@ type Mandatory<T, K extends keyof T> = Pick<Required<T>, K> & Omit<T, K>;

export type CreateTaskRequest = Parameters<CloudTasksClient["createTask"]>[0];

export interface CreateTaskQueueServiceOptions {
export type CreateTaskQueueServiceRouting =
| {
/**
* The specific App Engine version to dispatch requests to.
*/
tasksRoutingVersion?: string;
/**
* The specific App Engine service to dispatch requests to.
*/
tasksRoutingService?: string;
appEngineHost?: never;
oidcServiceAccountEmail?: never;
}
| {
tasksRoutingVersion?: never;
tasksRoutingService?: never;

/**
* Override the appEngineHost when using a push queue. This will create a task with `httpRequest` params.
* Use this when you want the request to be routed to a different host than the default GAE appspot domain.
*/
appEngineHost?: string;

/**
* Should be the email of an existing Service Account in the same project.
* Authorizes the request with a Bearer JWT id token.
*/
oidcServiceAccountEmail?: string;
};

export type CreateTaskQueueServiceOptions = {
/**
* Tasks projectId - most likely the same project as your application.
* Defaults to application projectId configuration
Expand Down Expand Up @@ -32,19 +62,12 @@ export interface CreateTaskQueueServiceOptions {
* Defaults to application "host" configuration.
*/
localBaseUrl?: string;
/**
* The specific App Engine version to dispatch requests to.
*/
tasksRoutingVersion?: string;
/**
* The specific App Engine service to dispatch requests to.
*/
tasksRoutingService?: string;

/**
* Tasks client to use (if not using tasksProvider)
*/
tasksClient?: CloudTasksClient;
}
} & CreateTaskQueueServiceRouting;

export type TaskQueueServiceOptions = Mandatory<
CreateTaskQueueServiceOptions,
Expand Down

0 comments on commit 4f31c51

Please sign in to comment.