NetCore 开发实战(2)——微服务实战


26 | 工程结构概览:定义应用分层及依赖关系

从这一节开始进入微服务实战部分

这一节主要探讨工程的结构和应用的分层

在应用的分层这里定义了四个层次:

  1. 领域模型层

  2. 基础设施层

  3. 应用层

  4. 共享层

可以通过代码来看一下

共享层一共建立三个工程:

  1. GeekTime.Core:主要承载基础的简单的类型,比如说异常或者一些帮助类

  2. GeekTime.Domain.Abstractions:抽象层,领域的抽象是指在领域模型可以定义一些基类或者接口,领域事件接口,领域事件处理接口,还有 Entity 的接口和 Entity 的基类

  3. GeekTime.Infrastructure.Core:基础设施的核心层,是指对仓储,还有 EFContext 定义一些共享代码

这些包实际上在不同的项目里面都可以共享,所以建议的做法是把这些代码都通过私有的 NuGet 的仓库来存储,然后其他的工程可以使用 NuGet 包来直接引用即可

领域模型层就是定义领域模型的地方,这里面会有不同的聚合,还有领域事件,不同的聚合下面就是领域模型

基础设施层是仓储层和一些共享代码的实现,这里只定义了仓储层的实现,包括 EF 的 DomainContext,还有 Order 的仓储层,User 的仓储层,还定义了领域模型与数据库之间的映射关系,就是在 EntityConfigurations 这目录下面去定义

应用层分两个,一个工程是 API 层,是用来承载 Web API 或者 Web 应用的,另外一个是后台任务,这个就是用来执行一些特殊的 Job,作为 Job 的宿主运行的,它可以是一个控制台的应用程序

在 Web 层,Web API 层,也分了几个关键目录 Application,Controllers,Extensions,Infrastructure

基础设施层会放一些身份认证缓存之类的与基础设施交互相关的一些代码

扩展层主要是将服务注册进容器的代码和中间件配置的代码,也就是两扩展方法,一个是对 ServiceCollection 的扩展,一个是对 ApplicationBuilder 的扩展

控制器层主要用来定义 Web API,这一层就是定义前后端交互的接口

应用层使用了 *CQRS *的设计模式,就是命令与查询职责分离,把命令放在一个目录,把查询放在一个目录,同样的这里还有两个事件处理的目录,一个是领域模型,领域事件的处理,一个是集成事件的处理

再看一下各层之间的依赖关系

Shared 层实际上是不依赖任何层次的,它存储了共享的代码,被各个工程共享

GeekTime.Core,GeekTime.Domain.Abstractions 是不依赖任何工程的,而 GeekTime.Infrastructure.Core 依赖了 GeekTime.Domain.Abstractions,实现了仓储,比如说仓储会依赖 IAggregateRoot 接口

public interface IRepository<TEntity> where TEntity : Entity, IAggregateRoot

领域模型需要继承模型的基类,并且实现一个聚合根的接口,表示它是一个聚合根

public class Order : Entity<long>, IAggregateRoot

领域事件需要实现一个领域事件的接口

public class OrderCreatedDomainEvent : IDomainEvent

基础设施层是一个独立的程序集,实现了仓储的部分,定义了一个 Order 的仓储

public interface IOrderRepository : IRepository<Order, long>

还定义了 Order 仓储的实现

public class OrderRepository : Repository<Order, long, DomainContext>, IOrderRepository
{
    public OrderRepository(DomainContext context) : base(context)
    {
    }
}

这里可以看到仓储实际上依赖了基础设施层共享代码里面的仓储的定义 IRepository,这样就可以复用仓储层的代码,这样定义 OrderRepository 就会比较简单,可以复用 Repository 的一些实现

public abstract class Repository<TEntity, TKey, TDbContext> : Repository<TEntity, TDbContext>, IRepository<TEntity, TKey> where TEntity : Entity<TKey>, IAggregateRoot where TDbContext : EFContext
{
    public Repository(TDbContext context) : base(context)
    {
    }

    public virtual bool Delete(TKey id)
    {
        var entity = DbContext.Find<TEntity>(id);
        if (entity == null)
        {
            return false;
        }
        DbContext.Remove(entity);
        return true;
    }

    public virtual async Task<bool> DeleteAsync(TKey id, CancellationToken cancellationToken = default)
    {
        var entity = await DbContext.FindAsync<TEntity>(id, cancellationToken);
        if (entity == null)
        {
            return false;
        }
        DbContext.Remove(entity);
        return true;
    }

    public virtual TEntity Get(TKey id)
    {
        return DbContext.Find<TEntity>(id);
    }

    public virtual async Task<TEntity> GetAsync(TKey id, CancellationToken cancellationToken = default)
    {
        return await DbContext.FindAsync<TEntity>(id, cancellationToken);
    }
}

已经实现了一些基本的方法,增删改查的方法

数据库访问的实现,继承了自己定义的 EFContext,EFContext 作为共享代码在各个工程里面复用

public class DomainContext : EFContext

另外一个比较特殊的是事务处理的对象,这个对象是用来管理整个应用程序的请求上下文中的事务,这样就可以避免手动地去处理事务,简化代码

public class DomainContextTransactionBehavior<TRequest, TResponse> : TransactionBehavior<DomainContext, TRequest, TResponse>
{
    public DomainContextTransactionBehavior(DomainContext dbContext, ICapPublisher capBus, ILogger<DomainContextTransactionBehavior<TRequest, TResponse>> logger) : base(dbContext, capBus, logger)
    {
    }
}

应用层依赖了基础设施层,基础设施层又依赖了领域层

应用层实际上是把各层组装在一起的这一层,它是应用程序的一个宿主,协调各层之间的关系,以及组装代码都是在这里实现的

总结一下:

领域模型层专注于业务的设计,它不依赖于其他各层,它是相对独立的

基础设施的仓储层仅仅负责领域模型的存取,它不负责任何的业务逻辑代码的承载

推荐使用 CQRS 的模式来设计应用程序,使应用程序的代码结构更加的合理,在团队和项目膨胀的情况下,工程的可维护性不至于急剧的下降

Web API 是面向前端交互的接口,避免依赖领域模型

共享代码建议设计为共享包,使用私有的 NuGet 仓库来分发和管理

27 | 定义Entity:区分领域模型的内在逻辑和外在行为

上一节讲到领域模型分为两层

一层是抽象层,定义了公共的接口和类

另一层就是领域模型的定义层

先看一下抽象层的定义

  1. 实体接口 IEntity
namespace GeekTime.Domain
{
    public interface IEntity
    {
        object[] GetKeys();
    }

    public interface IEntity<TKey> : IEntity
    {
        TKey Id { get; }
    }
}

通常情况下实体只有一个 ID,但是也不排除存在多个 ID 的情况,所以这里的接口 IEntity 定义实现为多个 ID 的情况,而 IEntity 表示实体只有一个 Id

同样看一下 Entity 的定义

public abstract class Entity : IEntity

public abstract class Entity<TKey> : Entity, IEntity<TKey>

同样地定义了一个 Entity 和 Entity,这样就可以在实体上面定义一些共享的方法,比如 ToString

public abstract class Entity : IEntity
{
    public abstract object[] GetKeys();

    public override string ToString()
    {
        // 输出当前实体的名称以及它的 Id 的清单
        return $"[Entity: {GetType().Name}] Keys = {string.Join(",", GetKeys())}";
    }
}

对于 Entity 定义了比较多的方法

public abstract class Entity<TKey> : Entity, IEntity<TKey>
{
    int? _requestedHashCode;
    public virtual TKey Id { get; protected set; }
    public override object[] GetKeys()
    {
        return new object[] { Id };
    }

    /// <summary>
    /// 表示对象是否相等
    /// 这个方法的重载使我们可以正确的判断两个实体是否是同一个实体
    /// 根据 Id 判断,如果没有 Id 的话,两个实体是不会相等的
    /// </summary>
    /// <param name="obj"></param>
    /// <returns></returns>
    public override bool Equals(object obj)
    {
        if (obj == null || !(obj is Entity<TKey>))
            return false;

        if (Object.ReferenceEquals(this, obj))
            return true;

        if (this.GetType() != obj.GetType())
            return false;

        Entity<TKey> item = (Entity<TKey>)obj;

        if (item.IsTransient() || this.IsTransient())
            return false;
        else
            return item.Id.Equals(this.Id);
    }

    /// <summary>
    /// 这个方法用来辅助对比两个对象是否相等
    /// </summary>
    /// <returns></returns>
    public override int GetHashCode()
    {
        if (!IsTransient())
        {
            if (!_requestedHashCode.HasValue)
                _requestedHashCode = this.Id.GetHashCode() ^ 31;

            return _requestedHashCode.Value;
        }
        else
            return base.GetHashCode();
    }

    /// <summary>
    /// 表示对象是否为全新创建的,未持久化的
    /// </summary>
    /// <returns></returns>
    public bool IsTransient()
    {
        // 如果它没有 Id 就表示它没有持久化
        return EqualityComparer<TKey>.Default.Equals(Id, default);
    }

    public override string ToString()
    {
        return $"[Entity: {GetType().Name}] Id = {Id}";
    }

    /// <summary>
    /// 操作符 == 重载
    /// 借助上面的 Equals 方法
    /// 使得可以直接用 == 判断两个领域对象是否相等
    /// </summary>
    /// <param name="left"></param>
    /// <param name="right"></param>
    /// <returns></returns>
    public static bool operator ==(Entity<TKey> left, Entity<TKey> right)
    {
        if (Object.Equals(left, null))
            return (Object.Equals(right, null)) ? true : false;
        else
            return left.Equals(right);
    }

    /// <summary>
    /// 操作符 != 重载
    /// </summary>
    /// <param name="left"></param>
    /// <param name="right"></param>
    /// <returns></returns>
    public static bool operator !=(Entity<TKey> left, Entity<TKey> right)
    {
        return !(left == right);
    }
}
  1. 聚合根接口 IAggregateRoot
namespace GeekTime.Domain
{
    public interface IAggregateRoot
    {
    }
}

聚合根接口实际上是一个空接口,它不实现任何的方法,它的作用是在实现仓储层的时候,让一个仓储对应一个聚合根

  1. 领域事件接口 IDomainEvent
namespace GeekTime.Domain
{
    public interface IDomainEvent : INotification
    {
    }
}
  1. 域事件处理接口 IDomainEventHandler
namespace GeekTime.Domain
{
    public interface IDomainEventHandler<TDomainEvent> : INotificationHandler<TDomainEvent> 
        where TDomainEvent : IDomainEvent
    {
    }
}
  1. 还有一个领域模型里面比较关键的值对象 ValueObject

值对象的定义比较特殊,因为它是没有 Id 的,所以没有关于 Id 的定义,并且没有对值对象定义接口

重点实现了它是否相等的判断,也是重载了 Equals 这个方法和 GetHashCode 这个方法

protected static bool EqualOperator(ValueObject left, ValueObject right)
{
    if (ReferenceEquals(left, null) ^ ReferenceEquals(right, null))
    {
        return false;
    }
    return ReferenceEquals(left, null) || left.Equals(right);
}

protected static bool NotEqualOperator(ValueObject left, ValueObject right)
{
    return !(EqualOperator(left, right));
}

public override int GetHashCode()
{
    return GetAtomicValues()
     .Select(x => x != null ? x.GetHashCode() : 0)
     .Aggregate((x, y) => x ^ y);
}

它有一个特殊的抽象方法的定义,获取它的原子值

protected abstract IEnumerable<object> GetAtomicValues();

这个方法的作用是将值对象的字段输出出来,作为唯一标识来判断两个对象是否相等,可以看到 Equals 的定义里面也是调用了获取原子值这个方法来判断它是否相等

public override bool Equals(object obj)
{
    if (obj == null || obj.GetType() != GetType())
    {
        return false;
    }
    ValueObject other = (ValueObject)obj;
    IEnumerator<object> thisValues = GetAtomicValues().GetEnumerator();
    IEnumerator<object> otherValues = other.GetAtomicValues().GetEnumerator();
    while (thisValues.MoveNext() && otherValues.MoveNext())
    {
        if (ReferenceEquals(thisValues.Current, null) ^ ReferenceEquals(otherValues.Current, null))
        {
            return false;
        }
        if (thisValues.Current != null && !thisValues.Current.Equals(otherValues.Current))
        {
            return false;
        }
    }
    return !thisValues.MoveNext() && !otherValues.MoveNext();
}

接下来看一下定义的 Order 实体

public class Order : Entity<long>, IAggregateRoot
{
    public string UserId { get; private set; }

    public string UserName { get; private set; }

    public Address Address { get; private set; }

    public int ItemCount { get; private set; }

    protected Order()
    { }

    public Order(string userId, string userName, int itemCount, Address address)
    {
        this.UserId = userId;
        this.UserName = userName;
        this.Address = address;
        this.ItemCount = itemCount;

        this.AddDomainEvent(new OrderCreatedDomainEvent(this));
    }

    public void ChangeAddress(Address address)
    {
        this.Address = address;
    }
}

它首先实现了 Entity,这一个在上一节已经讲过,另外一个 Order 定义为一个聚合根,它需要实现聚合根接口 IAggregateRoot

实体中字段的 set 设置为 private,这样的好处是 Order 所有的数据的操作都应该由实体负责,而不应该被外部对象去操作,从而让领域模型符合封闭开放的原则

对于领域模型的操作,都应该是定义具有业务逻辑含义的方法来定义

比如说 ChangeAddress,就定义一个 ChangeAddress 的方法,把新的地址传进来,由领域模型负责赋值

这里面就可以添加一些地址的校验,比如新的地址是否能够与旧的地址距离太远

看一下地址的定义

public class Address : ValueObject
{
    public string Street { get; private set; }
    public string City { get; private set; }
    public string ZipCode { get; private set; }

    public Address() { }
    public Address(string street, string city, string zipcode)
    {
        Street = street;
        City = city;
        ZipCode = zipcode;
    }

    protected override IEnumerable<object> GetAtomicValues()
    {
        yield return Street;
        yield return City;
        yield return ZipCode;
    }
}

只能通过构造函数给值对象赋值,这里面需要注意的是重载了获取原子值的方法,使用了 yield return

总结一下

在定义领域模型的时候,首先领域模型的字段的修改应该设置为私有的

使用构造函数来表示对象的创建,它的初始值都是由构造函数的参数来赋值的

另外需要定义有业务含义的动作来操作模型的字段

领域模型只负责自己数据的处理,领域服务或者命令负责调用领域模型的业务动作

样就可以区分领域模型的内在逻辑和外在逻辑,使代码结构更加合理

28 | 工作单元模式(UnitOfWork):管理好你的事务

工作单元模式有如下几个特性:

  1. 使用同一上下文

  2. 跟踪实体的状态

  3. 保障事务一致性

我们对实体的操作,最终的状态都是应该如实保存到我们的存储中,进行持久化

接下来看一下代码

为了实现工作单元模式,这里定义了一个工作单元的接口

public interface IUnitOfWork : IDisposable
{
    Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
    Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default);
}

这两个方法的区别是:一个是返回的 int 是指我们影响的数据条数,另外一个返回 bool 表示我们保存是否成功,本质上这两个方法达到的效果是相同的

另外还定义了一个事务管理的接口

public interface ITransaction
{
    // 获取当前事务
    IDbContextTransaction GetCurrentTransaction();

    // 判断当前事务是否开启
    bool HasActiveTransaction { get; }

    // 开启事务
    Task<IDbContextTransaction> BeginTransactionAsync();

    // 提交事务
    Task CommitTransactionAsync(IDbContextTransaction transaction);

    // 事务回滚
    void RollbackTransaction();
}

在实现上我们是借助 EF 来实现工作单元模式的

看一下 EFContext 的定义

/// <summary>
/// DbContext 是 EF 的基类,然后实现了 UnitOfWork 的接口和事务的接口
/// </summary>
public class EFContext : DbContext, IUnitOfWork, ITransaction
{
    protected IMediator _mediator;
    ICapPublisher _capBus;

    // 后面的章节会详细讲到这两个参数
    public EFContext(DbContextOptions options, IMediator mediator, ICapPublisher capBus) : base(options)
    {
        _mediator = mediator;
        _capBus = capBus;
    }

    #region IUnitOfWork

    public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
    {
        var result = await base.SaveChangesAsync(cancellationToken);
        //await _mediator.DispatchDomainEventsAsync(this);
        return true;
    }

    //// 可以看到这个方法实际上与上面的方法是相同的,所以这个方法可以不实现
    //public override Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)
    //{
    //    return base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
    //}

    #endregion

    #region ITransaction

    private IDbContextTransaction _currentTransaction;// 把当前的事务用一个字段存储

    public IDbContextTransaction GetCurrentTransaction() => _currentTransaction;// 获取当前的事务就是返回存储的私有对象

    public bool HasActiveTransaction => _currentTransaction != null;// 事务是否开启是判断当前这个事务是否为空

    /// <summary>
    /// 开启事务
    /// </summary>
    /// <returns></returns>
    public Task<IDbContextTransaction> BeginTransactionAsync()
    {
        if (_currentTransaction != null) return null;
        _currentTransaction = Database.BeginTransaction(_capBus, autoCommit: false);
        return Task.FromResult(_currentTransaction);
    }

    /// <summary>
    /// 提交事务
    /// </summary>
    /// <param name="transaction">当前事务</param>
    /// <returns></returns>
    public async Task CommitTransactionAsync(IDbContextTransaction transaction)
    {
        if (transaction == null) throw new ArgumentNullException(nameof(transaction));
        if (transaction != _currentTransaction) throw new InvalidOperationException($"Transaction {transaction.TransactionId} is not current");

        try
        {
            await SaveChangesAsync();// 将当前所有的变更都保存到数据库
            transaction.Commit();
        }
        catch
        {
            RollbackTransaction();
            throw;
        }
        finally
        {
            if (_currentTransaction != null)
            {
                // 最终需要把当前事务进行释放,并且置为空
                // 这样就可以多次的开启事务和提交事务
                _currentTransaction.Dispose();
                _currentTransaction = null;
            }
        }
    }

    /// <summary>
    /// 回滚
    /// </summary>
    public void RollbackTransaction()
    {
        try
        {
            _currentTransaction?.Rollback();
        }
        finally
        {
            if (_currentTransaction != null)
            {
                _currentTransaction.Dispose();
                _currentTransaction = null;
            }
        }
    }

    #endregion
}

另外一个我们还是需要关注的一点就是如何管理我们的事务

这里有一个类 TransactionBehavior,这个类是用来注入我们的事务的管理过程的,具体它是怎么工作的在后续的章节会讲到,这里先关注它的实现过程

public class TransactionBehavior<TDbContext, TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TDbContext : EFContext
{
    ILogger _logger;
    TDbContext _dbContext;
    ICapPublisher _capBus;
    public TransactionBehavior(TDbContext dbContext, ICapPublisher capBus, ILogger logger)
    {
        _dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext));
        _capBus = capBus ?? throw new ArgumentNullException(nameof(capBus));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
    {
        var response = default(TResponse);
        var typeName = request.GetGenericTypeName();

        try
        {
            // 首先判断当前是否有开启事务
            if (_dbContext.HasActiveTransaction)
            {
                return await next();
            }

            // 定义了一个数据库操作执行的策略,比如说可以在里面嵌入一些重试的逻辑,这里创建了一个默认的策略
            var strategy = _dbContext.Database.CreateExecutionStrategy();

            await strategy.ExecuteAsync(async () =>
            {
                Guid transactionId;
                using (var transaction = await _dbContext.BeginTransactionAsync())
                using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
                {
                    _logger.LogInformation("----- 开始事务 {TransactionId} ({@Command})", transaction.TransactionId, typeName, request);

                    response = await next();// next 实际上是指我们的后续操作,这里的模式有点像之前讲的中间件模式

                    _logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);

                    await _dbContext.CommitTransactionAsync(transaction);

                    transactionId = transaction.TransactionId;
                }
            });

            return response;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);

            throw;
        }
    }
}

回过头来看一下我们的 EFContext,EFContext 实现 IUnitOfWork,工作单元模式的核心,它实现了事务的管理和工作单元模式,我们就可以借助 EFContext 来实现我们的仓储层

29 | 定义仓储:使用EF Core实现仓储层

首先定义仓储层的接口,以及仓储层实现的基类,抽象类

仓储层的接口

namespace GeekTime.Infrastructure.Core
{
    /// <summary>
    /// 包含普通实体的仓储
    /// 约束 TEntity 必须是继承 Entity 的基类,必须实现聚合根 IAggregateRoot
    /// 也就是说仓储里面存储的对象必须是一个聚合根对象
    /// </summary>
    /// <typeparam name="TEntity"></typeparam>
    public interface IRepository<TEntity> where TEntity : Entity, IAggregateRoot
    {
        IUnitOfWork UnitOfWork { get; }
        TEntity Add(TEntity entity);
        Task<TEntity> AddAsync(TEntity entity, CancellationToken cancellationToken = default);
        TEntity Update(TEntity entity);
        Task<TEntity> UpdateAsync(TEntity entity, CancellationToken cancellationToken = default);
        bool Remove(Entity entity);// 由于没有指定主键,只能根据当前实体进行删除操作
        Task<bool> RemoveAsync(Entity entity);
    }

    /// <summary>
    /// 包含指定主键的类型的实体的仓储
    /// 继承了上面的接口 IRepository<TEntity>,也就是说拥有了上面定义的所有方法
    /// 另外一个,它实现了几个跟 Id 相关的操作的方法
    /// </summary>
    /// <typeparam name="TEntity"></typeparam>
    /// <typeparam name="TKey"></typeparam>
    public interface IRepository<TEntity, TKey> : IRepository<TEntity> where TEntity : Entity<TKey>, IAggregateRoot
    {
        bool Delete(TKey id);
        Task<bool> DeleteAsync(TKey id, CancellationToken cancellationToken = default);
        TEntity Get(TKey id);
        Task<TEntity> GetAsync(TKey id, CancellationToken cancellationToken = default);
    }
}

具体抽象类的实现

namespace GeekTime.Infrastructure.Core
{
    /// <summary>
    /// 定义普通实体的仓储
    /// 定义约束 TDbContext 必须是 EFContext,也就是仓储必须依赖于 EFContext 及其子类
    /// 将来就可以把自己定义的比如 DomainContext 作为泛型参数传入 Repository,就可以很快捷地定义出来自己的仓储
    /// </summary>
    /// <typeparam name="TEntity"></typeparam>
    /// <typeparam name="TDbContext"></typeparam>
    public abstract class Repository<TEntity, TDbContext> : IRepository<TEntity> where TEntity : Entity, IAggregateRoot where TDbContext : EFContext
    {
        // 具体实现需要依赖 DbContext
        protected virtual TDbContext DbContext { get; set; }

        public Repository(TDbContext context)
        {
            this.DbContext = context;
        }
        public virtual IUnitOfWork UnitOfWork => DbContext;// 因为 DbContext, EFContext 实际上实现了 IUnitOfWork,所以直接返回

        // 下面这些方法都是 EntityFramework 提供的能力,所以就能通过简单的几行代码来实现基本的仓储操作

        public virtual TEntity Add(TEntity entity)
        {
            return DbContext.Add(entity).Entity;
        }

        public virtual Task<TEntity> AddAsync(TEntity entity, CancellationToken cancellationToken = default)
        {
            return Task.FromResult(Add(entity));
        }

        public virtual TEntity Update(TEntity entity)
        {
            return DbContext.Update(entity).Entity;
        }

        public virtual Task<TEntity> UpdateAsync(TEntity entity, CancellationToken cancellationToken = default)
        {
            return Task.FromResult(Update(entity));
        }

        public virtual bool Remove(Entity entity)
        {
            DbContext.Remove(entity);
            return true;
        }

        public virtual Task<bool> RemoveAsync(Entity entity)
        {
            return Task.FromResult(Remove(entity));
        }
    }

    /// <summary>
    /// 定义主键的实体的仓储
    /// </summary>
    /// <typeparam name="TEntity"></typeparam>
    /// <typeparam name="TKey"></typeparam>
    /// <typeparam name="TDbContext"></typeparam>
    public abstract class Repository<TEntity, TKey, TDbContext> : Repository<TEntity, TDbContext>, IRepository<TEntity, TKey> where TEntity : Entity<TKey>, IAggregateRoot where TDbContext : EFContext
    {
        public Repository(TDbContext context) : base(context)
        {
        }

        /// <summary>
        /// 根据 Id 从 DbContext 获取 Entity,然后再 Remove
        /// 这样的好处是可以跟踪对象的状态
        /// 坏处是任意的删除都需要先去数据库里面做查询
        /// </summary>
        /// <param name="id"></param>
        /// <returns></returns>
        public virtual bool Delete(TKey id)
        {
            var entity = DbContext.Find<TEntity>(id);
            if (entity == null)
            {
                return false;
            }
            DbContext.Remove(entity);
            return true;
        }

        public virtual async Task<bool> DeleteAsync(TKey id, CancellationToken cancellationToken = default)
        {
            var entity = await DbContext.FindAsync<TEntity>(id, cancellationToken);
            if (entity == null)
            {
                return false;
            }
            DbContext.Remove(entity);
            return true;
        }

        public virtual TEntity Get(TKey id)
        {
            return DbContext.Find<TEntity>(id);
        }

        public virtual async Task<TEntity> GetAsync(TKey id, CancellationToken cancellationToken = default)
        {
            return await DbContext.FindAsync<TEntity>(id, cancellationToken);
        }
    }

}

实现自己的 DbContext

DomainContext

namespace GeekTime.Infrastructure
{
    public class DomainContext : EFContext
    {
        public DomainContext(DbContextOptions options, IMediator mediator, ICapPublisher capBus) : base(options, mediator, capBus)
        {
        }

        public DbSet<Order> Orders { get; set; }

        public DbSet<User> Users { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            #region 注册领域模型与数据库的映射关系
            modelBuilder.ApplyConfiguration(new OrderEntityTypeConfiguration());
            modelBuilder.ApplyConfiguration(new UserEntityTypeConfiguration());
            #endregion
            base.OnModelCreating(modelBuilder);
        }
    }
}

映射关系,针对每一个领域模型创建一个 EntityTypeConfiguration

OrderEntityTypeConfiguration

namespace GeekTime.Infrastructure.EntityConfigurations
{
    class OrderEntityTypeConfiguration : IEntityTypeConfiguration<Order>
    {
        public void Configure(EntityTypeBuilder<Order> builder)
        {
            // 定义主键
            builder.HasKey(p => p.Id);
            //builder.ToTable("order");
            //builder.Property(p => p.UserId).HasMaxLength(20);
            //builder.Property(p => p.UserName).HasMaxLength(30);

            // 定义导航属性
            builder.OwnsOne(o => o.Address, a =>
                {
                    a.WithOwner();
                    //a.Property(p => p.City).HasMaxLength(20);
                    //a.Property(p => p.Street).HasMaxLength(50);
                    //a.Property(p => p.ZipCode).HasMaxLength(10);
                });
        }
    }
}

UserEntityTypeConfiguration

namespace GeekTime.Infrastructure.EntityConfigurations
{
    class UserEntityTypeConfiguration : IEntityTypeConfiguration<User>
    {
        public void Configure(EntityTypeBuilder<User> builder)
        {
            builder.HasKey(p => p.Id);
        }
    }
}

事务处理

要实现对 DomainContext 的事务处理的话,仅仅需要创建一个类 DomainContextTransactionBehavior

namespace GeekTime.Infrastructure
{
    public class DomainContextTransactionBehavior<TRequest, TResponse> : TransactionBehavior<DomainContext, TRequest, TResponse>
    {
        public DomainContextTransactionBehavior(DomainContext dbContext, ICapPublisher capBus, ILogger<DomainContextTransactionBehavior<TRequest, TResponse>> logger) : base(dbContext, capBus, logger)
        {
        }
    }
}

为了演示效果,在应用程序启动时,添加一行代码

Startup

// 这一行代码的作用是创建一个 Scope,在这个范围内创建 DomainContext
using (var scope = app.ApplicationServices.CreateScope())
{
    var dc = scope.ServiceProvider.GetService<DomainContext>();

    // 确定数据库已经创建,如果数据库没有创建,这个时候会执行数据库的自动创建过程,根据模型创建数据库
    dc.Database.EnsureCreated();
}

数据库的注册部分

ServiceCollectionExtensions

/// <summary>
/// 这个定义就是将连接字符串配置到 dDomainContext
/// </summary>
/// <param name="services"></param>
/// <param name="connectionString"></param>
/// <returns></returns>
public static IServiceCollection AddMySqlDomainContext(this IServiceCollection services, string connectionString)
{
    return services.AddDomainContext(builder =>
    {
        builder.UseMySql(connectionString);
    });
}

这一行代码的调用位置是在 ConfigureServices 里面

// 从配置中获取字符串
services.AddMySqlDomainContext(Configuration.GetValue<string>("Mysql"));

启动程序,运行过程中 EF 框架会根据定义的实体映射关系生成数据库,可在 Mysql 数据库中查看生成结果

接着丰富一下 Order 的映射关系

namespace GeekTime.Infrastructure.EntityConfigurations
{
    class OrderEntityTypeConfiguration : IEntityTypeConfiguration<Order>
    {
        public void Configure(EntityTypeBuilder<Order> builder)
        {
            // 定义主键
            builder.HasKey(p => p.Id);
            builder.ToTable("order");// 修改表名为 order,不带 s
            builder.Property(p => p.UserId).HasMaxLength(20);// 修改字段长度
            builder.Property(p => p.UserName).HasMaxLength(30);

            // 定义导航属性
            // OwnsOne 的方式可以将 Address 这个值类型作为同一个表的字段来设置
            builder.OwnsOne(o => o.Address, a =>
                {
                    a.WithOwner();
                    a.Property(p => p.City).HasMaxLength(20);
                    a.Property(p => p.Street).HasMaxLength(50);
                    a.Property(p => p.ZipCode).HasMaxLength(10);
                });
        }
    }
}

启动程序,可以看到数据库修改结果

这说明可以在仓储层定义领域模型与数据库的映射关系,这个映射关系可以组织为一个目录,为每一个领域模型设置一个类型来定义,并且这个过程是强类型的,这样的结构,便于后期维护

另外仓储层的话,定义了一个 IOrderRepository,仅仅实现了 IRepository 泛型接口,引进 Order,由于 Order 实际上有一个主键是 long,所以这里把主键类型也传给 IRepository

namespace GeekTime.Infrastructure.Repositories
{
    public interface IOrderRepository : IRepository<Order, long>
    {

    }
}

Order

public class Order : Entity<long>, IAggregateRoot

这样子,Order 的仓储就定义完毕

那么 Order 仓储的实现也非常简单,仅仅需要继承 Repository,把 Order,long,DomainContext 传入泛型 Repository 即可,这里还实现了 IOrderRepository

namespace GeekTime.Infrastructure.Repositories
{
    public class OrderRepository : Repository<Order, long, DomainContext>, IOrderRepository
    {
        public OrderRepository(DomainContext context) : base(context)
        {
        }
    }
}

通过这样简单的继承,可以复用之前定义的代码,快速实现仓储层的定义

可以通过代码提升看到仓储层是有 Add,Update,Remove,Delete 方法,还有 UnitOfWork 的属性

这样一来就完成了仓储层的定义,可以看到仓储层的代码非常的薄,仅仅包含了一些接口的定义和类的继承,需要自定义一些方法的时候,可以在仓储层定义一些特殊方法,比如 AddABC 等特殊的逻辑都可以在这里去实现

namespace GeekTime.Infrastructure.Repositories
{
    public class OrderRepository : Repository<Order, long, DomainContext>, IOrderRepository
    {
        public OrderRepository(DomainContext context) : base(context)
        {
        }
    }

    public void AddABC()
    {

    }
}

另外一个在组织领域模型和数据库的关系的时候,可以很清晰的看到,是在 EntityConfiguration 这个目录下面,为每一个模型定义一个映射类,当领域模型越来越复杂,数据库的结构越来越复杂的时候,这样的组织结构会非常的清晰

30 | 领域事件:提升业务内聚,实现模块解耦

我们在领域的抽象层定义了领域事件和领域事件处理的接口

IDomainEvent

namespace GeekTime.Domain
{
    public interface IDomainEvent : INotification
    {
    }
}

这是一个空接口,它只是标记出来某一个对象是否是领域事件,INotification 也是一个空接口,它是 MediatR 框架的一个接口,是用来实现事件传递用的

namespace MediatR
{
    public interface INotification
    {
    }
}

接着是 IDomainEventHandler

namespace GeekTime.Domain
{
    public interface IDomainEventHandler<TDomainEvent> : INotificationHandler<TDomainEvent> 
        where TDomainEvent : IDomainEvent
    {
        //这里我们使用了INotificationHandler的Handle方法来作为处理方法的定义
        //Task Handle(TDomainEvent domainEvent, CancellationToken cancellationToken);
    }
}

同样这个接口也是继承了 IDomainEventHandler 接口,它有一个泛型参数是 TDomainEvent,这个 TDomainEvent 约束必须为 IDomainEvent,也就是说处理程序只处理 IDomainEvent 作为入参

实际上该方法已经在 INotificationHandler 中定义好了,所以这里不需要重新定义,只是告诉大家它的定义是什么样子的

在 Entity 中对领域事件代码的处理

private List<IDomainEvent> _domainEvents;
public IReadOnlyCollection<IDomainEvent> DomainEvents => _domainEvents?.AsReadOnly();

public void AddDomainEvent(IDomainEvent eventItem)
{
    _domainEvents = _domainEvents ?? new List<IDomainEvent>();
    _domainEvents.Add(eventItem);
}

public void RemoveDomainEvent(IDomainEvent eventItem)
{
    _domainEvents?.Remove(eventItem);
}

public void ClearDomainEvents()
{
    _domainEvents?.Clear();
}

将领域事件做一个实体的属性存储进来,它应该是一个列表,因为在一个实体操作过程中间可能会发生多件事情,领域事件应该是可以被实体模型之外的代码读到,所以暴露一个 ReadOnly 的 Collection

这里还提供几个方法:添加领域事件,移除领域事件,清除领域事件

这些方法都是在领域模型内部进行调用的

可以看一下之前定义的 Order

public Order(string userId, string userName, int itemCount, Address address)
{
    this.UserId = userId;
    this.UserName = userName;
    this.Address = address;
    this.ItemCount = itemCount;

    this.AddDomainEvent(new OrderCreatedDomainEvent(this));
}

public void ChangeAddress(Address address)
{
    this.Address = address;
    //this.AddDomainEvent(new OrderAddressChangedDomainEvent(this));
}

当我们构造一个全新的 Order 的时候,实际上这里可以定义一个事件叫做 OrderCreatedDomainEvent,这个领域事件它的构造函数的入参就是一个 Order,当我们调用 Order 的构造函数时,实际上我们的行为就是在创建一个全新的 Order,所以在这里添加一个事件 AddDomainEvent

同理的比如说 ChangeAddress 被调用了,我们在这里实际上可以定义一个 OrderAddressChangedDomainEvent 类似这样子的领域事件出来

大家可以看到领域事件的构造和添加都应该是在领域模型的方法内完成的,而不应该是被外界的代码去调用创建,因为这些事件都是领域模型内部发生的事件

接着看看 OrderCreatedDomainEvent 的定义

namespace GeekTime.Domain.Events
{
    public class OrderCreatedDomainEvent : IDomainEvent
    {
        public Order Order { get; private set; }
        public OrderCreatedDomainEvent(Order order)
        {
            this.Order = order;
        }
    }
}

那我们如何处理我们的领域事件,接收领域事件的处理应该定义在应用层

namespace GeekTime.API.Application.DomainEventHandlers
{
    public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>
    {
        ICapPublisher _capPublisher;
        public OrderCreatedDomainEventHandler(ICapPublisher capPublisher)
        {
            _capPublisher = capPublisher;
        }

        public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken)
        {
            await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));
        }
    }
}

它继承了 IDomainEventHandler,这个接口是上面讲到的领域事件处理器的接口,它的泛型入参就是要处理的事件的类型 OrderCreatedDomainEvent

为了简单演示起见,这里的逻辑是当我们创建一个新的订单时,我们向 EventBus 发布一条事件,叫做 OrderCreated 这个事件

我们在 OrderController 的 CreateOrder 定义了一个 CreateOrderCommand

[HttpPost]
public async Task<long> CreateOrder([FromBody]CreateOrderCommand cmd)
{
    return await _mediator.Send(cmd, HttpContext.RequestAborted);
}

CreateOrderCommand

namespace GeekTime.API.Application.Commands
{
    public class CreateOrderCommand : IRequest<long>
    {

        //ublic CreateOrderCommand() { }
        public CreateOrderCommand(int itemCount)
        {
            ItemCount = itemCount;
        }

        public long ItemCount { get; private set; }
    }
}

CreateOrderCommandHandler

public async Task<long> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
{

    var address = new Address("wen san lu", "hangzhou", "310000");
    var order = new Order("xiaohong1999", "xiaohong", 25, address);

    _orderRepository.Add(order);
    await _orderRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
    return order.Id;
}

我们在 CreateOrderCommandHandler 里面创建了一个 Order,然后保存进仓储,调用了 UnitOfWork 的 SaveEntitiesAsync

启动程序,直接执行,调用我们的方法,可以看到我们先进入到了创建订单的处理系统(CreateOrderCommandHandler),接着进入到了领域事件发布的 Publish 的代码(MediatorExtension),当仓储存储完毕之后,进入到了 OrderCreatedDomainEventHandler,也就是说我们在创建完我们的领域模型并将其保存之后,我们的领域事件的处理程序才触发

在之前讲解实现 UnitOfWork 的时候(EFContext),我们的 SaveEntitiesAsync 里面只有一行代码是 SaveChangesAsync,这里添加了一行代码,是发送领域事件的代码 DispatchDomainEventsAsync

public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
{
    var result = await base.SaveChangesAsync(cancellationToken);
    //await _mediator.DispatchDomainEventsAsync(this);
    return true;
}

这就是 MediatorExtension 中看到的 DispatchDomainEventsAsync

namespace GeekTime.Infrastructure.Core.Extensions
{
    static class MediatorExtension
    {
        public static async Task DispatchDomainEventsAsync(this IMediator mediator, DbContext ctx)
        {
            var domainEntities = ctx.ChangeTracker
                .Entries<Entity>()
                .Where(x => x.Entity.DomainEvents != null && x.Entity.DomainEvents.Any());

            var domainEvents = domainEntities
                .SelectMany(x => x.Entity.DomainEvents)
                .ToList();

            domainEntities.ToList()
                .ForEach(entity => entity.Entity.ClearDomainEvents());

            foreach (var domainEvent in domainEvents)
                await mediator.Publish(domainEvent);
        }
    }
}

大家可以看到我们发送领域事件实际上是这么一个过程:我们从当前要保存的 EntityContext 里面去跟踪我们的实体,然后从跟踪到的实体的对象中获取到我们当前的 Event,如果 Event 是存在的,就把它取出来,然后将实体内的 Event 进行清除,再然后将这些 Event 逐条地通过中间件发送出去,并且找到对应的 Handler 处理

定义领域事件实际上也非常简单,只需要在领域模型创建一个 Events 的目录,然后将领域事件都定义在这里,领域事件需要继承 IDomainEvent,领域事件的处理器都定义在 DomainEventHandler,在应用层这个目录下面,我们可以为每一个事件都定义我们的处理程序

总结一下

领域模型内创建事件:我们不要在领域模型的外面去构造事件,然后传递给领域模型,因为整个领域事件是由领域的业务逻辑触发的,而不是说外面的对模型的操作触发的

另外就是针对领域事件应该定义专有的领域事件处理类,就像我们刚才演示的,在一个特定的目录,对每一个事件进行定义处理类

还有一个就是在同一个事务里面去处理我们的领域事件,实际上我们也可以选择在不同的事务里面处理,如果需要在不同的事务里面去处理领域事件的时候,我们就需要考虑一致性的问题,考虑中间出错,消息丢失的问题

31 | APIController:定义API的最佳实践

首先看一个传统意义上三层架构定义的 Controller

[HttpPost]
public Task<long> CreateOrder([FromBody]CreateOrderVeiwModel viewModel)
{
    var model = viewModel.ToModel();
    return await orderService.CreateOrder(model);
}

class OrderService : IOrderService
{
    public long CreateOrder(CreateOrderModel model)
    {
        var address = new Address("wen san lu", "hangzhou", "310000");
        var order = new Order("xiaohong1999", "xiaohong", 25, address);

        _orderRepository.Add(order);
        await _orderRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
        return order.Id;
    }
}

可以看到这里的 Controller 负责模型转换,还负责服务调用,服务里面实际上就是领域模型的操作部分

随着业务逻辑的越来越复杂,Controller 会越来越膨胀,在 DDD 领域驱动设计的理念下,我们更倾向于把应用程序的每一层明确区分,然后层与层之间的界限应该是明确的,在实现上面应该也是隔离的

Controller 这一层负责与前端用户的交互,它主要的责任就是定义输入和输出,实现身份认证,授权功能,它不应该处理领域模型,处理仓储,所以不建议以上的写法,不建议在 Controller 里面写模型转换和服务调用

namespace GeekTime.API.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrderController : ControllerBase
    {
        IMediator _mediator;
        public OrderController(IMediator mediator)
        {
            _mediator = mediator;
        }

        [HttpPost]
        public async Task<long> CreateOrder([FromBody]CreateOrderCommand cmd)
        {
            return await _mediator.Send(cmd, HttpContext.RequestAborted);
        }

        [HttpGet]
        public async Task<List<string>> QueryOrder([FromBody]MyOrderQuery myOrderQuery)
        {
            return await _mediator.Send(myOrderQuery);
        }
    }
}

这里使用了中间者模式 Mediator,它通过把命令发送出去,然后我们在 Commands 目录下面定义了每一个命令的 handler,这样就可以将业务逻辑的部分和 Controller 处理的部分,输入输出定义的部分进行隔离,我们的 Controller 还需要去定义路由的规则,路由验证的规则

再看一下 Controller 的构造函数,从设计上建议 Controller 所依赖的服务都通过它的构造函数注入进来,之前有讲过,通过容器进行属性注入的方式,但这种方式我们并不推荐使用,当一个 Controller 依赖了很多服务的时候,可以发现有一部分服务是大部分的 Action 都会依赖到的,有一部分服务只是个别 Action 依赖到的,这个时候就可以使用 FromServices,而不需要在构造函数里面注入它,这样有个好处是在编写单元测试的时候,可以在容器里面 Mock 所有的服务

public async Task<long> CreateOrder([FromServices] IEventBus eventBus, [FromBody]CreateOrderCommand cmd)

这里不建议使用属性注入的方式来注入服务,是因为使用属性注入的时候,会把这些属性,比如说 IOrderService,有可能由其他代码 set 我们的 OrderService,造成意外的情况,使我们的代码的维护不可控

public IOrderService orderService { get; set; }

还有一个关键的点是建议尽可能定义异步的 action,尽可能地使用 async 和 await 这样的组合来实现我们的代码,这样对提高我们应用程序的吞吐量是有一定的帮助的

总结一下

APIController 实际上是负责了对前端用户的输入输出的定义,它还负责了身份验证,授权,Url 定义的部分

APIController 不应该负责业务逻辑的承载,应该把这些职责交给我们命令处理程序或者说领域服务来定义

再一个我们也讲解了 APIController 在注入服务时的一些方法,通过构造函数的注入,通过 FromServices 的方式获取服务,不建议的做法时使用属性注入的方式注入

32 | 集成事件:解决跨微服务的最终一致性

首先看一下集成事件的工作原理

它的目的时为了实现系统的集成,它主要是用于系统里面多个微服务之间相互传递事件

集成事件的实现方式有两种,一种是图上显示的发布订阅的方式,通过 EventBus,还有一种方式是通过观察者模式,由观察者将事件发送给关注事件的人

接着看一下代码上的定义

在 Application 目录下面定义了一个集成事件的目录 IntegrationEvents

OrderCreatedIntegrationEvent

namespace GeekTime.API.Application.IntegrationEvents
{
    public class OrderCreatedIntegrationEvent
    {
        public OrderCreatedIntegrationEvent(long orderId) => OrderId = orderId;
        public long OrderId { get; }
    }
}

得益于基础设施的发展,现在实际上可以借助一些开源框架,很轻松的实现集成事件的发布和订阅的能力

在发布端可以看一下这里的代码

namespace GeekTime.API.Application.DomainEventHandlers
{
    public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>
    {
        ICapPublisher _capPublisher;
        public OrderCreatedDomainEventHandler(ICapPublisher capPublisher)
        {
            _capPublisher = capPublisher;
        }

        public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken)
        {
            await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));
        }
    }
}

这里我们定义了一个领域事件,它的作用就是将我们的集成事件发送出去,具体是要发送到 RabbitMQ 还是 kafka 这些消息队列中间件里面是可配置的,对于业务逻辑来讲的话,它是透明的

这里有一个 ICapPublisher 接口,这个接口实际上是由中国的开源社区开发的一个框架,借助这个框架,我们可以轻松的实现消息的发布和订阅

那我们如何来订阅其他微服务发出的消息呢?

namespace GeekTime.API.Application.IntegrationEvents
{
    public class SubscriberService : ISubscriberService, ICapSubscribe
    {
        IMediator _mediator;
        public SubscriberService(IMediator mediator)
        {
            _mediator = mediator;
        }

        [CapSubscribe("OrderPaymentSucceeded")]
        public void OrderPaymentSucceeded(OrderPaymentSucceededIntegrationEvent @event)
        {
            //Do SomeThing
        }

        [CapSubscribe("OrderCreated")]
        public void OrderCreated(OrderCreatedIntegrationEvent @event)
        {
            //Do SomeThing
        }
    }
}

我们可以通过订阅服务,它同样也是借助了 Cap 的组件,我们实现了 ICapPublisher 这个接口,就可以将服务标记成我们的订阅服务

另外我们的订阅方法,订阅的处理函数上面,标记 CapSubscribe 这个属性,将我们要订阅的事件名放在这里,我们就可以订阅到这个事件了

namespace GeekTime.API.Application.IntegrationEvents
{
    public class OrderPaymentSucceededIntegrationEvent
    {
        public OrderPaymentSucceededIntegrationEvent(long orderId) => OrderId = orderId;
        public long OrderId { get; }
    }
}

我们可以看到集成事件定义的话,它是没有接口和基类的约束的,因为在异构的系统里面,对于集成事件来讲的定义是相对比较灵活的,我们的建议是用这种简单的类型来承载它即可

总结一下

集成事件实际上也是由领域的业务逻辑驱动的,它本质上也是领域事件,只是说它是跨服务的领域事件

另外一个集成事件大部分场景是领域事件驱动的,也有可能是一些比如说定时任务触发的,由于集成事件是跨微服务来传递信息的,所以我们没办法通过事务来处理,那就需要借助 Cap 这样的框架来实现最终的一致性

当然我们建议仅在必要的情况下定义和使用集成事件,因为一旦引入集成事件,比如 EventBus,我们应用程序的版本控制,比如说我们发布新版本的时候,新旧版本的事件的发布和订阅都会受到影响,这个时候我们没办法使我们的应用程序成为一个单纯的无状态的程序,在更新新版本的时候,那么就会带来新的负担,兼容性方面我们会需要做更多的工作

33 | 集成事件:使用RabbitMQ来实现EventBus

这一节我们来讲解如何通过 CAP 组件和 RabbitMQ 来实现 EventBus

要实现 EventBus,我们这里借助了 RabbitMQ,它的整个安装和使用的体验是非常人性化的,如果是在 Windows 下开发的话,它可以有 Windows 的 installer,也可以在其它的操作系统下安装和使用,当然它也支持 Docker 的模式,我们可以在以下的地址获取到安装包和安装方法的说明

https://www.rabbitmq.com/download.html

另一个就是在 .NET Core 社区比较知名的 CAP 框架,这个框架是由我们国人开发的,它实现了开箱即用的 EventBus 的实现,我们可以通过简单的配置,就能把 RabbitMQ 集成进来,并且实现我们的集成事件的处理

https://github.com/dotnetcore/CAP

我们来看一下 CAP 框架的实现架构

它实际上实现了一个叫 OutBox 的设计模式,就是在我们的每个微服务,比如说微服务 A 的数据库 A,在这个数据库内部它建立了两张表,一张叫 publish 事件表和一张叫 receiver 事件表,这两张事件表用来记录微服务 A 发出的和微服务 A 收到的事件

当我们要发出事件时,我们会把事件的存储逻辑与我们的业务逻辑的事务合并,在同一个事务里提交,也就意味着当我们的业务逻辑提交成功时,我们的事件表里面的事件是一定存在的,它是与我们的业务逻辑的事务是强绑定的

如果说我们的业务逻辑失败了,事务回滚了,这条事件是不会出现在我们的事件表里的,这样子就可以做到说我们要发送的事件一定是与业务逻辑是一致的

接下来由我们组件来负责将事件表里的事件全部都发送到 EventBus,比如说 RabbitMQ 消息队列里面去,由接收方订阅

对于订阅的事件的话,设计的模式也是同理,当我们的应用程序在消息队列获取到信息的时候,它就会将这些消息持久化到我们的数据库的 Receive 事件表里,这样我们就可以在本地进行事件的处理,失败重试等操作,这些都是由 CAP 框架完成的,我们仅需要去做简单的配置,关注发布和订阅的业务逻辑即可

我们看一下代码,刚才有提到 CAP 的架构,关键的一点是需要事件的存储与我们的业务逻辑在同一个事务里,所以说我们在处理事务的逻辑部分的话,需要嵌入 CAP 的一部分代码,我们看一下 EFContext 的定义

public EFContext(DbContextOptions options, IMediator mediator, ICapPublisher capBus) : base(options)
{
    _mediator = mediator;
    _capBus = capBus;
}

之前有关注到有一个叫 ICapPublisher 这个入参,关键的是这一行代码我们需要关注一下

_currentTransaction = Database.BeginTransaction(_capBus, autoCommit: false);

这一行代码的作用是创建事务,我们可以看到创建事务的过程中,我们把 ICapPublisher 也传入给了这个方法的构造函数,实际上这个方法是由 CAP 的组件提供的,它的核心作用就是将我们要发送的事件与我们的业务的存储都放在同一个事务内部,这样子我们就可以使得事务提交时或者回滚时,我们的事件与业务逻辑的存取都是一致的

然后我们再来看一下配置的部分,写在 ServiceCollectionExtensions 下面

public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
    services.AddTransient<ISubscriberService, SubscriberService>();
    services.AddCap(options =>
    {
        options.UseEntityFramework<DomainContext>();

        options.UseRabbitMQ(options =>
        {
            configuration.GetSection("RabbitMQ").Bind(options);
        });
        //options.UseDashboard();
    });

    return services;
}

我们这里定义了一个 AddEventBus,可以看到将我们之前演示的代码订阅服务注入进来,然后 Services 最重点的代码是 AddCap,我们需要告诉 CAP 框架我们是针对 DomainContext 来实现我们的 EventBus,EventBus 与 DomainContext 共享我们的数据库连接,下面一行代码是指我们要用 RabbitMQ 来作为我们 EventBus 的消息队列的存储,这里可以看到使用了一个 Bind 的方法将我们的配置绑定到 RabbitMQ 的 options 上面去

我们可以看一下我们的配置

  "RabbitMQ": {
    "HostName": "localhost",
    "UserName": "admin",
    "Password": "123456",
    "VirtualHost": "geektime",
    "ExchangeName": "geek_queue"
  }

这里可以看到我们定义了一个 RabbitMQ 的配置,然后这里面会有我们的 host,因为是本地安装的,所以访问地址就是 localhost,VirtualHost 是 RabbitMQ 一个比较特殊的设置,它的作用是将 RabbitMQ 的空间区分为不同的空间,你可以认为这是一个租户,相同的 VirtualHost,大家都可以认为是一个 RabbitMQ 的集群,最下面的 ExchangeName 就是队列需要订阅的 Exchange 的名称,消息的发布和订阅都是通过这个 Exchange 来的

然后我们在 Startup 这里添加一行

services.AddEventBus(Configuration);

这样我们就配置完成了

33 | 集成事件:使用RabbitMQ来实现EventBus

为了演示我们的发布和订阅的话,我们在这里的代码做一些稍微的调整

namespace GeekTime.API.Application.DomainEventHandlers
{
    public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>
    {
        ICapPublisher _capPublisher;
        public OrderCreatedDomainEventHandler(ICapPublisher capPublisher)
        {
            _capPublisher = capPublisher;
        }

        public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken)
        {
            await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));
        }
    }
}

这里我们发布了一个 OrderCreated 的集成事件,然后订阅一个 OrderCreated

namespace GeekTime.API.Application.IntegrationEvents
{
    public class SubscriberService : ISubscriberService, ICapSubscribe
    {
        IMediator _mediator;
        public SubscriberService(IMediator mediator)
        {
            _mediator = mediator;
        }

        [CapSubscribe("OrderPaymentSucceeded")]
        public void OrderPaymentSucceeded(OrderPaymentSucceededIntegrationEvent @event)
        {
            //Do SomeThing
        }

        [CapSubscribe("OrderCreated")]
        public void OrderCreated(OrderCreatedIntegrationEvent @event)
        {
            //Do SomeThing
        }
    }
}

通过标注属性,我们就可以完成订阅

也就是说我们创建一个订单的时,我们会触发订单创建的领域事件,订单创建的领域事件又发送了一个订单创建的集成事件,然后我们在订阅服务里面订阅了订单创建的集成事件

在发布和订阅的地方分别打上一个断点,启动程序,可以看到整个流程

我们再梳理一下整个流程,首先我们创建了一个订单,这个订单触发了我们的 OrderCreated 的领域事件,OrderCreated 的领域事件的处理器像我们的 EventBus 发布了一个 OrderCreated 的集成事件,我们在订阅服务的地方订阅了这个事件,所以我们可以接收到并且做出相应的处理

我们观察一下数据库的表,一共有四张表,cap.publish 和 cap.received 这两张表分别对应发送事件表和接收事件表,order 和 user 这两张表是我们的领域模型表

整个 CAP 的框架,它的实现原理其实有两个关键点,一个是事件表,一个就是事务控制,也就是说将事件的存储嵌入到我们的业务逻辑的事务里面去,这样子我们就可以保证我们的业务与事件是要么都能存储成功,要么都失败

整个 CAP 框架它的应用性是非常强的,非常建议在处理集成事件的时候使用这个框架

34 | MediatR:轻松实现命令查询职责分离模式(CQRS)

核心对象

  • IMeditator
  • IRequese、IRequest
  • IRequestHandler<in TRequest, TResponse>

首先我们安装了 MediatR 的 8.0 的组件包,还安装了依赖注入框架的扩展包,以及依赖注入框架的核心组件包

  • MediatR
  • MediatR.Extensions.Microsoft.DependencyInjection
  • Microsoft.Extensions.DependencyInjection

大家可以观察到 MediatR 的包名和命名空间少了一个 o,猜测是作者故意这样设计的,因为它具体实现里面会有一个接口和类是 Mediator,如果设置同名的话会有一些引用上的问题

var services = new ServiceCollection();

services.AddMediatR(typeof(Program).Assembly);

我们在这里构建一个 ServiceCollection,然后通过一行代码将我们当前的程序集注入进去,它就可以扫描我们当前程序集相关的类,下面看一下我们定义的两个类

internal class MyCommand : IRequest<long>
{ 
    public string CommandName { get; set; }
}

internal class MyCommandHandler : IRequestHandler<MyCommand, long>
{
    public Task<long> Handle(MyCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyCommandHandler执行命令:{request.CommandName}");
        return Task.FromResult(10L);
    }
}

第一个类是 MyCommand,它实现了 IRequest 接口,这个接口就代表中介者要执行的命令

第二个类是 MyCommandHandler,它实现了 IRequestHandler 的接口,这个就是我们对命令的处理器的定义

var serviceProvider = services.BuildServiceProvider();

var mediator = serviceProvider.GetService<IMediator>();

await mediator.Send(new MyCommand { CommandName = "cmd01" });

我们从容器里面获取一个 IMediator,然后通过 send 方法发送一个 MyCommand 命令,我们构造了一个新的 MyCommand 的实例传给它

启动程序,输出如下:

MyCommandHandler执行命令:cmd01

我们可以看到 MyCommandHandler 的 Handle 方法执行了,它输出了 MyCommandHandler 的执行命令 cmd01

这样子,这个中介者它有什么好处呢?

大家可以看到,通过中介者模式,我们将命令的构造和命令的处理可以分离开,那么命令的处理如何知道要处理哪个命令呢,就是通过我们泛型的约束来定义的,我们这里为 IRequestHandler 填入了 MyCommand 类型,所以我们能明确知道 MyCommandHandler 是用来处理 MyCommand 的

如果说我在程序里面实现了多个 Handler,我们可以试验一下

internal class MyCommandHandlerV2 : IRequestHandler<MyCommand, long>
{
    public Task<long> Handle(MyCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyCommandHandlerV2执行命令:{request.CommandName}");
        return Task.FromResult(10L);
    }
}

internal class MyCommandHandler : IRequestHandler<MyCommand, long>
{
    public Task<long> Handle(MyCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyCommandHandler执行命令:{request.CommandName}");
        return Task.FromResult(10L);
    }
}

启动程序,输出如下:

MyCommandHandlerV2执行命令:cmd01

大家可以看到我们输出的是 V2 执行命令

我们把代码进行一个调整,把这个定义移到后面

internal class MyCommandHandler : IRequestHandler<MyCommand, long>
{
    public Task<long> Handle(MyCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyCommandHandler执行命令:{request.CommandName}");
        return Task.FromResult(10L);
    }
}

internal class MyCommandHandlerV2 : IRequestHandler<MyCommand, long>
{
    public Task<long> Handle(MyCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyCommandHandlerV2执行命令:{request.CommandName}");
        return Task.FromResult(10L);
    }
}

启动程序,输出如下:

MyCommandHandler执行命令:cmd01

大家可以看到我们这次输出的并不是 V2,而是之前的那个命令,为什么会这样子呢?是因为实际上 mediator 对于 IRequestHandler 的扫描,它是有顺序的,后面扫描到的会替换前面扫描到的 Handler,它只会识别其中最后注册进去的一个,也就是说我们在处理 RequestHandler 的时候,我们要注意在注册时仅注册需要的那个

我们再来看看我们的应用程序,回到我们之前的工程里

namespace GeekTime.API.Application.Commands
{
    public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, long>
    {
        IOrderRepository _orderRepository;
        ICapPublisher _capPublisher;
        public CreateOrderCommandHandler(IOrderRepository orderRepository, ICapPublisher capPublisher)
        {
            _orderRepository = orderRepository;
            _capPublisher = capPublisher;
        }

        public async Task<long> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
        {

            var address = new Address("wen san lu", "hangzhou", "310000");
            var order = new Order("xiaohong1999", "xiaohong", 25, address);

            _orderRepository.Add(order);
            await _orderRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
            return order.Id;
        }
    }
}

我们可以看到我们的 CreateOrderCommandHandler 实现的是 IRequestHandler,这也就是解释了为什么之前我们并没有显示的调用 CreateOrderCommandHandler,代码却能够执行到这里的原因

实际上我们在定义我的查询的时候,也可以这样定义,例如我们定义一个 MyOrderQuery,把订单的所有名称都输出出去

namespace GeekTime.API.Application.Queries
{
    public class MyOrderQuery : IRequest>
    {
        public MyOrderQuery(string userName) => UserName = userName;

        public string UserName { get; private set; }
    }
}

我们再定义一个查询器,这里就可以从各种地方查询到我们的数据然后返回出去

namespace GeekTime.API.Application.Queries
{
    public class MyOrderQueryHandler : IRequestHandler<MyOrderQuery, List<string>>
    {
        public Task<List<string>> Handle(MyOrderQuery request, CancellationToken cancellationToken)
        {
            return Task.FromResult(new List<string>());
        }
    }
}

实际上我们这样子就完成了查询和查询处理的定义

我们可以在 Controller 定义

[HttpGet]
public async Task<List<string>> QueryOrder([FromBody]MyOrderQuery myOrderQuery)
{
    return await _mediator.Send(myOrderQuery);
}

这样就完成了查询和查询处理逻辑的分离

我们执行命令是同样的实现方式,我们这样子做的话可以将我们的查询的输入和处理定义在一个目录下面,也可以将我们的命令的定义和命令的执行放在一个目录下面,使我们的 Controller 关注于身份认证或者基础设施缓存等等一些逻辑的处理,它不再关心说我的业务逻辑是怎么样子的

35 | MediatR:让领域事件处理更加优雅

核心对象

  • IMediator

  • INotification

  • INotificationHandler

这两个与之前的 Request 的行为是不一样的,接下来看一下代码

internal class MyEvent : INotification
{ 
    public string EventName { get; set; }
}

internal class MyEventHandler : INotificationHandler<MyEvent>
{
    public Task Handle(MyEvent notification, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyEventHandler执行:{notification.EventName}");
        return Task.CompletedTask;
    }
}

internal class MyEventHandlerV2 : INotificationHandler<MyEvent>
{
    public Task Handle(MyEvent notification, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyEventHandlerV2执行:{notification.EventName}");
        return Task.CompletedTask;
    }
}
//await mediator.Send(new MyCommand { CommandName = "cmd01" });
await mediator.Publish(new MyEvent { EventName = "event01" });

之前 mediator 使用了 Send 的方式来处理 Command,它还有一个方法 Publish,这个方法的入参是一个 INotification

启动程序,输出如下:

MyEventHandler执行:event01
MyEventHandlerV2执行:event01

与之前的 IRequest 不同的是,INotification 是可以注册多个 Handler 的,它是一个一对多的关系,借助它就可以对领域事件定义多个处理器来处理

接着看一下之前云服务的代码

public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
{
    var result = await base.SaveChangesAsync(cancellationToken);
    await _mediator.DispatchDomainEventsAsync(this);
    return true;
}

之前在 IUnitOfWork 定义的时候讲过一个发送领域事件的方法 DispatchDomainEventsAsync,看一下这个方法的定义

static class MediatorExtension
{
    public static async Task DispatchDomainEventsAsync(this IMediator mediator, DbContext ctx)
    {
        var domainEntities = ctx.ChangeTracker
            .Entries<Entity>()
            .Where(x => x.Entity.DomainEvents != null && x.Entity.DomainEvents.Any());

        var domainEvents = domainEntities
            .SelectMany(x => x.Entity.DomainEvents)
            .ToList();

        domainEntities.ToList()
            .ForEach(entity => entity.Entity.ClearDomainEvents());

        foreach (var domainEvent in domainEvents)
            await mediator.Publish(domainEvent);
    }
}

可以看到这里是将所有的实体内的领域事件全部都查找出来,然后通过 mediator 的 Publish 发送领域事件,具体的领域事件的处理注册在 mediator 里面,这里定义了一个 OrderCreatedDomainEventHandler

public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>
{
    ICapPublisher _capPublisher;
    public OrderCreatedDomainEventHandler(ICapPublisher capPublisher)
    {
        _capPublisher = capPublisher;
    }

    public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken)
    {
        await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));
    }
}

它继承自 IDomainEventHandler,而 IDomainEventHandler 继承自 INotificationHandler

public interface IDomainEventHandler<TDomainEvent> : INotificationHandler<TDomainEvent> 
    where TDomainEvent : IDomainEvent
{
    //这里我们使用了INotificationHandler的Handle方法来作为处理方法的定义
    Task Handle(TDomainEvent domainEvent, CancellationToken cancellationToken);
}

这也就是为什么 IDomainEventHandler 会识别到 DomainEvent 并且进行处理,同样的在定义 DomainEvent 的时候,也需要标识它是一个 DomainEvent

public class OrderCreatedDomainEvent : IDomainEvent
{
    public Order Order { get; private set; }
    public OrderCreatedDomainEvent(Order order)
    {
        this.Order = order;
    }
}

而 DomainEvent 实际上也是继承自 INotification

public interface IDomainEvent : INotification
{
}

这也就意味着 EventHandler 可以正确的识别到对应的 Event 并且进行处理,这都是 MediatR 的核心能力

领域事件都是定义在 event 目录下,与领域模型定义在一起,所有的领域事件都继承 DomainEvent,分布于这个目录

领域事件的处理 Handler 都定义在 Application 应用层的 Application 下面的 DomainEventHandlers 目录下面

这样的好处是事件的定义与事件的处理是分开的,并且非常的明确知道有哪些领域事件,有哪些领域事件的处理程序

关于 MediatR 再补充一部分内容,在 TransactionBehavior 内可以看到这个类实际上继承自 IPipelineBehavior

namespace MediatR
{
    public interface IPipelineBehavior<in TRequest, TResponse>
    {
        Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next);
    }
}

这个接口的作用是在命令或者事件处理的之前或者之后插入逻辑,它的执行的方式有点像中间件的方式,在 Handler 的入参里面有一个 next 的参数,就是指 CommandHandler 或者 EventHandler 的执行的逻辑,在这里就可以决定 Handler 的具体执行之前或者之后,插入一些逻辑

public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
{
    var response = default(TResponse);
    var typeName = request.GetGenericTypeName();

    try
    {
        // 首先判断当前是否有开启事务
        if (_dbContext.HasActiveTransaction)
        {
            return await next();
        }

        // 定义了一个数据库操作执行的策略,比如说可以在里面嵌入一些重试的逻辑,这里创建了一个默认的策略
        var strategy = _dbContext.Database.CreateExecutionStrategy();

        await strategy.ExecuteAsync(async () =>
        {
            Guid transactionId;
            using (var transaction = await _dbContext.BeginTransactionAsync())
            using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
            {
                _logger.LogInformation("----- 开始事务 {TransactionId} ({@Command})", transaction.TransactionId, typeName, request);

                response = await next();// next 实际上是指我们的后续操作,这里的模式有点像之前讲的中间件模式

                _logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);

                await _dbContext.CommitTransactionAsync(transaction);

                transactionId = transaction.TransactionId;
            }
        });

        return response;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);

        throw;
    }
}

这里实现里在执行命令之前判断事务是否开启,如果事务开启的话继续执行后面的逻辑,如果事务没有开启,先开启事务,再执行后面的逻辑


文章作者: Chaoqiang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Chaoqiang !
评论
 上一篇
Netcore认证授权与IdentityServer4(1)鉴权授权源码 Netcore认证授权与IdentityServer4(1)鉴权授权源码
主要概要这个文章主要来聊一聊Net Core中的认证与授权,以及Net Core生态中非常火热的IdentityServer4这个组件的使用,主要内容如下: 鉴权授权流程变化 源码解读鉴权 源码解读多授权策略 JWT和Identity
下一篇 
NetCore 开发实战(1)——必备知识 NetCore 开发实战(1)——必备知识
01 | 简介为什么要学习 .NET Core 微软大力支持推动 .Net 技术生态发展 跨平台:更多的开发环境和部署环境选择,尤其是对 Docker 和 Kubernetes 的良好支持,快速构建微服务并部署到云基础设施中,实现高可用,可
  目录