Skip to content

Commit

Permalink
Fix #70 Add DLQ rebumitter tool
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed Nov 22, 2019
1 parent f4d9375 commit 4445ed2
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 58 deletions.
68 changes: 49 additions & 19 deletions src/Resubmitter.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
'use strict';

import {SQS_MAX_RECEIVE_BATCH, Squiss} from './Squiss';
import {ResubmitterConfig, ResubmitterMutator} from './Types';
import {ISquissOptions} from './Types';
import {Message} from './Message';
import {IMessageAttributes} from './attributeUtils';

const DEFAULT_SQUISS_OPTS = {
receiveAttributes: ['All'],
Expand All @@ -11,24 +12,47 @@ const DEFAULT_SQUISS_OPTS = {
unwrapSns: false,
};

type MessageData = ResubmitterMutatorData;

export interface ResubmitterMutatorData {
attributes: IMessageAttributes;
body?: string | any;
}

export type ResubmitterMutator = (message: ResubmitterMutatorData) => Promise<ResubmitterMutatorData>;

export interface ResubmitterConfig {
queues: {
resubmitFromQueueConfig: ISquissOptions;
resubmitToQueueConfig: ISquissOptions;
};
limit: number;
customMutator?: ResubmitterMutator;
releaseTimeoutSeconds: number;
keepHandledMessages?: boolean;
continueOnFail?: boolean;
sendMessageDelaySeconds?: number;
}

export class Resubmitter {

public squissFrom: Squiss;
public squissTo: Squiss;
public _squissFrom: Squiss;
public _squissTo: Squiss;
private _numHandledMessages = 0;
private _handledMessages = new Set<string>();
private readonly _handledMessages = new Set<string>();
private readonly _limit: number;
private readonly _customMutator?: ResubmitterMutator;
private readonly _releaseTimeoutSeconds: number;
private readonly _keepHandledMessages?: boolean;
private readonly _continueOnFail?: boolean;
private readonly _sendMessageDelaySeconds?: number;

constructor(config: ResubmitterConfig) {
this.squissFrom = new Squiss({
this._squissFrom = new Squiss({
...config.queues.resubmitFromQueueConfig,
...DEFAULT_SQUISS_OPTS,
});
this.squissTo = new Squiss({
this._squissTo = new Squiss({
...config.queues.resubmitToQueueConfig,
...DEFAULT_SQUISS_OPTS,
});
Expand All @@ -37,11 +61,12 @@ export class Resubmitter {
this._releaseTimeoutSeconds = config.releaseTimeoutSeconds;
this._keepHandledMessages = config.keepHandledMessages;
this._continueOnFail = config.continueOnFail;
this._sendMessageDelaySeconds = config.sendMessageDelaySeconds;
}

public run() {
this._numHandledMessages = 0;
this._handledMessages = new Set<string>();
this._handledMessages.clear();
return Promise.resolve()
.then(() => {
return this._iteration();
Expand All @@ -52,24 +77,29 @@ export class Resubmitter {
return message.changeVisibility(this._releaseTimeoutSeconds);
}

private _getMessageToSend(message: Message): Promise<MessageData> {
const data: MessageData = {
body: message.body,
attributes: message.attributes,
};
if (this._customMutator) {
return this._customMutator(data);
}
return Promise.resolve(data);
}

private _handleMessage(message: Message): Promise<void> {
console.log(`${++this._numHandledMessages} messages handled`);
const location = message.raw.MessageId ?? '';
if (this._numHandledMessages > this._limit || this._handledMessages.has(location)) {
return this._changeMessageVisibility(message);
}
this._handledMessages.add(location);
let body = message.body;
let attributes = message.attributes;
if (this._customMutator) {
const mutateResults = this._customMutator({
body,
attributes,
});
body = mutateResults.body;
attributes = mutateResults.attributes;
}
return this.squissTo.sendMessage(body, undefined, attributes)
return this._getMessageToSend(message)
.then((messageData) => {
return this._squissTo.sendMessage(
messageData.body, this._sendMessageDelaySeconds, messageData.attributes);
})
.then(() => {
if (this._keepHandledMessages) {
return this._changeMessageVisibility(message);
Expand All @@ -93,7 +123,7 @@ export class Resubmitter {
return Promise.resolve();
}
const numberOfMessageToRead = Math.min(SQS_MAX_RECEIVE_BATCH, remainingMessagesToHandle);
return this.squissFrom.getManualBatch(numberOfMessageToRead)
return this._squissFrom.getManualBatch(numberOfMessageToRead)
.then((messages) => {
if (!messages.length) {
this._numHandledMessages = this._limit;
Expand Down
12 changes: 6 additions & 6 deletions src/Squiss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
});
}

public sendMessage(message: IMessageToSend, delay?: number, attributes?: IMessageAttributes)
public sendMessage(message: IMessageToSend, delaySeconds?: number, attributes?: IMessageAttributes)
: Promise<SQS.Types.SendMessageResult> {
return Promise.all([
this._prepareMessageRequest(message, delay, attributes),
this._prepareMessageRequest(message, delaySeconds, attributes),
this.getQueueUrl(),
])
.then((data) => {
Expand Down Expand Up @@ -516,9 +516,9 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
});
}

private _prepareMessageParams(message: IMessageToSend, delay?: number, attributes?: IMessageAttributes) {
private _prepareMessageParams(message: IMessageToSend, delaySeconds?: number, attributes?: IMessageAttributes) {
const messageStr = isString(message) ? message : JSON.stringify(message);
const params: ISendMessageRequest = {MessageBody: messageStr, DelaySeconds: delay};
const params: ISendMessageRequest = {MessageBody: messageStr, DelaySeconds: delaySeconds};
attributes = Object.assign({}, attributes);
params.MessageGroupId = attributes.FIFO_MessageGroupId;
delete attributes.FIFO_MessageGroupId;
Expand Down Expand Up @@ -563,15 +563,15 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
});
}

private _prepareMessageRequest(message: IMessageToSend, delay?: number, attributes?: IMessageAttributes)
private _prepareMessageRequest(message: IMessageToSend, delaySeconds?: number, attributes?: IMessageAttributes)
: Promise<ISendMessageRequest> {
if (attributes && attributes[GZIP_MARKER]) {
return Promise.reject(new Error(`Using of internal attribute ${GZIP_MARKER} is not allowed`));
}
if (attributes && attributes[S3_MARKER]) {
return Promise.reject(new Error(`Using of internal attribute ${S3_MARKER} is not allowed`));
}
return this._prepareMessageParams(message, delay, attributes)
return this._prepareMessageParams(message, delaySeconds, attributes)
.then(this._handleLargeMessagePrepare.bind(this))
.then((params) => {
return removeEmptyKeys(params);
Expand Down
20 changes: 0 additions & 20 deletions src/Types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {BatchResultErrorEntry} from 'aws-sdk/clients/sqs';
import {IS3Upload} from './s3Utils';
import {StrictEventEmitter} from './EventEmitterTypesHelper';
import {EventEmitter} from 'events';
import {IMessageAttributes} from './attributeUtils';

export interface IMessageDeletedEventPayload {
msg: Message;
Expand Down Expand Up @@ -145,22 +144,3 @@ export interface ISquissEvents {
}

export type SquissEmitter = StrictEventEmitter<EventEmitter, ISquissEvents>;

export interface ResubmitterMutatorData {
attributes: IMessageAttributes;
body?: string | any;
}

export type ResubmitterMutator = (message: ResubmitterMutatorData) => ResubmitterMutatorData;

export interface ResubmitterConfig {
queues: {
resubmitFromQueueConfig: ISquissOptions;
resubmitToQueueConfig: ISquissOptions;
};
limit: number;
customMutator?: ResubmitterMutator;
releaseTimeoutSeconds: number;
keepHandledMessages?: boolean;
continueOnFail?: boolean;
}
9 changes: 5 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ export {
IObject,
ISendMessageRequest,
ISquissOptions,
ResubmitterConfig,
ResubmitterMutator,
ResubmitterMutatorData
} from './Types';
export {IMessageAttributes} from './attributeUtils';
export {Message} from './Message';
export {Resubmitter} from './Resubmitter';
export {
Resubmitter, ResubmitterConfig,
ResubmitterMutator,
ResubmitterMutatorData
} from './Resubmitter';
18 changes: 9 additions & 9 deletions src/test/src/Resubmitter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ describe('resubmitter', () => {
},
},
});
resubmitter.squissFrom = squissFrom;
resubmitter.squissTo = squissTo;
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run();
});

Expand All @@ -47,11 +47,11 @@ describe('resubmitter', () => {
},
},
customMutator: (obj) => {
return obj;
return Promise.resolve(obj);
},
});
resubmitter.squissFrom = squissFrom;
resubmitter.squissTo = squissTo;
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run();
});

Expand All @@ -73,8 +73,8 @@ describe('resubmitter', () => {
},
},
});
resubmitter.squissFrom = squissFrom;
resubmitter.squissTo = squissTo;
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run();
});

Expand All @@ -96,8 +96,8 @@ describe('resubmitter', () => {
},
},
});
resubmitter.squissFrom = squissFrom;
resubmitter.squissTo = squissTo;
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run();
});

Expand Down

0 comments on commit 4445ed2

Please sign in to comment.