接口用例编写测试用例(测试用例编写方法七种方法)
【Kafka 】| 总结/Edison Zhou
1可用的Kafka .NET客户端
目前.NET圈子主流使用的是 Confluent.Kafka
confluent-kafka-dotnet: https://github.com/confluentinc/confluent-kafka-dotnet
其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。
因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。
NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:
2基于Confluent.Kafka的Sample
要完成本文示例,首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka,请参考上一篇:通过Docker部署Kafka集群。
安装相关组件
在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:
编写KafkaService
编写IKafkaService接口:
Task SubscribeAsync<T>(IEnumerable< string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) whereT : class; }}
编写KafkaService实现类:
publicasyncTask PublishAsync<T>( stringtopicName, T message) whereT : class{varconfig = newProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小为16KLingerMs = 20// 修改等待时间为20ms};using( varproducer = newProducerBuilder< string, string>(config).Build) {awaitproducer.ProduceAsync(topicName, newMessage< string, string> {Key = Guid.NewGuid.ToString,Value = JsonConvert.SerializeObject(message)}); ;}}
展开全文
publicasyncTask SubscribeAsync<T>(IEnumerable< string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) whereT : class{varconfig = newConsumerConfig {BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer", EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假设只需要Leader响应即可AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起};using( varconsumer = newConsumerBuilder<Ignore, string>(config).Build) {consumer.Subscribe(topics);try{while( true) {try{varconsumeResult = consumer.Consume(cancellationToken); Console.WriteLine( $"Consumed message ' {consumeResult.Message?.Value}' at: ' {consumeResult?.TopicPartitionOffset}'." ); if(consumeResult.IsPartitionEOF) {Console.WriteLine( $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}已经到底了: {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}." ); continue; }T messageResult = null; try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch(Exception ex) {varerrorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value: {consumeResult.Message.Value}】 : {ex.StackTrace?.ToString}" ; Console.WriteLine(errorMessage);messageResult = null; }if(messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) {messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch(KafkaException e) {Console.WriteLine(e.Message);}}}catch(ConsumeException e) {Console.WriteLine( $"Consume error: {e.Error.Reason}" ); }}}catch(OperationCanceledException) {Console.WriteLine( "Closing consumer."); consumer.Close;}}
awaitTask.CompletedTask; }}}
为了方便后续的演示,在此项目中再创建一个类 EventData:
publicstringMessage { get; set; }
publicDateTime EventTime { get; set; } }
编写Producer
新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Producer,其主体内容如下:
编写Consumer
新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Consumer,其主体内容如下:
测试Pub/Sub效果
将Producer和Consumer两个项目都启动起来,可以看到当Consumer消费完50条消息并一一确认之后,Producer这边就算发布结束。
3基于CAP项目的Sample
模拟场景说明
Catalog API
新建一个ASP.NET Core WebAPI项目,然后分别安装以下组件:
在Startup中的ConfigureServices方法中注入CAP:
......services.AddCap( x=> {x.UseMongoDB( "mongodb://account:password@mongodb-server:27017/products?authSource=admin"); x.UseKafka( "kafka1:9091,kafka2:9092,kafka3:9093"); });}
新建一个ProductController,实现一个Update产品价格的接口,在其中通过CapPublisher完成发布消息到Kafka:
privatereadonlyICapPublisher _publisher; privatereadonlyIMapper _mapper;
publicProductController( ICapPublisher publisher, IMapper mapper) {_publisher = publisher;_mapper = mapper;}
[ HttpGet] publicIList<ProductDTO> Get( ) {return_mapper.Map<IList<ProductDTO>>(Products); ; }
[ HttpPut] publicasyncTask<IActionResult> UpdatePrice( stringid, decimalnewPrice ) {// 业务代码varproduct = Products.FirstOrDefault(p => p.Id == id); product.Price = newPrice;
// 发布消息await_publisher.PublishAsync( "ProductPriceChanged", newProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});
returnNoContent; }}}
Basket API
参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件,在ConfigureServices方法中注入CAP。
新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。
[ HttpGet] publicIList<MyBasketDTO> Get( ) {returnBaskets; }
[ NonAction] [ CapSubscribe( "ProductPriceChanged") ] publicasyncTask RefreshBasketProductPrice( ProductDTO productDTO) {if(productDTO == null) return;
foreach( varbasket inBaskets) {foreach( varcatalog inbasket.Catalogs) {if(catalog.Product.Id == productDTO.Id) {catalog.Product.Price = productDTO.Price;break; }}}
awaitTask.CompletedTask; }}}
测试效果
同时启动Catalog API 和 Basket API两个项目。
首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。
然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。
最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。
End总结
本文总结了.NET Core如何通过对应客户端操作Kafka,基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。
参考资料
阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741
麦比乌斯皇,《.NET使用Kafka小结》:https://www.cnblogs.com/hsxian/p/12907542.html
Tony,《.NET Core事件总线解决方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html
极客时间,胡夕《Kafka核心技术与实战》
B站,尚硅谷《Kafka 3.x入门到精通教程》
年终总结:Edison的2020年终总结
数字化转型: 我在传统企业做数字化转型
C#刷题: C#刷剑指Offer算法题系列文章目录
.NET面试:. NET开发面试知识体系
.NET大会: 2020年中国.NET开发者大会PDF资料