Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,19 @@
await provisioningService.ProvisionTopology(CancellationToken); // provisioning happens asynchronously
}

private async Task<(IMessageSerializer<string> messageSerializer, SqsPathMeta pathMeta)> GetMetaForPath(string path, CancellationToken cancellationToken)
{
var messageSerializer = GetMessageSerializer(path);

// Note: When a path not declared during bus producer/consumer declarations (it is dynamic), e.g. for RequestResponse - the path kind is not known at this point, so we assume it is a queue
// See SqsRequestResponseBuilderExtensions.ReplyToQueue
var pathMeta = await TopologyCache.GetMetaWithPreloadOrException(path, PathKind.Queue, cancellationToken);
private async Task<(IMessageSerializer<string> messageSerializer, SqsPathMeta pathMeta)> GetMetaForPath(string path, Type messageType, CancellationToken cancellationToken)
{
var messageSerializer = GetMessageSerializer(path);

var producerSettings = GetProducerSettings(messageType);

// Note: When a path not declared during bus producer/consumer declarations (it is dynamic), e.g. for RequestResponse - the path kind is not known at this point, so we assume it is a queue
// See SqsRequestResponseBuilderExtensions.ReplyToQueue
var pathKind = ReferenceEquals(producerSettings, MarkerProducerSettingsForResponses)
? PathKind.Queue
: producerSettings.PathKind;

var pathMeta = await TopologyCache.GetMetaWithPreloadOrException(path, pathKind, cancellationToken);

return (messageSerializer, pathMeta);
}
Expand All @@ -146,7 +152,7 @@
{
OnProduceToTransport(message, messageType, path, messageHeaders);

var (messageSerializer, pathMeta) = await GetMetaForPath(path, cancellationToken);
var (messageSerializer, pathMeta) = await GetMetaForPath(path, messageType, cancellationToken);

try
{
Expand Down Expand Up @@ -186,7 +192,7 @@
var dispatched = new List<T>(envelopes.Count);
try
{
var (messageSerializer, pathMeta) = await GetMetaForPath(path, cancellationToken);
var (messageSerializer, pathMeta) = await GetMetaForPath(path, envelopes.First().MessageType, cancellationToken);

if (pathMeta.PathKind == PathKind.Queue)
{
Expand Down Expand Up @@ -280,7 +286,7 @@
{
if (messageHeaders is null)
{
return null;

Check warning on line 289 in src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs

View workflow job for this annotation

GitHub Actions / sonar

Return an empty collection instead of null.
}

var messageAttributes = new Dictionary<string, THeaderValue>(messageHeaders.Count);
Expand Down
112 changes: 112 additions & 0 deletions src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
namespace SlimMessageBus.Host.AmazonSQS.Test;

using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Amazon.SQS;
using Amazon.SQS.Model;

using Microsoft.Extensions.Logging.Abstractions;

using SlimMessageBus.Host.Collections;
using SlimMessageBus.Host.Serialization;

public class SqsMessageBusTest : IDisposable
{
private readonly Mock<IAmazonSQS> _sqsClientMock;
private readonly Mock<IAmazonSimpleNotificationService> _snsClientMock;
private readonly SqsMessageBus _subject;

public SqsMessageBusTest()
{
_sqsClientMock = new Mock<IAmazonSQS>();
_snsClientMock = new Mock<IAmazonSimpleNotificationService>();

var sqsClientProviderMock = new Mock<ISqsClientProvider>();
sqsClientProviderMock.SetupGet(x => x.Client).Returns(_sqsClientMock.Object);
sqsClientProviderMock.Setup(x => x.EnsureClientAuthenticated()).Returns(Task.CompletedTask);

var snsClientProviderMock = new Mock<ISnsClientProvider>();
snsClientProviderMock.SetupGet(x => x.Client).Returns(_snsClientMock.Object);
snsClientProviderMock.Setup(x => x.EnsureClientAuthenticated()).Returns(Task.CompletedTask);

var messageSerializerProviderMock = new Mock<IMessageSerializerProvider>();
messageSerializerProviderMock
.Setup(x => x.GetSerializer(It.IsAny<string>()))
.Returns(new TestMessageSerializer());

var serviceProviderMock = new Mock<IServiceProvider>();
serviceProviderMock.Setup(x => x.GetService(typeof(ISqsClientProvider))).Returns(sqsClientProviderMock.Object);
serviceProviderMock.Setup(x => x.GetService(typeof(ISnsClientProvider))).Returns(snsClientProviderMock.Object);
serviceProviderMock.Setup(x => x.GetService(typeof(IMessageSerializerProvider))).Returns(messageSerializerProviderMock.Object);
serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver());
serviceProviderMock.Setup(x => x.GetService(typeof(TimeProvider))).Returns(TimeProvider.System);
serviceProviderMock.Setup(x => x.GetService(typeof(RuntimeTypeCache))).Returns(new RuntimeTypeCache());
serviceProviderMock.Setup(x => x.GetService(typeof(IPendingRequestManager))).Returns(() => new PendingRequestManager(new InMemoryPendingRequestStore(), TimeProvider.System, NullLoggerFactory.Instance));
serviceProviderMock.Setup(x => x.GetService(It.Is<Type>(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))).Returns((Type t) => Array.CreateInstance(t.GetGenericArguments()[0], 0));

var messageBusSettings = new MessageBusSettings
{
ServiceProvider = serviceProviderMock.Object
};

var producerSettings = new ProducerSettings();
new ProducerBuilder<TopicMessage>(producerSettings).DefaultTopic("test-topic");
messageBusSettings.Producers.Add(producerSettings);

_subject = new SqsMessageBus(messageBusSettings, new SqsMessageBusSettings
{
TopologyProvisioning = new SqsTopologySettings
{
Enabled = false
}
});
}

public void Dispose()
{
_subject.Dispose();
GC.SuppressFinalize(this);
}

[Fact]
public async Task When_Publish_Given_TopicProducerAndProvisioningDisabled_Then_LooksUpTopicAndPublishesToSns()
{
// arrange
const string topicName = "test-topic";
const string topicArn = "arn:aws:sns:eu-central-1:123456789012:test-topic";

_snsClientMock.Setup(x => x.FindTopicAsync(topicName))
.ReturnsAsync(new Topic { TopicArn = topicArn });
_snsClientMock.Setup(x => x.PublishAsync(It.IsAny<PublishRequest>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new PublishResponse());

// act
await _subject.ProducePublish(new TopicMessage());

// assert
_snsClientMock.Verify(x => x.FindTopicAsync(topicName), Times.Once);
_snsClientMock.Verify(x => x.PublishAsync(
It.Is<PublishRequest>(r => r.TopicArn == topicArn),
It.IsAny<CancellationToken>()), Times.Once);

_sqsClientMock.Verify(x => x.GetQueueUrlAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Never);
_sqsClientMock.Verify(x => x.SendMessageAsync(It.IsAny<SendMessageRequest>(), It.IsAny<CancellationToken>()), Times.Never);
}

private record TopicMessage;

private class TestMessageSerializer : IMessageSerializer, IMessageSerializer<string>
{
byte[] IMessageSerializer<byte[]>.Serialize(Type messageType, IDictionary<string, object> headers, object message, object transportMessage)
=> [];

object IMessageSerializer<byte[]>.Deserialize(Type messageType, IReadOnlyDictionary<string, object> headers, byte[] payload, object transportMessage)
=> throw new NotSupportedException();

string IMessageSerializer<string>.Serialize(Type messageType, IDictionary<string, object> headers, object message, object transportMessage)
=> "{}";

object IMessageSerializer<string>.Deserialize(Type messageType, IReadOnlyDictionary<string, object> headers, string payload, object transportMessage)
=> throw new NotSupportedException();
}
}
Loading