Skip to content

Commit

Permalink
Merge pull request #67 from getlift/queue-new-commands
Browse files Browse the repository at this point in the history
New `queue` commands: logs and "send message"
  • Loading branch information
mnapoli authored Jul 5, 2021
2 parents 88d2fc7 + 4eede23 commit 1d5f71c
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 8 deletions.
16 changes: 16 additions & 0 deletions docs/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,27 @@ functions:
The following commands are available on `queue` constructs:

```
serverless <construct-name>:logs
serverless <construct-name>:send
serverless <construct-name>:failed
serverless <construct-name>:failed:purge
serverless <construct-name>:failed:retry
```

- `serverless <construct-name>:logs`

This command displays the logs of the Lambda "worker" function.

It is an alias to `serverless logs --function <construct-name>Worker` and supports the same options, for example `--tail` to tail logs live.

- `serverless <construct-name>:send`

Send a message into the SQS queue.

This command can be useful while developing to push sample messages into the queue.

When the command runs, it will prompt for the body of the SQS message. It is also possible to provide the body via the `--body="message body here"` option.

- `serverless <construct-name>:failed`

This command lists the failed messages stored in the dead letter queue.
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"chalk": "^4.1.1",
"change-case": "^4.1.2",
"cidr-split": "^0.1.2",
"inquirer": "^7.3.3",
"js-yaml": "^3.14.1",
"lodash": "^4.17.21",
"log-symbols": "^4.1.0",
Expand All @@ -35,6 +36,7 @@
"@serverless/test": "^8.2.0",
"@serverless/typescript": "^2.49.0",
"@types/chai": "^4.2.19",
"@types/inquirer": "^7.3.2",
"@types/jest": "^26.0.23",
"@types/js-yaml": "^3.12.5",
"@types/json-schema": "^7.0.7",
Expand Down
6 changes: 4 additions & 2 deletions src/classes/Construct.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { PolicyStatement } from "../CloudFormation";
import { AwsProvider } from "./AwsProvider";
import { CliOptions } from "../types/serverless";

/**
* Defines which methods a Lift construct must expose.
Expand Down Expand Up @@ -44,11 +45,12 @@ export interface StaticConstructInterface {
export type ConstructCommands = Record<string, ConstructCommandDefinition>;
type ConstructCommandDefinition = {
usage: string;
handler: (opt: Record<string, string>) => void | Promise<void>;
handler: (options: CliOptions) => void | Promise<void>;
options?: {
[name: string]: {
usage: string;
required: boolean;
type: string;
required?: boolean;
shortcut?: string;
};
};
Expand Down
92 changes: 88 additions & 4 deletions src/constructs/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import { Subscription, SubscriptionProtocol, Topic } from "@aws-cdk/aws-sns";
import { AlarmActionConfig } from "@aws-cdk/aws-cloudwatch/lib/alarm-action";
import { Construct as CdkConstruct, CfnOutput, Duration } from "@aws-cdk/core";
import chalk from "chalk";
import { PurgeQueueRequest } from "aws-sdk/clients/sqs";
import { PurgeQueueRequest, SendMessageRequest } from "aws-sdk/clients/sqs";
import ora from "ora";
import { spawnSync } from "child_process";
import * as inquirer from "inquirer";
import { AwsConstruct, AwsProvider } from "../classes";
import { pollMessages, retryMessages } from "./queue/sqs";
import { sleep } from "../utils/sleep";
import { PolicyStatement } from "../CloudFormation";
import { ConstructCommands } from "../classes/Construct";
import { CliOptions } from "../types/serverless";

const QUEUE_DEFINITION = {
type: "object",
Expand Down Expand Up @@ -43,16 +46,50 @@ export class Queue extends AwsConstruct {
public static type = "queue";
public static schema = QUEUE_DEFINITION;
public static commands: ConstructCommands = {
logs: {
usage: "Output the logs of the queue's worker function",
handler: Queue.prototype.displayLogs,
options: {
tail: {
usage: "Tail the log output",
shortcut: "t",
type: "boolean",
},
startTime: {
usage: "Logs before this time will not be displayed. Default: `10m` (last 10 minutes logs only)",
type: "string",
},
filter: {
usage: "A filter pattern",
type: "string",
},
interval: {
usage: "Tail polling interval in milliseconds. Default: `1000`",
shortcut: "i",
type: "string",
},
},
},
send: {
usage: "Send a new message to the SQS queue",
handler: Queue.prototype.sendMessage,
options: {
body: {
usage: "Body of the SQS message",
type: "string",
},
},
},
failed: {
usage: "List failed messages from the dead letter queue.",
usage: "List failed messages from the dead letter queue",
handler: Queue.prototype.listDlq,
},
"failed:purge": {
usage: "Purge failed messages from the dead letter queue.",
usage: "Purge failed messages from the dead letter queue",
handler: Queue.prototype.purgeDlq,
},
"failed:retry": {
usage: "Retry failed messages from the dead letter queue by moving them to the main queue.",
usage: "Retry failed messages from the dead letter queue by moving them to the main queue",
handler: Queue.prototype.retryDlq,
},
};
Expand Down Expand Up @@ -330,6 +367,40 @@ export class Queue extends AwsConstruct {
progress.succeed(`${totalMessagesRetried} failed message(s) moved to the main queue to be retried 💪`);
}

async sendMessage(options: CliOptions): Promise<void> {
const queueUrl = await this.getQueueUrl();
if (queueUrl === undefined) {
console.log(
chalk.red("Could not find the queue in the deployed stack. Try running 'serverless deploy' first?")
);

return;
}

const body = typeof options.body === "string" ? options.body : await this.askMessageBody();

await this.provider.request<SendMessageRequest, never>("SQS", "sendMessage", {
QueueUrl: queueUrl,
MessageBody: body,
});
}

displayLogs(options: CliOptions): void {
const args = ["logs", "--function", `${this.id}Worker`];
for (const [option, value] of Object.entries(options)) {
args.push(option.length === 1 ? `-${option}` : `--${option}`);
if (typeof value === "string") {
args.push(value);
}
}
console.log(chalk.gray(`serverless ${args.join(" ")}`));
args.unshift(process.argv[1]);
spawnSync(process.argv[0], args, {
cwd: process.cwd(),
stdio: "inherit",
});
}

private formatMessageBody(body: string): string {
try {
// If it's valid JSON, we'll format it nicely
Expand All @@ -341,4 +412,17 @@ export class Queue extends AwsConstruct {
return body;
}
}

private async askMessageBody(): Promise<string> {
const responses = await inquirer.prompt({
message: "What is the body of the SQS message to send (can be JSON or any string)",
type: "editor",
name: "body",
validate: (input: string) => {
return input.length > 0 ? true : "The message body cannot be empty";
},
});

return (responses.body as string).trim();
}
}
2 changes: 1 addition & 1 deletion src/constructs/queue/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
SendMessageBatchRequest,
SendMessageBatchResult,
} from "aws-sdk/clients/sqs";
import { AwsProvider } from "../../classes/AwsProvider";
import { AwsProvider } from "../../classes";
import { log } from "../../utils/logger";
import { sleep } from "../../utils/sleep";

Expand Down
4 changes: 3 additions & 1 deletion src/types/serverless.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ export type CommandsDefinition = Record<
options?: {
[name: string]: {
usage: string;
required: boolean;
required?: boolean;
shortcut?: string;
};
};
}
>;

export type CliOptions = Record<string, string | boolean | string[]>;
21 changes: 21 additions & 0 deletions test/unit/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,25 @@ describe("queues", () => {
],
});
});

it("should send a message to the queue", async () => {
const awsMock = mockAws();
sinon.stub(CloudFormationHelpers, "getStackOutput").resolves("queue-url");
const sendSpy = awsMock.mockService("SQS", "sendMessage").resolves();

await runServerless({
fixture: "queues",
configExt: pluginConfigExt,
command: "emails:send",
options: {
body: "Message body",
},
});

expect(sendSpy.callCount).toBe(1);
expect(sendSpy.firstCall.firstArg).toStrictEqual({
QueueUrl: "queue-url",
MessageBody: "Message body",
});
});
});
1 change: 1 addition & 0 deletions test/utils/@serverless__test.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ declare module "@serverless/test/run-serverless" {

type RunServerlessBaseOptions = Partial<{
command: string;
options: Record<string, boolean | string>;
configExt: Partial<AWS> | Record<string, unknown>;
env: Record<string, string>;
awsRequestStubMap: unknwon;
Expand Down

0 comments on commit 1d5f71c

Please sign in to comment.