毕设做网站具体步骤,什么是a站,北京新站优化,君隆做网站怎么样本章目标理解RabbitMQ RPC模式的工作原理和适用场景。掌握回调队列#xff08;Callback Queue#xff09;和关联ID#xff08;Correlation Id#xff09;的使用。实现基于RabbitMQ的异步RPC调用。学习RPC模式下的错误处理和超时机制。构建完整的微服务间同步通信解决方案。…本章目标理解RabbitMQ RPC模式的工作原理和适用场景。掌握回调队列Callback Queue和关联IDCorrelation Id的使用。实现基于RabbitMQ的异步RPC调用。学习RPC模式下的错误处理和超时机制。构建完整的微服务间同步通信解决方案。一、理论部分1. RPC模式简介RPCRemote Procedure Call模式允许客户端应用程序调用远程服务器上的方法就像调用本地方法一样。在RabbitMQ中RPC是通过消息队列实现的异步RPC。与传统HTTP RPC的区别HTTP RPC同步直接连接需要服务端在线消息队列RPC异步通过消息代理支持解耦和负载均衡2. RabbitMQ RPC核心组件请求队列Request Queue客户端发送请求的队列回复队列Reply Queue服务器返回响应的队列关联IDCorrelation Id匹配请求和响应的唯一标识消息属性使用IBasicProperties.ReplyTo和IBasicProperties.CorrelationId3. RPC工作流程复制代码Client端1. 生成唯一CorrelationId2. 创建临时回复队列3. 发送请求到请求队列设置ReplyTo和CorrelationId4. 监听回复队列等待匹配的CorrelationIdServer端1. 监听请求队列2. 处理请求3. 将响应发送到请求中的ReplyTo队列4. 设置相同的CorrelationIdClient端5. 收到响应根据CorrelationId匹配请求6. 处理响应复制代码4. 适用场景需要同步响应的异步操作微服务间的同步通信计算密集型任务的分布式处理需要负载均衡的同步调用二、实操部分构建分布式计算服务我们将创建一个分布式斐波那契数列计算服务演示完整的RPC模式实现。第1步创建项目结构复制代码# 创建解决方案dotnet new sln -n RpcSystem# 创建项目dotnet new webapi -n RpcClient.APIdotnet new classlib -n RpcClient.Coredotnet new classlib -n RpcServer.Servicedotnet new classlib -n RpcShared# 添加到解决方案dotnet sln add RpcClient.API/RpcClient.API.csprojdotnet sln add RpcClient.Core/RpcClient.Core.csprojdotnet sln add RpcServer.Service/RpcServer.Service.csprojdotnet sln add RpcShared/RpcShared.csproj# 添加项目引用dotnet add RpcClient.API reference RpcClient.Coredotnet add RpcClient.API reference RpcShareddotnet add RpcClient.Core reference RpcShareddotnet add RpcServer.Service reference RpcShared# 添加NuGet包cd RpcClient.APIdotnet add package RabbitMQ.Clientcd ../RpcClient.Coredotnet add package RabbitMQ.Clientcd ../RpcServer.Servicedotnet add package RabbitMQ.Client复制代码第2步定义共享模型RpcSharedModels/RpcRequest.csView CodeModels/RpcResponse.csView CodeMessages/FibonacciRequest.csView Code第3步RPC客户端核心库RpcClient.CoreServices/IRpcClient.csView CodeServices/RpcClient.cs复制代码using System.Collections.Concurrent;using System.Text;using System.Text.Json;using Microsoft.Extensions.Logging;using RabbitMQ.Client;using RabbitMQ.Client.Events;using RpcShared.Models;namespace RpcClient.Core.Services{public class RpcClient : IRpcClient{private readonly IConnection _connection;private readonly IModel _channel;private readonly ILoggerRpcClient _logger;private readonly string _replyQueueName;private readonly ConcurrentDictionarystring, TaskCompletionSourceRpcResponse _pendingRequests;private readonly AsyncEventingBasicConsumer _consumer;private bool _disposed false;public RpcClient(IConnectionFactory connectionFactory,ILoggerRpcClient logger){_logger logger;_pendingRequests new ConcurrentDictionarystring, TaskCompletionSourceRpcResponse();// 建立连接和通道_connection connectionFactory.CreateConnection();_channel _connection.CreateModel();// 声明临时回复队列排他性连接关闭时自动删除_replyQueueName _channel.QueueDeclare(queue: ,durable: false,exclusive: true,autoDelete: true,arguments: null).QueueName;// 创建消费者监听回复队列_consumer new AsyncEventingBasicConsumer(_channel);_consumer.Received OnResponseReceived;// 开始消费回复队列_channel.BasicConsume(queue: _replyQueueName,autoAck: false,consumer: _consumer);_logger.LogInformation(RPC Client initialized with reply queue: {ReplyQueue}, _replyQueueName);}public async TaskRpcResponse CallAsync(RpcRequest request, TimeSpan timeout){if (_disposed)throw new ObjectDisposedException(nameof(RpcClient));var tcs new TaskCompletionSourceRpcResponse();var cancellationTokenSource new CancellationTokenSource(timeout);// 注册超时取消cancellationTokenSource.Token.Register(() {if (_pendingRequests.TryRemove(request.RequestId, out var removedTcs)){removedTcs.TrySetException(new TimeoutException($RPC call timed out after {timeout.TotalSeconds} seconds));_logger.LogWarning(RPC request {RequestId} timed out, request.RequestId);}});// 将请求添加到待处理字典if (!_pendingRequests.TryAdd(request.RequestId, tcs)){throw new InvalidOperationException($Request with ID {request.RequestId} is already pending);}try{// 序列化请求var requestJson JsonSerializer.Serialize(request);var requestBody Encoding.UTF8.GetBytes(requestJson);// 设置消息属性var properties _channel.CreateBasicProperties();properties.ReplyTo _replyQueueName;properties.CorrelationId request.RequestId;properties.Persistent true;_logger.LogDebug(Sending RPC request {RequestId} to queue: rpc_queue, request.RequestId);// 发布请求到RPC队列_channel.BasicPublish(exchange: ,routingKey: rpc_queue,basicProperties: properties,body: requestBody);_logger.LogInformation(RPC request {RequestId} sent successfully, request.RequestId);// 等待响应return await tcs.Task;}catch (Exception ex){// 发生异常时移除待处理请求_pendingRequests.TryRemove(request.RequestId, out _);_logger.LogError(ex, Error sending RPC request {RequestId}, request.RequestId);throw;}}public async TaskTResponse? CallAsyncTResponse(RpcRequest request, TimeSpan timeout) where TResponse : class{var response await CallAsync(request, timeout);if (!response.Success){throw new InvalidOperationException($RPC call failed: {response.Error});}return response.GetDataTResponse();}private async Task OnResponseReceived(object sender, BasicDeliverEventArgs ea){var responseBody ea.Body.ToArray();var responseJson Encoding.UTF8.GetString(responseBody);var correlationId ea.BasicProperties.CorrelationId;_logger.LogDebug(Received RPC response for correlation ID: {CorrelationId}, correlationId);try{var response JsonSerializer.DeserializeRpcResponse(responseJson);if (response null){_logger.LogError(Failed to deserialize RPC response for correlation ID: {CorrelationId}, correlationId);return;}// 查找匹配的待处理请求if (_pendingRequests.TryRemove(correlationId, out var tcs)){tcs.TrySetResult(response);_logger.LogDebug(RPC response for {CorrelationId} delivered to waiting task, correlationId);}else{_logger.LogWarning(Received response for unknown correlation ID: {CorrelationId}, correlationId);}// 手动确认消息_channel.BasicAck(ea.DeliveryTag, false);}catch (Exception ex){_logger.LogError(ex, Error processing RPC response for correlation ID: {CorrelationId}, correlationId);// 处理失败时拒绝消息不重新入队_channel.BasicNack(ea.DeliveryTag, false, false);// 如果反序列化失败仍然通知等待的任务if (_pendingRequests.TryRemove(correlationId, out var tcs)){tcs.TrySetException(new InvalidOperationException(Failed to process RPC response));}}await Task.CompletedTask;}public void Dispose(){if (!_disposed){_disposed true;// 取消所有待处理的请求foreach (var (requestId, tcs) in _pendingRequests){tcs.TrySetCanceled();}_pendingRequests.Clear();_channel?.Close();_channel?.Dispose();_connection?.Close();_connection?.Dispose();_logger.LogInformation(RPC Client disposed);}}}}复制代码Services/FibonacciRpcClient.cs复制代码using RpcClient.Core.Services;using RpcShared.Messages;using RpcShared.Models;namespace RpcClient.Core.Services{public class FibonacciRpcClient{private readonly IRpcClient _rpcClient;private readonly ILoggerFibonacciRpcClient _logger;public FibonacciRpcClient(IRpcClient rpcClient, ILoggerFibonacciRpcClient logger){_rpcClient rpcClient;_logger logger;}public async Tasklong CalculateFibonacciAsync(int number, bool useOptimized true, TimeSpan? timeout null){var request new RpcRequest{Method fibonacci.calculate,Timestamp DateTime.UtcNow}.WithParameter(number, number).WithParameter(useOptimized, useOptimized);timeout ?? TimeSpan.FromSeconds(30);try{_logger.LogInformation(Calculating Fibonacci({Number}) with timeout {Timeout}s,number, timeout.Value.TotalSeconds);var response await _rpcClient.CallAsyncFibonacciResponse(request, timeout.Value);if (response ! null){_logger.LogInformation(Fibonacci({Number}) {Result} (calculated in {Time}ms),number, response.Result, response.CalculationTimeMs);return response.Result;}throw new InvalidOperationException(Received null response from RPC server);}catch (TimeoutException ex){_logger.LogError(ex, Fibonacci calculation timed out for number {Number}, number);throw;}catch (Exception ex){_logger.LogError(ex, Error calculating Fibonacci for number {Number}, number);throw;}}public async TaskFibonacciResponse CalculateFibonacciDetailedAsync(int number, bool useOptimized true, TimeSpan? timeout null){var request new RpcRequest{Method fibonacci.calculate,Timestamp DateTime.UtcNow}.WithParameter(number, number).WithParameter(useOptimized, useOptimized);timeout ?? TimeSpan.FromSeconds(30);var response await _rpcClient.CallAsyncFibonacciResponse(request, timeout.Value);return response ?? throw new InvalidOperationException(Received null response from RPC server);}}}复制代码第4步RPC客户端APIRpcClient.APIProgram.csView CodeServices/IMathRpcService.csView CodeServices/MathRpcService.csView CodeControllers/MathController.csView Code第5步RPC服务器RpcServer.ServiceProgram.csView CodeServices/FibonacciCalculator.csView CodeServices/FibonacciRpcServer.csView Code第6步高级特性 - 带重试的RPC客户端Services/ResilientRpcClient.csView Code第7步运行与测试启动服务复制代码# 终端1启动RPC服务器cd RpcServer.Servicedotnet run# 终端2启动RPC客户端APIcd RpcClient.APIdotnet run复制代码测试API复制代码# 计算斐波那契数列curl -X GET https://localhost:7000/api/math/fibonacci/10curl -X GET https://localhost:7000/api/math/fibonacci/20/detailed# 健康检查curl -X GET https://localhost:7000/api/math/health复制代码测试错误场景# 测试超时设置很小的超时时间# 测试无效输入curl -X GET https://localhost:7000/api/math/fibonacci/-5curl -X GET https://localhost:7000/api/math/fibonacci/100观察日志输出客户端发送请求生成CorrelationId服务器接收请求处理计算服务器发送响应使用相同的CorrelationId客户端接收响应匹配CorrelationId第8步性能测试和监控创建性能测试控制器复制代码[ApiController][Route(api/[controller])]public class BenchmarkController : ControllerBase{private readonly IMathRpcService _mathService;private readonly ILoggerBenchmarkController _logger;public BenchmarkController(IMathRpcService mathService, ILoggerBenchmarkController logger){_mathService mathService;_logger logger;}[HttpPost(fibonacci/batch)]public async TaskActionResult CalculateFibonacciBatch([FromBody] Listint numbers){var results new Listobject();var totalStopwatch System.Diagnostics.Stopwatch.StartNew();foreach (var number in numbers){var stopwatch System.Diagnostics.Stopwatch.StartNew();try{var result await _mathService.CalculateFibonacciAsync(number);results.Add(new{number,result,success true,durationMs stopwatch.ElapsedMilliseconds});}catch (Exception ex){results.Add(new{number,success false,error ex.Message,durationMs stopwatch.ElapsedMilliseconds});}}return Ok(new{totalDurationMs totalStopwatch.ElapsedMilliseconds,requests numbers.Count,results});}}复制代码本章总结在这一章中我们完整实现了RabbitMQ的RPC模式RPC核心概念理解了回调队列、关联ID、请求-响应模式。客户端实现创建了能够发送请求并异步等待响应的RPC客户端。服务器实现构建了处理请求并返回响应的RPC服务器。错误处理实现了超时控制、异常处理和重试机制。性能优化使用缓存和优化算法提高计算效率。** resilience**通过Polly实现了弹性重试策略。RPC模式为微服务架构提供了强大的同步通信能力结合消息队列的异步特性既保持了