vertexdoc

Vertex's official website

介绍

下面通过转账案例来构建一个完整的服务

Cluster

Cluster的集群化配置请参照Orleans的集群设置

接口项目(IGrains)

  • 创建一个名称为Transfer.IGrains的项目
  • 通过nuget引入Vertex.Abstractions包
  • 项目中创建IAccount接口,表示用户金额账户的API,代码如下 ```csharp public interface IAccount : IVertexActor, IGrainWithIntegerKey { /// <summary> /// 查询当前余额 /// </summary> /// [AlwaysInterleave] Task GetBalance();
    /// <summary>
    /// 充值金额,方便进行测试
    /// </summary>
    /// <param name="amount">金额</param>
    /// <param name="topupId">充值Id</param>
    /// <returns></returns>
    Task<bool> TopUp(decimal amount, string topupId);

    /// <summary>
    /// 转账操作
    /// </summary>
    /// <param name="toAccountId">目标账户Id/param>
    /// <param name="amount">转账金额</param>
    /// <param name="transferId">转账的唯一ID</param>
    /// <returns></returns>
    Task<bool> Transfer(long toAccountId, decimal amount, string transferId);

    /// <summary>
    /// 转账到账方法
    /// </summary>
    /// <param name="amount">到账金额</param>
    /// <returns></returns>
    Task TransferArrived(decimal amount);

    /// <summary>
    /// 转账失败金额回退方法
    /// </summary>
    /// <param name="amount">回退的金额</param>
    /// <returns></returns>
    Task<bool> TransferRefunds(decimal amount);   ``` * 创建一个IAccountDb接口,这个是一个空接口,主要用来监听事件,然后把金额变化同步到数据库,方便查询   ```csharp
public interface IAccountDb : IFlowActor, IGrainWithIntegerKey
{
}   ```

启动项目(Host)

创建一个控制台项目,代码如下

public class Program
    {
        public static Task Main(string[] args)
        {
            var host = CreateHost();
            return host.RunAsync();
        }

        private static IHost CreateHost()
        {
            return new HostBuilder()
                .UseOrleans(siloBuilder =>
                {
                    siloBuilder
                        .UseLocalhostClustering()
                        .Configure<ClusterOptions>(options =>
                        {
                            options.ClusterId = "dev";
                            options.ServiceId = "Transfer";
                        })
                        .Configure<EndpointOptions>(options => options.AdvertisedIPAddress = IPAddress.Loopback)
                        .ConfigureApplicationParts(parts =>
                        {
                            parts.AddApplicationPart(typeof(Account).Assembly).WithReferences();
                            parts.AddApplicationPart(typeof(DIDActor).Assembly).WithReferences();
                            parts.AddApplicationPart(typeof(StreamIdActor).Assembly).WithReferences();
                        })
                        .AddSimpleMessageStreamProvider("SMSProvider", options => options.FireAndForgetDelivery = true).AddMemoryGrainStorage("PubSubStore");
                })
                .ConfigureServices(serviceCollection =>
                {
                    serviceCollection.AddVertex();
                    serviceCollection.AddLinq2DbStorage(
                        config =>
                    {
                        config.Connections = new Vertex.Storage.Linq2db.Options.ConnectionOptions[]
                        {
                         //这里使用SQLite内存数据库,方便无依赖启动
                         new Vertex.Storage.Linq2db.Options.ConnectionOptions
                         {
                            Name = Consts.CoreDbName,
                            ProviderName = "SQLite.MS",
                            ConnectionString = "Data Source=Vertex.SQLite.db;"
                         }
                        };
                    }, new EventArchivePolicy("month", (name, time) => $"Vertex_Archive_{name}_{DateTimeOffset.FromUnixTimeSeconds(time).ToString("yyyyMM")}".ToLower(), table => table.StartsWith("Vertex_Archive".ToLower())));
                    //这里使用内存消息队列,方便无依赖启动
                    serviceCollection.AddInMemoryStream();
                })
                .ConfigureLogging(logging =>
                {
                    logging.SetMinimumLevel(LogLevel.Information);
                    logging.AddConsole();
                }).Build();
        }
    }

Client

客户端用于访问集群和对外提供api,是外部流量进入集群的中转服务,可以是asp.net core、Grpc等等

我们这里创建一个简单的控制台程序,来讲解怎么访问cluster

//创建客户端对象,可以使用依赖注入注入到容器中,这样其它地方只需要注入IClusterClient就可以访问集群
private static async Task<IClusterClient> StartClientWithRetries(int initializeAttemptsBeforeFailing = 5)
{
    int attempt = 0;
    IClusterClient client;
    while (true)
    {
        try
        {
            var builder = new ClientBuilder()
                .UseLocalhostClustering()
                .ConfigureApplicationParts(parts =>
                    parts.AddApplicationPart(typeof(IAccount).Assembly).WithReferences())
                .ConfigureLogging(logging => logging.AddConsole());
            client = builder.Build();
            await client.Connect();
            Console.WriteLine("Client successfully connect to silo host");
            break;
        }
        catch (Exception)
        {
            attempt++;
            Console.WriteLine(
                $"Attempt {attempt} of {initializeAttemptsBeforeFailing} failed to initialize the Orleans client.");
            if (attempt > initializeAttemptsBeforeFailing)
            {
                throw;
            }

            await Task.Delay(TimeSpan.FromSeconds(5));
        }
    }

    return client;
}
//启动入口
private static async Task Main(string[] args)
{
    using var client = await StartClientWithRetries();
    var txId = Guid.NewGuid().ToString();//唯一id,代表这次充值的唯一id,会在流程中传播,保证整个流程的幂等性
    var account_1 = client.GetGrain<IAccount>(1);
    var account_2 = client.GetGrain<IAccount>(2);
    await account_1.TopUp(100, txId);//先给账户1充值100
    var transferId = Guid.NewGuid().ToString();//转账唯一id
    await account_1.Transfer(2, 50, transferId);//账户1给账户2转账50元
    await Task.Delay(1000);//流程是异步的,所以需要等待完成才能获取到正确的余额
    Console.WriteLine($"账户1的余额={await account_1.GetBalance()}");
    Console.WriteLine($"账户2的余额={await account_2.GetBalance()}");