当前位置:首页 > 网站源码 > 正文内容

注入卡密验证软件(注入卡密的软件)

网站源码1年前 (2023-09-26)304

【Kafka 】| 总结/Edison Zhou

1可用的Kafka .NET客户端

目前.NET圈子主流使用的是 Confluent.Kafka

confluent-kafka-dotnet: https://github.com/confluentinc/confluent-kafka-dotnet

其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。

因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。

NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:

2基于Confluent.Kafka的Sample

要完成本文示例,首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka,请参考上一篇:通过Docker部署Kafka集群。

安装相关组件

在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:

编写KafkaService

编写IKafkaService接口:

Task SubscribeAsync<T>(IEnumerable< string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) whereT : class; }}

编写KafkaService实现类:

publicasyncTask PublishAsync<T>( stringtopicName, T message) whereT : class{varconfig = newProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小为16KLingerMs = 20// 修改等待时间为20ms};using( varproducer = newProducerBuilder< string, string>(config).Build) {awaitproducer.ProduceAsync(topicName, newMessage< string, string> {Key = Guid.NewGuid.ToString,Value = JsonConvert.SerializeObject(message)}); ;}}

展开全文

publicasyncTask SubscribeAsync<T>(IEnumerable< string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) whereT : class{varconfig = newConsumerConfig {BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer", EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假设只需要Leader响应即可AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起};using( varconsumer = newConsumerBuilder<Ignore, string>(config).Build) {consumer.Subscribe(topics);try{while( true) {try{varconsumeResult = consumer.Consume(cancellationToken); Console.WriteLine( $"Consumed message ' {consumeResult.Message?.Value}' at: ' {consumeResult?.TopicPartitionOffset}'." ); if(consumeResult.IsPartitionEOF) {Console.WriteLine( $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}已经到底了: {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}." ); continue; }T messageResult = null; try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch(Exception ex) {varerrorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value: {consumeResult.Message.Value}】 : {ex.StackTrace?.ToString}" ; Console.WriteLine(errorMessage);messageResult = null; }if(messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) {messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch(KafkaException e) {Console.WriteLine(e.Message);}}}catch(ConsumeException e) {Console.WriteLine( $"Consume error: {e.Error.Reason}" ); }}}catch(OperationCanceledException) {Console.WriteLine( "Closing consumer."); consumer.Close;}}

awaitTask.CompletedTask; }}}

为了方便后续的演示,在此项目中再创建一个类 EventData:

publicstringMessage { get; set; }

publicDateTime EventTime { get; set; } }

编写Producer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Producer,其主体内容如下:

编写Consumer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Consumer,其主体内容如下:

测试Pub/Sub效果

将Producer和Consumer两个项目都启动起来,可以看到当Consumer消费完50条消息并一一确认之后,Producer这边就算发布结束。

3基于CAP项目的Sample

模拟场景说明

Catalog API

新建一个ASP.NET Core WebAPI项目,然后分别安装以下组件:

在Startup中的ConfigureServices方法中注入CAP:

......services.AddCap( x=> {x.UseMongoDB( "mongodb://account:password@mongodb-server:27017/products?authSource=admin"); x.UseKafka( "kafka1:9091,kafka2:9092,kafka3:9093"); });}

新建一个ProductController,实现一个Update产品价格的接口,在其中通过CapPublisher完成发布消息到Kafka:

privatereadonlyICapPublisher _publisher; privatereadonlyIMapper _mapper;

publicProductController( ICapPublisher publisher, IMapper mapper) {_publisher = publisher;_mapper = mapper;}

[ HttpGet] publicIList<ProductDTO> Get( ) {return_mapper.Map<IList<ProductDTO>>(Products); ; }

注入卡密验证软件(注入卡密的软件)

[ HttpPut] publicasyncTask<IActionResult> UpdatePrice( stringid, decimalnewPrice ) {// 业务代码varproduct = Products.FirstOrDefault(p => p.Id == id); product.Price = newPrice;

// 发布消息await_publisher.PublishAsync( "ProductPriceChanged", newProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});

returnNoContent; }}}

Basket API

参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件,在ConfigureServices方法中注入CAP。

新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。

[ HttpGet] publicIList<MyBasketDTO> Get( ) {returnBaskets; }

[ NonAction] [ CapSubscribe( "ProductPriceChanged") ] publicasyncTask RefreshBasketProductPrice( ProductDTO productDTO) {if(productDTO == null) return;

foreach( varbasket inBaskets) {foreach( varcatalog inbasket.Catalogs) {if(catalog.Product.Id == productDTO.Id) {catalog.Product.Price = productDTO.Price;break; }}}

awaitTask.CompletedTask; }}}

测试效果

同时启动Catalog API 和 Basket API两个项目。

首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。

然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。

最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。

End总结

本文总结了.NET Core如何通过对应客户端操作Kafka,基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。

参考资料

阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741

麦比乌斯皇,《.NET使用Kafka小结》:https://www.cnblogs.com/hsxian/p/12907542.html

Tony,《.NET Core事件总线解决方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html

极客时间,胡夕《Kafka核心技术与实战》

B站,尚硅谷《Kafka 3.x入门到精通教程》

年终总结:Edison的2020年终总结

数字化转型: 我在传统企业做数字化转型

C#刷题: C#刷剑指Offer算法题系列文章目录

.NET面试:. NET开发面试知识体系

.NET大会: 2020年中国.NET开发者大会PDF资料

扫描二维码推送至手机访问。

版权声明:本文由我的模板布,如需转载请注明出处。


本文链接:http://60200875.com/post/32399.html

分享给朋友:

“注入卡密验证软件(注入卡密的软件)” 的相关文章

在线打印系统源码(打印源代码)

在线打印系统源码(打印源代码)

本篇文章给大家谈谈在线打印系统源码,以及打印源代码对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、什么?把源代码打印出来?程序猿呆住了... 2、用java实现打印功能...

金螳螂装修报价表明细(装修报价单明细表完整 清单)

金螳螂装修报价表明细(装修报价单明细表完整 清单)

今天给各位分享金螳螂装修报价表明细的知识,其中也会对装修报价单明细表完整 清单进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!本文目录一览: 1、金螳螂装修公司中国排名第几...

cctv13新闻直播手机版(cctv13新闻在线直播手机版)

cctv13新闻直播手机版(cctv13新闻在线直播手机版)

本篇文章给大家谈谈cctv13新闻直播手机版,以及cctv13新闻在线直播手机版对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、手机上怎么看新闻联播直播 2、怎么在手机...

逃跑吧少年视频解说南天直播(逃跑吧少年南天解说视频最新年)

逃跑吧少年视频解说南天直播(逃跑吧少年南天解说视频最新年)

今天给各位分享逃跑吧少年视频解说南天直播的知识,其中也会对逃跑吧少年南天解说视频最新年进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!本文目录一览: 1、逃跑吧少年少年主播南天的...

如何上传本地文件到云服务器(如何上传本地文件到云服务器上)

如何上传本地文件到云服务器(如何上传本地文件到云服务器上)

本篇文章给大家谈谈如何上传本地文件到云服务器,以及如何上传本地文件到云服务器上对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、如何将数据上传到云端 2、100G文件如何...

抖音小程序源码网(抖音小程序开发平台)

抖音小程序源码网(抖音小程序开发平台)

今天给各位分享抖音小程序源码网的知识,其中也会对抖音小程序开发平台进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!本文目录一览: 1、微信小程序转换抖音小程序,很棒 2、教育...