Skip to content

Commit

Permalink
Fix #70 Add DLQ rebumitter tool
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed Nov 30, 2019
1 parent 4445ed2 commit ff12892
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 5 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
},
"homepage": "https://github.com/PruvoNet/squiss-ts#readme",
"dependencies": {
"aws-sdk": "^2.577.0",
"aws-sdk": "^2.578.0",
"iltorb": "2.4.3",
"linked-list": "^2.1.0",
"ts-type-guards": "^0.6.1",
Expand Down
2 changes: 1 addition & 1 deletion src/Resubmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ export class Resubmitter {
return Promise.resolve();
}
const numberOfMessageToRead = Math.min(SQS_MAX_RECEIVE_BATCH, remainingMessagesToHandle);
return this._squissFrom.getManualBatch(numberOfMessageToRead)
return this._squissFrom.getManualBatch(numberOfMessageToRead, this._releaseTimeoutSeconds)
.then((messages) => {
if (!messages.length) {
this._numHandledMessages = this._limit;
Expand Down
10 changes: 7 additions & 3 deletions src/Squiss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,22 +312,26 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
return this._s3;
}

public getManualBatch(maxMessagesToGet: number): Promise<Message[]> {
public getManualBatch(maxMessagesToGet: number, failedMessageVisibility?: number): Promise<Message[]> {
return this.getQueueUrl()
.then((queueUrl) => {
return this._getBatchRequest(queueUrl, Math.min(maxMessagesToGet, SQS_MAX_RECEIVE_BATCH)).promise();
})
.then((data) => {
const parsedMessage: Message[] = [];
const messages = data?.Messages ?? [];
const messages = data.Messages ?? [];
const parseMessagesPromises = messages.map((msg) => {
const message = this._createMessageInstance(msg);
return message.parse()
.then(() => {
parsedMessage.push(message);
})
.catch((e: Error) => {
message.release();
if (failedMessageVisibility){
message.changeVisibility(failedMessageVisibility);
} else {
message.release();
}
});
});
return Promise.all(parseMessagesPromises)
Expand Down
147 changes: 147 additions & 0 deletions src/test/src/Resubmitter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import {Resubmitter} from '../../index';
import {SQSStub} from '../stubs/SQSStub';
import {Squiss, SQS} from '../../index';
// @ts-ignore
import * as sinon from 'sinon';

describe('resubmitter', () => {

Expand Down Expand Up @@ -101,4 +103,149 @@ describe('resubmitter', () => {
return resubmitter.run();
});

it('should release message if failed to parse it', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1, bodyFormat: 'json'});
const stub = new SQSStub(0, 0);
squissFrom!.sqs = stub as any as SQS;
stub.sendMessage({
MessageBody: '{',
QueueUrl: 'url__',
});
const visibilitySpy = sinon.spy(squissFrom!.sqs, 'changeMessageVisibility');
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const spy = sinon.spy();
const resubmitter = new Resubmitter({
limit: 1,
releaseTimeoutSeconds: 45,
queues: {
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
},
customMutator: spy,
});
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run()
.then(() => {
spy.should.not.be.called();
visibilitySpy.should.be.calledWith({
QueueUrl: 'foo_DLQ',
ReceiptHandle: 'url__',
VisibilityTimeout: 45,
});
});
});

it('should release message if failed to parse it and no visibility timeout', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1, bodyFormat: 'json'});
const stub = new SQSStub(0, 0);
squissFrom!.sqs = stub as any as SQS;
stub.sendMessage({
MessageBody: '{',
QueueUrl: 'url__',
});
const visibilitySpy = sinon.spy(squissFrom!.sqs, 'changeMessageVisibility');
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const spy = sinon.spy();
const resubmitter = new Resubmitter({
limit: 1,
releaseTimeoutSeconds: 0,
queues: {
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
},
customMutator: spy,
});
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run()
.then(() => {
spy.should.not.be.called();
visibilitySpy.should.be.calledWith({
QueueUrl: 'foo_DLQ',
ReceiptHandle: 'url__',
VisibilityTimeout: 0,
});
});
});

it('should properly handle messages without id', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1});
const stub = new SQSStub(1, 0);
squissFrom!.sqs = stub as any as SQS;
if (stub.msgs[0]) {
stub.msgs[0].MessageId = undefined;
}
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const spy = sinon.stub().resolvesArg(0);
const resubmitter = new Resubmitter({
limit: 1,
releaseTimeoutSeconds: 45,
queues: {
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
},
customMutator: spy,
});
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run()
.then(() => {
spy.should.have.callCount(1);
});
});

it('should properly handle duplicate messages without same id', function() {
this.timeout(2000000);
const squissFrom = new Squiss({queueUrl: 'foo_DLQ', deleteWaitMs: 1});
const stub = new SQSStub(2, 0);
squissFrom!.sqs = stub as any as SQS;
if (stub.msgs[0]) {
stub.msgs[0].MessageId = 'myId';
}
if (stub.msgs[1]) {
stub.msgs[1].MessageId = 'myId';
}
const squissTo = new Squiss({queueUrl: 'foo'});
squissTo!.sqs = new SQSStub(0, 0) as any as SQS;
const spy = sinon.stub().resolvesArg(0);
const resubmitter = new Resubmitter({
limit: 2,
releaseTimeoutSeconds: 45,
queues: {
resubmitFromQueueConfig: {
queueUrl: 'foo_DLQ',
},
resubmitToQueueConfig: {
queueUrl: 'foo',
},
},
customMutator: spy,
});
resubmitter._squissFrom = squissFrom;
resubmitter._squissTo = squissTo;
return resubmitter.run()
.then(() => {
spy.should.have.callCount(1);
});
});

});

0 comments on commit ff12892

Please sign in to comment.