From 761d47b612a1acdfd10dfaaec1a17b12486ab306 Mon Sep 17 00:00:00 2001 From: Tomasz Maruszak Date: Fri, 19 Jun 2026 00:47:50 +0200 Subject: [PATCH] Fix SQS topic producer path metadata lookup Signed-off-by: Tomasz Maruszak --- .../SqsMessageBus.cs | 24 ++-- .../SqsMessageBusTest.cs | 112 ++++++++++++++++++ 2 files changed, 127 insertions(+), 9 deletions(-) create mode 100644 src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusTest.cs diff --git a/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs b/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs index c4236c66..c8bed457 100644 --- a/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs +++ b/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs @@ -131,13 +131,19 @@ public override async Task ProvisionTopology() await provisioningService.ProvisionTopology(CancellationToken); // provisioning happens asynchronously } - private async Task<(IMessageSerializer messageSerializer, SqsPathMeta pathMeta)> GetMetaForPath(string path, CancellationToken cancellationToken) - { - var messageSerializer = GetMessageSerializer(path); - - // Note: When a path not declared during bus producer/consumer declarations (it is dynamic), e.g. for RequestResponse - the path kind is not known at this point, so we assume it is a queue - // See SqsRequestResponseBuilderExtensions.ReplyToQueue - var pathMeta = await TopologyCache.GetMetaWithPreloadOrException(path, PathKind.Queue, cancellationToken); + private async Task<(IMessageSerializer messageSerializer, SqsPathMeta pathMeta)> GetMetaForPath(string path, Type messageType, CancellationToken cancellationToken) + { + var messageSerializer = GetMessageSerializer(path); + + var producerSettings = GetProducerSettings(messageType); + + // Note: When a path not declared during bus producer/consumer declarations (it is dynamic), e.g. for RequestResponse - the path kind is not known at this point, so we assume it is a queue + // See SqsRequestResponseBuilderExtensions.ReplyToQueue + var pathKind = ReferenceEquals(producerSettings, MarkerProducerSettingsForResponses) + ? PathKind.Queue + : producerSettings.PathKind; + + var pathMeta = await TopologyCache.GetMetaWithPreloadOrException(path, pathKind, cancellationToken); return (messageSerializer, pathMeta); } @@ -146,7 +152,7 @@ public override async Task ProduceToTransport(object message, Type messageType, { OnProduceToTransport(message, messageType, path, messageHeaders); - var (messageSerializer, pathMeta) = await GetMetaForPath(path, cancellationToken); + var (messageSerializer, pathMeta) = await GetMetaForPath(path, messageType, cancellationToken); try { @@ -186,7 +192,7 @@ public override async Task> ProduceToTransportBu var dispatched = new List(envelopes.Count); try { - var (messageSerializer, pathMeta) = await GetMetaForPath(path, cancellationToken); + var (messageSerializer, pathMeta) = await GetMetaForPath(path, envelopes.First().MessageType, cancellationToken); if (pathMeta.PathKind == PathKind.Queue) { diff --git a/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusTest.cs b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusTest.cs new file mode 100644 index 00000000..15043aee --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusTest.cs @@ -0,0 +1,112 @@ +namespace SlimMessageBus.Host.AmazonSQS.Test; + +using Amazon.SimpleNotificationService; +using Amazon.SimpleNotificationService.Model; +using Amazon.SQS; +using Amazon.SQS.Model; + +using Microsoft.Extensions.Logging.Abstractions; + +using SlimMessageBus.Host.Collections; +using SlimMessageBus.Host.Serialization; + +public class SqsMessageBusTest : IDisposable +{ + private readonly Mock _sqsClientMock; + private readonly Mock _snsClientMock; + private readonly SqsMessageBus _subject; + + public SqsMessageBusTest() + { + _sqsClientMock = new Mock(); + _snsClientMock = new Mock(); + + var sqsClientProviderMock = new Mock(); + sqsClientProviderMock.SetupGet(x => x.Client).Returns(_sqsClientMock.Object); + sqsClientProviderMock.Setup(x => x.EnsureClientAuthenticated()).Returns(Task.CompletedTask); + + var snsClientProviderMock = new Mock(); + snsClientProviderMock.SetupGet(x => x.Client).Returns(_snsClientMock.Object); + snsClientProviderMock.Setup(x => x.EnsureClientAuthenticated()).Returns(Task.CompletedTask); + + var messageSerializerProviderMock = new Mock(); + messageSerializerProviderMock + .Setup(x => x.GetSerializer(It.IsAny())) + .Returns(new TestMessageSerializer()); + + var serviceProviderMock = new Mock(); + serviceProviderMock.Setup(x => x.GetService(typeof(ISqsClientProvider))).Returns(sqsClientProviderMock.Object); + serviceProviderMock.Setup(x => x.GetService(typeof(ISnsClientProvider))).Returns(snsClientProviderMock.Object); + serviceProviderMock.Setup(x => x.GetService(typeof(IMessageSerializerProvider))).Returns(messageSerializerProviderMock.Object); + serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver()); + serviceProviderMock.Setup(x => x.GetService(typeof(TimeProvider))).Returns(TimeProvider.System); + serviceProviderMock.Setup(x => x.GetService(typeof(RuntimeTypeCache))).Returns(new RuntimeTypeCache()); + serviceProviderMock.Setup(x => x.GetService(typeof(IPendingRequestManager))).Returns(() => new PendingRequestManager(new InMemoryPendingRequestStore(), TimeProvider.System, NullLoggerFactory.Instance)); + serviceProviderMock.Setup(x => x.GetService(It.Is(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))).Returns((Type t) => Array.CreateInstance(t.GetGenericArguments()[0], 0)); + + var messageBusSettings = new MessageBusSettings + { + ServiceProvider = serviceProviderMock.Object + }; + + var producerSettings = new ProducerSettings(); + new ProducerBuilder(producerSettings).DefaultTopic("test-topic"); + messageBusSettings.Producers.Add(producerSettings); + + _subject = new SqsMessageBus(messageBusSettings, new SqsMessageBusSettings + { + TopologyProvisioning = new SqsTopologySettings + { + Enabled = false + } + }); + } + + public void Dispose() + { + _subject.Dispose(); + GC.SuppressFinalize(this); + } + + [Fact] + public async Task When_Publish_Given_TopicProducerAndProvisioningDisabled_Then_LooksUpTopicAndPublishesToSns() + { + // arrange + const string topicName = "test-topic"; + const string topicArn = "arn:aws:sns:eu-central-1:123456789012:test-topic"; + + _snsClientMock.Setup(x => x.FindTopicAsync(topicName)) + .ReturnsAsync(new Topic { TopicArn = topicArn }); + _snsClientMock.Setup(x => x.PublishAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(new PublishResponse()); + + // act + await _subject.ProducePublish(new TopicMessage()); + + // assert + _snsClientMock.Verify(x => x.FindTopicAsync(topicName), Times.Once); + _snsClientMock.Verify(x => x.PublishAsync( + It.Is(r => r.TopicArn == topicArn), + It.IsAny()), Times.Once); + + _sqsClientMock.Verify(x => x.GetQueueUrlAsync(It.IsAny(), It.IsAny()), Times.Never); + _sqsClientMock.Verify(x => x.SendMessageAsync(It.IsAny(), It.IsAny()), Times.Never); + } + + private record TopicMessage; + + private class TestMessageSerializer : IMessageSerializer, IMessageSerializer + { + byte[] IMessageSerializer.Serialize(Type messageType, IDictionary headers, object message, object transportMessage) + => []; + + object IMessageSerializer.Deserialize(Type messageType, IReadOnlyDictionary headers, byte[] payload, object transportMessage) + => throw new NotSupportedException(); + + string IMessageSerializer.Serialize(Type messageType, IDictionary headers, object message, object transportMessage) + => "{}"; + + object IMessageSerializer.Deserialize(Type messageType, IReadOnlyDictionary headers, string payload, object transportMessage) + => throw new NotSupportedException(); + } +}