Skip to content

Commit

Permalink
More CS Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
JimTools committed Jan 17, 2025
1 parent 8e72d15 commit 44437dd
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 38 deletions.
2 changes: 1 addition & 1 deletion pkg/amqp-ext/AmqpConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public function reject(Message $message, bool $requeue = false): void

$this->getExtQueue()->reject(
$message->getDeliveryTag(),
$requeue ? AMQP_REQUEUE : AMQP_NOPARAM
$requeue ? \AMQP_REQUEUE : \AMQP_NOPARAM
);
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp-ext/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public function unbind(InteropAmqpBind $bind): void
public function createTemporaryQueue(): Queue
{
$extQueue = new \AMQPQueue($this->getExtChannel());
$extQueue->setFlags(AMQP_EXCLUSIVE);
$extQueue->setFlags(\AMQP_EXCLUSIVE);

$extQueue->declareQueue();

Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp-ext/AmqpProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private function doSend(AmqpDestination $destination, AmqpMessage $message): voi
} else {
/** @var AmqpQueue $destination */
$amqpExchange = new \AMQPExchange($this->amqpChannel);
$amqpExchange->setType(AMQP_EX_TYPE_DIRECT);
$amqpExchange->setType(\AMQP_EX_TYPE_DIRECT);
$amqpExchange->setName('');

$amqpExchange->publish(
Expand Down
34 changes: 17 additions & 17 deletions pkg/amqp-ext/Flags.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,86 +12,86 @@ class Flags
{
public static function convertMessageFlags(int $interop): int
{
$flags = AMQP_NOPARAM;
$flags = \AMQP_NOPARAM;

if ($interop & InteropAmqpMessage::FLAG_MANDATORY) {
$flags |= AMQP_MANDATORY;
$flags |= \AMQP_MANDATORY;
}

if ($interop & InteropAmqpMessage::FLAG_IMMEDIATE) {
$flags |= AMQP_IMMEDIATE;
$flags |= \AMQP_IMMEDIATE;
}

return $flags;
}

public static function convertTopicFlags(int $interop): int
{
$flags = AMQP_NOPARAM;
$flags = \AMQP_NOPARAM;

$flags |= static::convertDestinationFlags($interop);

if ($interop & InteropAmqpTopic::FLAG_INTERNAL) {
$flags |= AMQP_INTERNAL;
$flags |= \AMQP_INTERNAL;
}

return $flags;
}

public static function convertQueueFlags(int $interop): int
{
$flags = AMQP_NOPARAM;
$flags = \AMQP_NOPARAM;

$flags |= static::convertDestinationFlags($interop);

if ($interop & InteropAmqpQueue::FLAG_EXCLUSIVE) {
$flags |= AMQP_EXCLUSIVE;
$flags |= \AMQP_EXCLUSIVE;
}

return $flags;
}

public static function convertDestinationFlags(int $interop): int
{
$flags = AMQP_NOPARAM;
$flags = \AMQP_NOPARAM;

if ($interop & InteropAmqpDestination::FLAG_PASSIVE) {
$flags |= AMQP_PASSIVE;
$flags |= \AMQP_PASSIVE;
}

if ($interop & InteropAmqpDestination::FLAG_DURABLE) {
$flags |= AMQP_DURABLE;
$flags |= \AMQP_DURABLE;
}

if ($interop & InteropAmqpDestination::FLAG_AUTODELETE) {
$flags |= AMQP_AUTODELETE;
$flags |= \AMQP_AUTODELETE;
}

if ($interop & InteropAmqpDestination::FLAG_NOWAIT) {
$flags |= AMQP_NOWAIT;
$flags |= \AMQP_NOWAIT;
}

return $flags;
}

public static function convertConsumerFlags(int $interop): int
{
$flags = AMQP_NOPARAM;
$flags = \AMQP_NOPARAM;

if ($interop & InteropAmqpConsumer::FLAG_NOLOCAL) {
$flags |= AMQP_NOLOCAL;
$flags |= \AMQP_NOLOCAL;
}

if ($interop & InteropAmqpConsumer::FLAG_NOACK) {
$flags |= AMQP_AUTOACK;
$flags |= \AMQP_AUTOACK;
}

if ($interop & InteropAmqpConsumer::FLAG_EXCLUSIVE) {
$flags |= AMQP_EXCLUSIVE;
$flags |= \AMQP_EXCLUSIVE;
}

if ($interop & InteropAmqpConsumer::FLAG_NOWAIT) {
$flags |= AMQP_NOWAIT;
$flags |= \AMQP_NOWAIT;
}

return $flags;
Expand Down
8 changes: 4 additions & 4 deletions pkg/rdkafka/RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ private function doReceive(int $timeout): ?RdKafkaMessage
}

switch ($kafkaMessage->err) {
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
case RD_KAFKA_RESP_ERR__TIMED_OUT:
case RD_KAFKA_RESP_ERR__TRANSPORT:
case \RD_KAFKA_RESP_ERR__PARTITION_EOF:
case \RD_KAFKA_RESP_ERR__TIMED_OUT:
case \RD_KAFKA_RESP_ERR__TRANSPORT:
return null;
case RD_KAFKA_RESP_ERR_NO_ERROR:
case \RD_KAFKA_RESP_ERR_NO_ERROR:
$message = $this->serializer->toMessage($kafkaMessage->payload);
$message->setKey($kafkaMessage->key);
$message->setPartition($kafkaMessage->partition);
Expand Down
6 changes: 3 additions & 3 deletions pkg/rdkafka/RdKafkaContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ public static function getLibrdKafkaVersion(): string
if (!defined('RD_KAFKA_VERSION')) {
throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed');
}
$major = (RD_KAFKA_VERSION & 0xFF000000) >> 24;
$minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16;
$patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8;
$major = (\RD_KAFKA_VERSION & 0xFF000000) >> 24;
$minor = (\RD_KAFKA_VERSION & 0x00FF0000) >> 16;
$patch = (\RD_KAFKA_VERSION & 0x0000FF00) >> 8;

return "$major.$minor.$patch";
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rdkafka/RdKafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function send(Destination $destination, Message $message): void
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);

$partition = $message->getPartition() ?? $destination->getPartition() ?? RD_KAFKA_PARTITION_UA;
$partition = $message->getPartition() ?? $destination->getPartition() ?? \RD_KAFKA_PARTITION_UA;
$payload = $this->serializer->toString($message);
$key = $message->getKey() ?? $destination->getKey() ?? null;

Expand Down
12 changes: 6 additions & 6 deletions pkg/rdkafka/Tests/RdKafkaConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -61,7 +61,7 @@ public function testShouldPassProperlyConfiguredTopicPartitionOnAssign()
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -91,7 +91,7 @@ public function testShouldSubscribeOnFirstReceiveOnly()
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -122,7 +122,7 @@ public function testShouldAssignWhenOffsetIsSet()
$destination->setPartition(1);

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -154,7 +154,7 @@ public function testThrowOnOffsetChangeAfterSubscribing()
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -188,7 +188,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
$expectedMessage = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR_NO_ERROR;
$kafkaMessage->payload = 'theSerializedMessage';

$kafkaConsumer = $this->createKafkaConsumerMock();
Expand Down
8 changes: 4 additions & 4 deletions pkg/rdkafka/Tests/RdKafkaProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube()
->expects($this->once())
->method('producev')
->with(
RD_KAFKA_PARTITION_UA,
\RD_KAFKA_PARTITION_UA,
0,
'theSerializedMessage',
'key',
Expand Down Expand Up @@ -183,7 +183,7 @@ public function testShouldAllowSerializersToSerializeKeys()
->expects($this->once())
->method('producev')
->with(
RD_KAFKA_PARTITION_UA,
\RD_KAFKA_PARTITION_UA,
0,
'theSerializedMessage',
'theSerializedKey'
Expand Down Expand Up @@ -324,7 +324,7 @@ public function testShouldAllowFalsyKeyFromMessage(): void
->expects($this->once())
->method('producev')
->with(
RD_KAFKA_PARTITION_UA,
\RD_KAFKA_PARTITION_UA,
0,
'',
$key
Expand Down Expand Up @@ -354,7 +354,7 @@ public function testShouldAllowFalsyKeyFromDestination(): void
->expects($this->once())
->method('producev')
->with(
RD_KAFKA_PARTITION_UA,
\RD_KAFKA_PARTITION_UA,
0,
'',
$key
Expand Down

0 comments on commit 44437dd

Please sign in to comment.