当前位置:首页 > 网站源码 > 正文内容

接口用例编写测试用例(测试用例编写方法七种方法)

网站源码1年前 (2023-09-20)287

【Kafka 】| 总结/Edison Zhou

1可用的Kafka .NET客户端

目前.NET圈子主流使用的是 Confluent.Kafka

confluent-kafka-dotnet: https://github.com/confluentinc/confluent-kafka-dotnet

其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。

因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。

NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:

2基于Confluent.Kafka的Sample

要完成本文示例,首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka,请参考上一篇:通过Docker部署Kafka集群。

安装相关组件

在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:

编写KafkaService

编写IKafkaService接口:

Task SubscribeAsync<T>(IEnumerable< string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) whereT : class; }}

编写KafkaService实现类:

publicasyncTask PublishAsync<T>( stringtopicName, T message) whereT : class{varconfig = newProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小为16KLingerMs = 20// 修改等待时间为20ms};using( varproducer = newProducerBuilder< string, string>(config).Build) {awaitproducer.ProduceAsync(topicName, newMessage< string, string> {Key = Guid.NewGuid.ToString,Value = JsonConvert.SerializeObject(message)}); ;}}

展开全文

publicasyncTask SubscribeAsync<T>(IEnumerable< string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) whereT : class{varconfig = newConsumerConfig {BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer", EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假设只需要Leader响应即可AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起};using( varconsumer = newConsumerBuilder<Ignore, string>(config).Build) {consumer.Subscribe(topics);try{while( true) {try{varconsumeResult = consumer.Consume(cancellationToken); Console.WriteLine( $"Consumed message ' {consumeResult.Message?.Value}' at: ' {consumeResult?.TopicPartitionOffset}'." ); if(consumeResult.IsPartitionEOF) {Console.WriteLine( $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}已经到底了: {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}." ); continue; }T messageResult = null; try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch(Exception ex) {varerrorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value: {consumeResult.Message.Value}】 : {ex.StackTrace?.ToString}" ; Console.WriteLine(errorMessage);messageResult = null; }if(messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) {messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch(KafkaException e) {Console.WriteLine(e.Message);}}}catch(ConsumeException e) {Console.WriteLine( $"Consume error: {e.Error.Reason}" ); }}}catch(OperationCanceledException) {Console.WriteLine( "Closing consumer."); consumer.Close;}}

awaitTask.CompletedTask; }}}

为了方便后续的演示,在此项目中再创建一个类 EventData:

publicstringMessage { get; set; }

publicDateTime EventTime { get; set; } }

编写Producer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Producer,其主体内容如下:

编写Consumer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Consumer,其主体内容如下:

测试Pub/Sub效果

将Producer和Consumer两个项目都启动起来,可以看到当Consumer消费完50条消息并一一确认之后,Producer这边就算发布结束。

3基于CAP项目的Sample

模拟场景说明

Catalog API

新建一个ASP.NET Core WebAPI项目,然后分别安装以下组件:

在Startup中的ConfigureServices方法中注入CAP:

......services.AddCap( x=> {x.UseMongoDB( "mongodb://account:password@mongodb-server:27017/products?authSource=admin"); x.UseKafka( "kafka1:9091,kafka2:9092,kafka3:9093"); });}

新建一个ProductController,实现一个Update产品价格的接口,在其中通过CapPublisher完成发布消息到Kafka:

privatereadonlyICapPublisher _publisher; privatereadonlyIMapper _mapper;

publicProductController( ICapPublisher publisher, IMapper mapper) {_publisher = publisher;_mapper = mapper;}

[ HttpGet] publicIList<ProductDTO> Get( ) {return_mapper.Map<IList<ProductDTO>>(Products); ; }

[ HttpPut] publicasyncTask<IActionResult> UpdatePrice( stringid, decimalnewPrice ) {// 业务代码varproduct = Products.FirstOrDefault(p => p.Id == id); product.Price = newPrice;

// 发布消息await_publisher.PublishAsync( "ProductPriceChanged", newProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});

returnNoContent; }}}

Basket API

参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件,在ConfigureServices方法中注入CAP。

新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。

[ HttpGet] publicIList<MyBasketDTO> Get( ) {returnBaskets; }

[ NonAction] [ CapSubscribe( "ProductPriceChanged") ] publicasyncTask RefreshBasketProductPrice( ProductDTO productDTO) {if(productDTO == null) return;

foreach( varbasket inBaskets) {foreach( varcatalog inbasket.Catalogs) {if(catalog.Product.Id == productDTO.Id) {catalog.Product.Price = productDTO.Price;break; }}}

awaitTask.CompletedTask; }}}

测试效果

同时启动Catalog API 和 Basket API两个项目。

首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。

然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。

接口用例编写测试用例(测试用例编写方法七种方法)

最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。

End总结

本文总结了.NET Core如何通过对应客户端操作Kafka,基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。

参考资料

阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741

麦比乌斯皇,《.NET使用Kafka小结》:https://www.cnblogs.com/hsxian/p/12907542.html

Tony,《.NET Core事件总线解决方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html

极客时间,胡夕《Kafka核心技术与实战》

B站,尚硅谷《Kafka 3.x入门到精通教程》

年终总结:Edison的2020年终总结

数字化转型: 我在传统企业做数字化转型

C#刷题: C#刷剑指Offer算法题系列文章目录

.NET面试:. NET开发面试知识体系

.NET大会: 2020年中国.NET开发者大会PDF资料

扫描二维码推送至手机访问。

版权声明:本文由我的模板布,如需转载请注明出处。


本文链接:http://60200875.com/post/31575.html

分享给朋友:

“接口用例编写测试用例(测试用例编写方法七种方法)” 的相关文章

区块链技术应用是什么(区块链是应用于什么技术)

区块链技术应用是什么(区块链是应用于什么技术)

今天给各位分享区块链技术应用是什么的知识,其中也会对区块链是应用于什么技术进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!本文目录一览: 1、区块链技术是什么?可以用于哪方面...

抖音直播音乐电台能赚钱吗(抖音音乐电台直播怎么做)

抖音直播音乐电台能赚钱吗(抖音音乐电台直播怎么做)

本篇文章给大家谈谈抖音直播音乐电台能赚钱吗,以及抖音音乐电台直播怎么做对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、抖音电台类主播怎么赚钱 2、抖音电台主播赚钱吗...

简单编程代码大全(简单编程代码大全软件)

简单编程代码大全(简单编程代码大全软件)

今天给各位分享简单编程代码大全的知识,其中也会对简单编程代码大全软件进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!本文目录一览: 1、怎么编程一个最简单游戏代码? 2、初学...

怎么看笔记本有没有无线网卡驱动(怎么看自己电脑有没有无线网卡驱动)

怎么看笔记本有没有无线网卡驱动(怎么看自己电脑有没有无线网卡驱动)

本篇文章给大家谈谈怎么看笔记本有没有无线网卡驱动,以及怎么看自己电脑有没有无线网卡驱动对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、如何查看无线网卡驱动? 2、怎么查...

微信小程序传奇至尊破解版(传奇至尊小程序礼包领取)

微信小程序传奇至尊破解版(传奇至尊小程序礼包领取)

本篇文章给大家谈谈微信小程序传奇至尊破解版,以及传奇至尊小程序礼包领取对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、小程序传奇至尊有app么 2、微信小程序传奇至尊进...

原生影视app源码(最新影视app源码)

原生影视app源码(最新影视app源码)

本篇文章给大家谈谈原生影视app源码,以及最新影视app源码对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、短视频app源码常见的基本功能包括哪些? 2、手机直播原生源...