В традиционной модели происходит примерно следующее: вы получаете команду, обрабатываете ее, сохраняете результат в базу данных и затем пытаетесь опубликовать событие в брокер сообщений. Но что если публикация не удалась? Или что если публикация прошла успешно, но данные не сохранились? Получаем несогласованные данные, потерянные события или, ещё хуже, дублирующиеся события. Ммм, знакомо?
Представьте ситуацию: пользователь размещает заказ, деньги списываются, но события о создании заказа потерялось по пути к микросервису доставки. Или наоборот - заказ не создался в базе, а события об успешном размещении уже полетели в другие сервисы. В любом случае, последствия такой рассинхронизации крайне неприятны. Именно здесь на сцену... простите, в игру вступает паттерн Outbox. Суть проста: мы сохраняем событие в таблицу Outbox внутри той же транзакции, в которой обновляем основные данные. Затем независимый процесс читает эту таблицу и публикует события в брокер сообений. Это решение обеспечивает атомарность операции записи данных и регистрации события. В C# 14 с использованием нового синтаксиса первичных конструкторов, кратких записей и улучшеного сопостовления шаблонов реализация этого паттерна становится еще проще.
На протяжении последних трех лет я внедрял подобные решения в различных проектах - от небольших стартапов до крупных корпоративных систем. И могу с уверенностью сказать - паттерн Outbox стал для меня незаменимым инструментом в обеспечении надежности распределеных систем.
Основы Event-Driven CQRS
CQRS (Command Query Responsibility Segregation) - это архитектурный паттерн, который я считаю революционным в современной разработке. Его суть проста: разделить операции чтения и записи. Звучит банально, но поверте мне, эта простая идея кардинально меняет архитектуру приложения. Когда я впервые встретил CQRS, то думал: "Зачем усложнять? Один репозиторий для чтения и записи - и все работает!" Но со временем, работая над высоконагруженными системами, я понял его истинную ценность. Модель чтения (queries) и модель записи (commands) имеют разные паттерны использования и масштабирования.
Традиционная монолитная модель: Client -> API -> Single Model -> Database
CQRS модель: Client -> Commands -> Write Model -> Database -> Queries -> Read Model -> Database (или кэш)
Event-Driven CQRS добавляет в эту схему событийный механизм. Вместо прямого обновления read-модели после изменения write-модели, система генерирует события, которые асинхронно обрабатываются для обновления read-модели.
Расширенная схема: Client -> Commands -> Write Model -> Database -> Events -> Event Handler -> Read Model -> Read Model
Это дает фантастическую производительность read-моделей. Я работал с проектом, где переход на такую архитектуру сократил время выполнения сложных запросов с нескольких секунд до миллисекунд. Почему? Потому что read-модель денормализована и оптимизирована конкретно для запросов. Асинхронная обработка команд - еще одно преимущество, о котором часто забывают. В синхронной модели пользователь ждет, пока вся цепочка операций завершится. В асинхронной - клиент получает подтверждение, что команда принята, а система обрабатывает ее в фоновом режиме.
Однажды я внедрил это решение в платежную систему: пользователь нажимал "Оплатить" и сразу получал уведомление "Платеж в обработке", а не ждал 5-10 секунд, как раньше. Конверсия выросла на 23%! Люди не любят ждать.
В микросервисной архитектуре событийная модель становится не просто удобной, а необходимой. Микросервисы должны быть слабо связаны и общаться асинхронно. События - идеальный механизм для этого.
Часто путают Event-Driven CQRS с Event Sourcing. Это разные паттерны! В Event Sourcing состояние системы - это последовательность событий, без них невозможно реконструировать данные. В Event-Driven CQRS события - это способ синхронизации моделей, но не единственный источник правды. Я столкнулся с этой путанницей, когда коллега предложил использовать Event Sourcing для всего проекта. После долгих обсуждений мы решили применить его только для критически важных доменов, где история изменений имела бизнес-ценность. Для остальных частей системы хватило Event-Driven CQRS без полного Event Sourcing. Выбор между ними зависит от конкретной задачи. Event Sourcing требует больше ресурсов на поддержку и сложнее в имплементации, но даёт полную историю изменений. Event-Driven CQRS проще внедрить и он эффективнее для большинства сценариев.
Давайте поговорим о ключевых компонентах Event-Driven CQRS. В течение многих лет работы с этим паттерном я выделил для себя несколько критически важных элементов.
Во-первых, брокер сообщений. Это ядро всей архитектуры. Я перепробовал разные варианты: RabbitMQ, Azure Service Bus, Kafka. Выбор зависит от требований к производительности и устойчивости. RabbitMQ отлично подходит для небольших и средних проектов, Kafka - для систем с огромным потоком данных. Забавный случай: в одном проекте мы использовали RabbitMQ для обработки заказов. При нагрузке в Black Friday система чуть не легла - очереди переполнились. После этого срочно мигрировали на Kafka. Мораль: всегда думайте о пиковых нагрузках!
Второй важный компонент - обработчики событий. Я предпочитаю делать их идемпотентными, чтобы повторная обработка одного и того же события не создавала проблем. Это сложнее в реализации, но спасает от множества проблем в продакшене.
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
| public async Task HandleAsync(OrderPlacedEvent @event)
{
// Проверка, не обработали ли мы уже это событие
if (await _processedEventsRepository.IsProcessedAsync(@event.Id))
return;
// Логика обновления read-модели
await _orderReadModelRepository.CreateAsync(
new OrderReadModel(@event.OrderId, @event.Total));
// Отметка события как обработанного
await _processedEventsRepository.MarkAsProcessedAsync(@event.Id);
} |
|
Третий компонент - системы мониторинга и отказоустойчивости. Я убежден, что без них Event-Driven система превращается в черный ящик. Нам нужно знать, где застряли события, какие обработчики падают и почему. В одном проекте мы реализовали "dead letter queue" - специальную очередь для необработанных событий. Она спасла нас, когда из-за ошибки в коде часть событий не обрабатывалась. Мы смогли восстановить все данные после исправления бага. Что касается организации кода, я часто использую такую структуру для команд:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| // Команда
public record PlaceOrderCommand(string CustomerId, List<OrderItem> Items);
// Обработчик команды
public class PlaceOrderCommandHandler : ICommandHandler<PlaceOrderCommand>
{
public async Task HandleAsync(PlaceOrderCommand command)
{
// Бизнес-логика и валидация
var order = new Order(Guid.NewGuid().ToString(), command.CustomerId, command.Items);
// Сохранение в БД и публикация события в рамках одной транзакции
await _orderRepository.SaveAsync(order);
// Событие автоматически публикуется через outbox
}
} |
|
Для обработки событий я предпочитаю использовать независимые обработчики, которые подписываются на конкретные события:
C# | 1
2
3
4
5
6
7
8
9
| public class OrderPlacedEventHandler : IEventHandler<OrderPlacedEvent>
{
public async Task HandleAsync(OrderPlacedEvent @event)
{
// Обновление read-модели
// Отправка уведомлений
// Другие действия
}
} |
|
Ещё один критический аспект - согласованность данных между сервисами. Здесь есть два основных подхода: eventual consistency (в конечном итоге все будет согласовано) и immediate consistency (моментальная согласованность). В большинстве случаев я выбираю первый вариант - он проще в реализации и масштабировании. Но это требует правильного проектирования доменной модели и бизнес-процессов. Пользователи должны понимать, что некоторые операции выполняются асинхронно.
Пример из жизни: в e-commerce системе статус заказа меняется на "Оплачен" не моментально после оплаты, а через некоторое время, когда событие об успешной оплате будет обработано соответствующим сервисом. Мы решили эту проблему, показывая пользователю промежуточный статус "Платеж в обработке".
На практике я заметил, что Event-Driven CQRS отлично подходит для доменов с естественной асинхронностью процессов. Например, в логистике: размещение заказа, подтверждение, сборка, отправка - все эти этапы и так происходят с задержкой, поэтому асинхронность в системе не создает проблем для пользователей.
CQRS Добрый день. Подскажите, где можно почитать материал по этой теме, для написания курсовой. Data Driven Test, провайдер базы данных Добрый день!
Пытаюсь настроить в VS 2010 тестирование.
Не могу понять какой провайдер нужно... Data driven test по данным из Access вот есть такой тестusing System;
using System.Collections.Generic;
using System.Linq;
using... Анимация State Driven Camera Всем привет. Подскажите пожалуйста, можно ли под State Driven Camera создать что-то вроде анимации...
Паттерн Outbox как гарантия доставки
Помню свой первый проект с микросервисами - я был так воодушевлён идеей независимых сервисов, общающихся через события! Но очень быстро столкнулся с фундаментальной проблемой: как гарантировать, что событие будет опубликовано, если сохранение в базу данных и публикация в брокер сообщений - это две отдельные операции?
Классический сценарий: пользователь регистрируется, мы сохраняем его данные в базу и пытаемся опубликовать событие UserRegistered. Но что если после успешного сохранения в базу сервис падает или брокер сообщений недоступен? Событие теряется, другие сервисы не узнают о новом пользователе, и мы получаем несогласованность данных. Существует несколько подходов к решению этой проблемы:
1. Сначала публикуем событие, потом сохраняем в базу. Но тогда можем разослать события, а данные не сохранить.
2. Двухфазный коммит (2PC) между базой и брокером. Слишком сложно и малопроизводительно.
3. Паттерн Outbox - и вот он действительно работает!
Суть паттерна Outbox в том, что мы сохраняем события в специальную таблицу в той же базе данных, где хранятся основные данные, в рамках одной транзакции. Вот как это выглядит:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| public async Task PlaceOrderAsync(Order order)
{
using var conn = new SqlConnection(_dbConn);
await conn.OpenAsync();
using var tx = conn.BeginTransaction();
try
{
// Вставка данных заказа
await conn.ExecuteAsync(
"INSERT INTO Orders (Id, Total) VALUES (@Id, @Total)",
new { order.Id, order.Total },
tx);
// Подготовка и вставка события
var evt = order.ToEvent();
await conn.ExecuteAsync(
"INSERT INTO Outbox (Id, Type, Payload) VALUES (@Id, @Type, @Payload)",
new {
Id = Guid.NewGuid(),
Type = evt.GetType().Name,
Payload = JsonSerializer.Serialize(evt)
},
tx);
// Фиксация транзакции
tx.Commit();
}
catch (Exception ex)
{
// Откат транзакции в случае ошибки
tx.Rollback();
throw;
}
} |
|
После этого отдельный процесс (обычно фоновая служба) читает события из таблицы Outbox и публикует их в брокер сообщений:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| public async Task ProcessOutboxAsync()
{
using var conn = new SqlConnection(_dbConn);
var events = await conn.QueryAsync<OutboxRecord>(
"SELECT TOP 100 * FROM Outbox WHERE Processed = 0");
foreach (var record in events)
{
var evt = DeserializeEvent(record.Type, record.Payload);
await _publisher.PublishAsync(evt);
await conn.ExecuteAsync(
"UPDATE Outbox SET Processed = 1 WHERE Id = @Id",
new { record.Id });
}
} |
|
Этот механизм обеспечивает атомарность операций с базой данных. Если транзакция успешно завершается, мы гарантированно имеем и данные, и событие в базе. Если транзакция откатывается, не сохраняется ни то, ни другое. Одним из ключевых аспектов паттерна Outbox является обработка дублирующихся сообщений. Когда я первый раз реализовал этот паттерн, то наивно полагал, что просто пометить событие как обработанное достаточно. Но реальность быстро поставила меня на место.
Что если процесс публикации событий завершился после отправки сообщения, но до обновления статуса в базе? При следующем запуске он попытается отправить то же событие снова! Тут-то и приходит на помощь идемпотентность - способность операции давать одинаковый результат при повторном выполнении. Я обычно реализую идемпотентность на стороне потребителя событий:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
| public async Task HandleOrderPlacedEvent(OrderPlacedEvent @event)
{
// Проверяем, не обрабатывали ли мы уже это событие
if (await _processedEventsStore.HasBeenProcessed(@event.EventId))
return;
// Выполняем действия с событием
await _orderService.ProcessNewOrder(@event.OrderId, @event.Items);
// Сохраняем ID события как обработанного
await _processedEventsStore.MarkAsProcessed(@event.EventId);
} |
|
Альтернативный подход - использовать уникальные идентификаторы событий и настроить механизм "exactly-once delivery" на уровне брокера сообщений. Но честно? Это сложнее и менее надежно.
Часто спрашивают: чем Outbox лучше Change Data Capture (CDC)? CDC перехватывает изменения в базе данных и генерирует события на их основе. Звучит похоже, но есть фундаментальные различия. В одном проекте мы использовали CDC, и главная проблема была в том, что события формировались автоматически на основе изменений в базе, без контекста доменной логики. Попробуй пойми из изменения строки в таблице, что именно произошло в бизнес-процессе! В Outbox же мы явно создаем события с богатым бизнес-контекстом. Ещё одна проблема CDC - сложность настройки. Для PostgreSQL используется logical replication, для SQL Server - change tracking или change data capture. Каждый подход требует специфичных настроек и привилегий.
Что касается очистки таблицы Outbox - тут есть несколько стратегий:
1. Простое удаление обработанных сообщений через определенный интервал.
2. Перемещение обработанных сообщений в архивную таблицу.
3. Установка TTL (time to live) для записей.
Я обычно использую комбинированный подход: устанавливаю флаг Processed = 1 сразу после публикации, а затем отдельный процесс удаляет записи старше определенного возраста:
C# | 1
2
3
4
5
6
7
| public async Task CleanupOutboxAsync()
{
using var conn = new SqlConnection(_dbConn);
await conn.ExecuteAsync(
"DELETE FROM Outbox WHERE Processed = 1 AND CreatedAt < @threshold",
new { threshold = DateTime.UtcNow.AddDays(-7) });
} |
|
Для критичных событий я предпочитаю архивирование вместо удаления. Однажды нам это спасло систему, когда обнаружилась ошибка в обработке определенного типа событий - мы смогли восстановить все из архива.
Интеграция Outbox с паттерном Saga - это отдельная интересная тема. Sagas используются для координации распределенных транзакций через несколько сервисов. В этом контексте Outbox обеспечивает надежную доставку команд и событий между шагами саги. Я считаю, что внедрение паттерна Outbox - это почти всегда компромис между простотой реализации и надежностью системы. В проетах небольшого и среднего масштаба он дает огромные преимущества с минимальными накладными расходами. Стоит отметить, что транзакционные гарантии, предоставляемые этим паттерном, трудно переоценить.
На одном из моих последних проектов мы потратили недели на отладку странных несоответствий между сервисами, пока не обнаружили, что примерно 0.5% событий просто терялись при публикации. После внедрения Outbox проблема ушла полностью. Небольшие изменения в архитектуре - огромный выигрыш в стабильности! При сравнении Outbox с традиционными подходами выигрыш очевиден:
Синхронная публикация: [База данных] -> [Сохранение данных] -> [Публикация события] если публикация не удалась, данные уже сохранены
Outbox: [База данных] -> [Транзакция: сохранение данных + запись в Outbox] -> [Асинхронная публикация]
Преимущество: атомарность операции гарантирована транзакцией базы данных
Важно также рассмотреть технические аспекты реализации паттерна. Таблица Outbox обычно имеет следующую структуру:
SQL | 1
2
3
4
5
6
7
8
9
10
| CREATE TABLE Outbox (
Id UNIQUEIDENTIFIER PRIMARY KEY,
EventType NVARCHAR(100) NOT NULL,
EventData NVARCHAR(MAX) NOT NULL,
CreatedAt DATETIME2 NOT NULL DEFAULT GETUTCDATE(),
Processed BIT NOT NULL DEFAULT 0,
ProcessedAt DATETIME2 NULL,
Error NVARCHAR(MAX) NULL,
RetryCount INT NOT NULL DEFAULT 0
); |
|
Я рекомендую добавлять поля для отслеживания ошибок и количества попыток обработки - это значительно упрощает отладку и восстановление после сбоев. Кроме того, стоит добавить индекс для быстрого поиска необработанных сообщений:
SQL | 1
| CREATE INDEX IX_Outbox_Processed ON Outbox (Processed) INCLUDE (Id, EventType); |
|
Для эффективной обработки таблицы Outbox я рекомендую использовать механизм блокировок, чтобы несколко экземпляров обработчика (например, в среде с несколькими инстансами приложения) не пытались обработать одни и те же сообщения:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| public async Task ProcessOutboxAsync()
{
using var conn = new SqlConnection(_dbConn);
await conn.OpenAsync();
using var transaction = conn.BeginTransaction();
// Получаем и блокируем записи для обработки
var events = await conn.QueryAsync<OutboxRecord>(
@"SELECT TOP(20) * FROM Outbox WITH (UPDLOCK, READPAST)
WHERE Processed = 0
ORDER BY CreatedAt",
transaction: transaction);
foreach (var record in events)
{
try
{
var evt = DeserializeEvent(record.EventType, record.EventData);
await _publisher.PublishAsync(evt);
await conn.ExecuteAsync(
@"UPDATE Outbox
SET Processed = 1, ProcessedAt = @now
WHERE Id = @Id",
new { now = DateTime.UtcNow, record.Id },
transaction);
}
catch (Exception ex)
{
await conn.ExecuteAsync(
@"UPDATE Outbox
SET RetryCount = RetryCount + 1, Error = @error
WHERE Id = @Id",
new { error = ex.ToString(), record.Id },
transaction);
}
}
await transaction.CommitAsync();
} |
|
В высоконагруженных системах я обычно разделяю таблицу Outbox по типам событий или другим критериям (шардинг). Это позволяет обрабатывать разные типы событий параллельно и снижает нагрузку на базу данных.
Особое внимание стоит уделить обработке сбоев. Стратегия повторных попыток должна учитывать тип ошибки. Например, временная недоступность брокера сообщений - это повод для повторной попытки, а ошибка сериализации события - нет.
Я использую экспоненциальную стратегию задержки между попытками:
C# | 1
2
3
4
| TimeSpan GetNextRetryDelay(int retryCount)
{
return TimeSpan.FromSeconds(Math.Pow(2, Math.Min(retryCount, 8)));
} |
|
Это дает задержки: 2, 4, 8, 16, 32, 64, 128, 256 секунд.
В моей практике я также столкнулся с проблемой, когда Outbox становился бутылочным горлышком при высоких нагрузках. Решение? Батчинг! Вместо обработки сообщений по одному я стал обрабатывать их пакетами:
C# | 1
2
3
4
5
6
7
8
9
10
| // Публикация пакета событий
await _publisher.PublishBatchAsync(events);
// Обновление статуса для всего пакета
await conn.ExecuteAsync(
@"UPDATE Outbox
SET Processed = 1, ProcessedAt = @now
WHERE Id IN @Ids",
new { now = DateTime.UtcNow, Ids = events.Select(e => e.Id).ToList() },
transaction); |
|
Такой подход сократил время обработки Outbox на порядок в одном из проектов.
Ещё одно практическое соображение - мониторинг. Я настоятельно рекомендую настроить алерты на количество необработанных сообщений в Outbox и время их пребывания там. Это ранние индикаторы проблем в системе.
Внедрение паттерна Outbox в существующую систему может быть вызовом, особенно если у вас много мест, где происходит публикация событий. Я обычно рекомендую инкрементальный подход - начинайте с наиболее критичных доменов и постепенно расширяйте охват.
Практическая реализация на C# 14
Давайте я покажу, как реализовать Event-Driven CQRS с паттерном Outbox на C# 14, используя его новые возможности - первичные конструкторы, краткие записи и улучшеное сопостовление шаблонов. Я начну с организации команд и запросов с помощью библиотеки MediatR. Это популярный инструмент для имплементации шаблона Mediator, который значительно упрощает работу с командами и событиями. Вот как выглядит базовая настройка:
C# | 1
2
3
4
5
6
7
| public static IServiceCollection AddMediatrServices(this IServiceCollection services)
{
return services
.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(Assembly.GetExecutingAssembly()))
.AddTransient(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>))
.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
} |
|
Я обычно добавляю пару поведений в конвейер обработки - для валидации и логирования. Это позволяет отделить эти сквозные задачи от основной бизнес-логики.
Теперь определим доменную модель. В C# 14 это становится намного лаконичнее благодаря первичным конструкторам:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| public class Order
{
public string Id { get; }
public decimal Total { get; }
public List<OrderItem> Items { get; }
public string CustomerId { get; }
public OrderStatus Status { get; private set; }
public Order(string id, decimal total, List<OrderItem> items, string customerId)
{
Id = id;
Total = total;
Items = items;
CustomerId = customerId;
Status = OrderStatus.Created;
}
public void MarkAsPaid() => Status = OrderStatus.Paid;
public OrderPlacedEvent ToPlacedEvent() => new(Id, Total, CustomerId, Items.Select(i => i.ToDto()).ToList());
public OrderPaidEvent ToPaidEvent() => new(Id, Total, CustomerId);
}
public record OrderItem(string ProductId, int Quantity, decimal Price)
{
public OrderItemDto ToDto() => new(ProductId, Quantity, Price);
}
public record OrderItemDto(string ProductId, int Quantity, decimal Price);
public record OrderPlacedEvent(string OrderId, decimal Total, string CustomerId, List<OrderItemDto> Items);
public record OrderPaidEvent(string OrderId, decimal Total, string CustomerId); |
|
Обратите внимание, как я использую новый синтаксис записей (records) для событий и DTO. Это не только сокращает код, но и делает его более читаемым.
Следующий шаг - интеграция с Entity Framework. Я создаю контекст базы данных, который включает и основные сущности, и таблицу Outbox:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| public class AppDbContext : DbContext
{
public DbSet<Order> Orders { get; set; }
public DbSet<OutboxMessage> OutboxMessages { get; set; }
public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Order>(entity =>
{
entity.HasKey(e => e.Id);
entity.Property(e => e.Total).HasColumnType("decimal(18,2)");
entity.Property(e => e.Status).HasConversion<string>();
entity.HasMany<OrderItem>().WithOne().HasForeignKey("OrderId");
});
modelBuilder.Entity<OrderItem>(entity =>
{
entity.HasKey(e => new { e.OrderId, e.ProductId });
entity.Property(e => e.Price).HasColumnType("decimal(18,2)");
});
modelBuilder.Entity<OutboxMessage>(entity =>
{
entity.HasKey(e => e.Id);
entity.Property(e => e.EventType).IsRequired();
entity.Property(e => e.EventData).IsRequired();
entity.Property(e => e.CreatedAt).HasDefaultValueSql("GETUTCDATE()");
entity.HasIndex(e => e.Processed);
});
}
}
public class OutboxMessage
{
public Guid Id { get; set; }
public string EventType { get; set; } = null!;
public string EventData { get; set; } = null!;
public DateTime CreatedAt { get; set; }
public bool Processed { get; set; }
public DateTime? ProcessedAt { get; set; }
public string? Error { get; set; }
public int RetryCount { get; set; }
} |
|
Для обработки команд я использую отдельный слой сервисов. Вот пример сервиса для обработки заказов с использованием паттерна Outbox:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| public class OrderService : IOrderService
{
private readonly AppDbContext _dbContext;
private readonly IEventSerializer _eventSerializer;
public OrderService(AppDbContext dbContext, IEventSerializer eventSerializer)
{
_dbContext = dbContext;
_eventSerializer = eventSerializer;
}
public async Task<string> PlaceOrderAsync(PlaceOrderCommand command)
{
// Создаем новый заказ
var order = new Order(
Guid.NewGuid().ToString(),
command.Items.Sum(i => i.Price * i.Quantity),
command.Items.Select(i => new OrderItem(i.ProductId, i.Quantity, i.Price)).ToList(),
command.CustomerId
);
// Начинаем транзакцию
using var transaction = await _dbContext.Database.BeginTransactionAsync();
try
{
// Сохраняем заказ
_dbContext.Orders.Add(order);
// Создаем событие и сохраняем его в outbox
var @event = order.ToPlacedEvent();
_dbContext.OutboxMessages.Add(new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = @event.GetType().Name,
EventData = _eventSerializer.Serialize(@event),
CreatedAt = DateTime.UtcNow,
Processed = false
});
await _dbContext.SaveChangesAsync();
// Фиксируем транзакцию
await transaction.CommitAsync();
return order.Id;
}
catch
{
// В случае ошибки откатываем транзакцию
await transaction.RollbackAsync();
throw;
}
}
} |
|
Этот код демонстрирует как мы можем сохранить заказ и событие в одной транзакции, что гарантирует согласованность данных.
Теперь нам нужен фоновый сервис для обработки сообщений из Outbox и их публикации в брокер сообщений. Я обычно использую IHostedService для этой цели:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
| public class OutboxProcessor : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<OutboxProcessor> _logger;
private readonly IEventSerializer _eventSerializer;
private readonly IEventPublisher _eventPublisher;
private readonly TimeSpan _pollingInterval = TimeSpan.FromSeconds(10);
public OutboxProcessor(
IServiceProvider serviceProvider,
ILogger<OutboxProcessor> logger,
IEventSerializer eventSerializer,
IEventPublisher eventPublisher)
{
_serviceProvider = serviceProvider;
_logger = logger;
_eventSerializer = eventSerializer;
_eventPublisher = eventPublisher;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Outbox processor is starting.");
while (!stoppingToken.IsCancellationRequested)
{
try
{
await ProcessOutboxMessagesAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred while processing outbox messages.");
}
await Task.Delay(_pollingInterval, stoppingToken);
}
_logger.LogInformation("Outbox processor is stopping.");
}
private async Task ProcessOutboxMessagesAsync(CancellationToken stoppingToken)
{
using var scope = _serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var messages = await dbContext.OutboxMessages
.Where(m => !m.Processed)
.OrderBy(m => m.CreatedAt)
.Take(20)
.ToListAsync(stoppingToken);
if (!messages.Any())
return;
_logger.LogInformation("Found {Count} messages to process", messages.Count);
foreach (var message in messages)
{
try
{
var eventType = Type.GetType($"YourNamespace.{message.EventType}");
if (eventType == null)
{
_logger.LogWarning("Unknown event type: {EventType}", message.EventType);
message.Processed = true;
message.ProcessedAt = DateTime.UtcNow;
message.Error = "Unknown event type";
continue;
}
var @event = _eventSerializer.Deserialize(message.EventData, eventType);
await _eventPublisher.PublishAsync(@event, stoppingToken);
message.Processed = true;
message.ProcessedAt = DateTime.UtcNow;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing outbox message {MessageId}", message.Id);
message.RetryCount++;
message.Error = ex.ToString();
// Если превышено количество попыток, помечаем как обработаное с ошибкой
if (message.RetryCount >= 5)
{
message.Processed = true;
message.ProcessedAt = DateTime.UtcNow;
}
}
}
await dbContext.SaveChangesAsync(stoppingToken);
}
} |
|
Обратите внимание на механизм повторных попыток - мы увеличиваем счетчик и пытаемся снова при следующем запуске, но после определенного количества неудачных попыток помечаем сообщение как обработаное с ошибкой, чтобы оно не блокировало обработку других сообщений.
Для интеграции с брокером сообщений я обычно создаю абстракцию в виде интерфейса, чтобы не привязываться к конкретной реализации. Вот как это может выглядеть:
C# | 1
2
3
4
5
6
7
| public interface IEventPublisher
{
Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : class;
Task PublishBatchAsync<TEvent>(IEnumerable<TEvent> events, CancellationToken cancellationToken = default)
where TEvent : class;
} |
|
В своих проетках я часто использую Azure Service Bus для публикации событий. Вот реализация этого интерфейса для него:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
| public class AzureServiceBusPublisher : IEventPublisher
{
private readonly ServiceBusClient _client;
private readonly ILogger<AzureServiceBusPublisher> _logger;
private readonly Dictionary<Type, ServiceBusSender> _senders = new();
public AzureServiceBusPublisher(string connectionString, ILogger<AzureServiceBusPublisher> logger)
{
_client = new ServiceBusClient(connectionString);
_logger = logger;
}
public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : class
{
var sender = GetOrCreateSender<TEvent>();
var eventType = typeof(TEvent).Name;
var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(@event))
{
MessageId = Guid.NewGuid().ToString(),
Subject = eventType,
ApplicationProperties =
{
["EventType"] = eventType
}
};
await sender.SendMessageAsync(message, cancellationToken);
_logger.LogInformation("Published event {EventType} with ID {MessageId}",
eventType, message.MessageId);
}
public async Task PublishBatchAsync<TEvent>(IEnumerable<TEvent> events, CancellationToken cancellationToken = default)
where TEvent : class
{
var sender = GetOrCreateSender<TEvent>();
var eventType = typeof(TEvent).Name;
var messages = events.Select(e => new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(e))
{
MessageId = Guid.NewGuid().ToString(),
Subject = eventType,
ApplicationProperties =
{
["EventType"] = eventType
}
}).ToList();
await sender.SendMessagesAsync(messages, cancellationToken);
_logger.LogInformation("Published {Count} events of type {EventType}",
messages.Count, eventType);
}
private ServiceBusSender GetOrCreateSender<TEvent>() where TEvent : class
{
var eventType = typeof(TEvent);
if (!_senders.TryGetValue(eventType, out var sender))
{
var queueName = eventType.Name.ToLowerInvariant();
sender = _client.CreateSender(queueName);
_senders[eventType] = sender;
}
return sender;
}
public async ValueTask DisposeAsync()
{
foreach (var sender in _senders.Values)
{
await sender.DisposeAsync();
}
await _client.DisposeAsync();
}
} |
|
Для десериализации событий нам понадобится сервис, который будет преобразовывать JSON в объекты событий:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| public interface IEventSerializer
{
string Serialize(object @event);
object Deserialize(string data, Type eventType);
TEvent Deserialize<TEvent>(string data) where TEvent : class;
}
public class JsonEventSerializer : IEventSerializer
{
private readonly JsonSerializerOptions _options;
public JsonEventSerializer()
{
_options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false
};
}
public string Serialize(object @event) =>
JsonSerializer.Serialize(@event, @event.GetType(), _options);
public object Deserialize(string data, Type eventType) =>
JsonSerializer.Deserialize(data, eventType, _options)
?? throw new InvalidOperationException($"Failed to deserialize {eventType.Name}");
public TEvent Deserialize<TEvent>(string data) where TEvent : class =>
JsonSerializer.Deserialize<TEvent>(data, _options)
?? throw new InvalidOperationException($"Failed to deserialize {typeof(TEvent).Name}");
} |
|
Обработка событий на стороне потребителя также требует внимания. Я обычно использую механизм подписки через тот же MediatR, создавая обработчики для конкретных типов событий:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| public class OrderCreatedEventHandler : INotificationHandler<OrderPlacedEvent>
{
private readonly IReadModelRepository<OrderReadModel> _repository;
private readonly ILogger<OrderCreatedEventHandler> _logger;
private readonly IProcessedEventsRepository _processedEvents;
public OrderCreatedEventHandler(
IReadModelRepository<OrderReadModel> repository,
IProcessedEventsRepository processedEvents,
ILogger<OrderCreatedEventHandler> logger)
{
_repository = repository;
_processedEvents = processedEvents;
_logger = logger;
}
public async Task Handle(OrderPlacedEvent notification, CancellationToken cancellationToken)
{
var eventId = notification.GetType().Name + "_" + notification.OrderId;
// Проверяем, не обрабатывали ли мы уже это событие
if (await _processedEvents.HasBeenProcessedAsync(eventId, cancellationToken))
{
_logger.LogInformation("Event {EventId} already processed", eventId);
return;
}
try
{
// Создаем read-модель на основе события
var readModel = new OrderReadModel
{
Id = notification.OrderId,
CustomerId = notification.CustomerId,
Total = notification.Total,
Status = "Created",
Items = notification.Items.Select(i => new OrderItemReadModel
{
ProductId = i.ProductId,
Quantity = i.Quantity,
Price = i.Price
}).ToList()
};
await _repository.CreateAsync(readModel, cancellationToken);
// Отмечаем событие как обработанное
await _processedEvents.MarkAsProcessedAsync(eventId, cancellationToken);
_logger.LogInformation("Created read model for order {OrderId}", notification.OrderId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing OrderPlacedEvent for order {OrderId}",
notification.OrderId);
throw; // Позволяем механизму повторных попыток сработать
}
}
} |
|
Для хранения информации об обработанных событиях я использую отдельное хранилище:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| public interface IProcessedEventsRepository
{
Task<bool> HasBeenProcessedAsync(string eventId, CancellationToken cancellationToken = default);
Task MarkAsProcessedAsync(string eventId, CancellationToken cancellationToken = default);
}
public class ProcessedEventsRepository : IProcessedEventsRepository
{
private readonly AppDbContext _dbContext;
public ProcessedEventsRepository(AppDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task<bool> HasBeenProcessedAsync(string eventId, CancellationToken cancellationToken = default)
{
return await _dbContext.ProcessedEvents
.AnyAsync(e => e.EventId == eventId, cancellationToken);
}
public async Task MarkAsProcessedAsync(string eventId, CancellationToken cancellationToken = default)
{
_dbContext.ProcessedEvents.Add(new ProcessedEvent
{
EventId = eventId,
ProcessedAt = DateTime.UtcNow
});
await _dbContext.SaveChangesAsync(cancellationToken);
}
} |
|
Для чтения данных (read-модель) я предпочитаю использовать отдельную базу данных или хотя бы отдельные таблицы в той же базе, оптимизированные для запросов:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| public interface IReadModelRepository<T> where T : class
{
Task<T?> GetByIdAsync(string id, CancellationToken cancellationToken = default);
Task<IEnumerable<T>> GetAllAsync(CancellationToken cancellationToken = default);
Task CreateAsync(T entity, CancellationToken cancellationToken = default);
Task UpdateAsync(T entity, CancellationToken cancellationToken = default);
}
public class OrderReadModelRepository : IReadModelRepository<OrderReadModel>
{
private readonly ReadDbContext _dbContext;
public OrderReadModelRepository(ReadDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task<OrderReadModel?> GetByIdAsync(string id, CancellationToken cancellationToken = default)
{
return await _dbContext.OrderReadModels
.Include(o => o.Items)
.FirstOrDefaultAsync(o => o.Id == id, cancellationToken);
}
public async Task<IEnumerable<OrderReadModel>> GetAllAsync(CancellationToken cancellationToken = default)
{
return await _dbContext.OrderReadModels
.Include(o => o.Items)
.ToListAsync(cancellationToken);
}
public async Task CreateAsync(OrderReadModel entity, CancellationToken cancellationToken = default)
{
_dbContext.OrderReadModels.Add(entity);
await _dbContext.SaveChangesAsync(cancellationToken);
}
public async Task UpdateAsync(OrderReadModel entity, CancellationToken cancellationToken = default)
{
_dbContext.OrderReadModels.Update(entity);
await _dbContext.SaveChangesAsync(cancellationToken);
}
} |
|
Вся эта инфраструктура требует правильной регистрации зависимостей. В .NET 8 для этого можно использовать удобный синтаксис:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| var builder = WebApplication.CreateBuilder(args);
// Регистрация контекстов баз данных
builder.Services.AddDbContext<AppDbContext>(options =>
options.UseSqlServer(builder.Configuration.GetConnectionString("WriteDatabase")));
builder.Services.AddDbContext<ReadDbContext>(options =>
options.UseSqlServer(builder.Configuration.GetConnectionString("ReadDatabase")));
// Регистрация сервисов
builder.Services
.AddScoped<IOrderService, OrderService>()
.AddScoped<IEventSerializer, JsonEventSerializer>()
.AddSingleton<IEventPublisher>(sp =>
new AzureServiceBusPublisher(
builder.Configuration.GetConnectionString("ServiceBus")!,
sp.GetRequiredService<ILogger<AzureServiceBusPublisher>>()))
.AddScoped<IProcessedEventsRepository, ProcessedEventsRepository>()
.AddScoped<IReadModelRepository<OrderReadModel>, OrderReadModelRepository>();
// Регистрация MediatR
builder.Services.AddMediatrServices();
// Регистрация фоновых сервисов
builder.Services.AddHostedService<OutboxProcessor>();
var app = builder.Build(); |
|
Механизм снепшотов для оптимизации восстановления состояния особенно полезен в системах с большим количеством событий. Вместо воспроизведения всех событий с самого начала, мы можем восстанавливать состояние из последнего снепшота и применять только новые события:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| public class SnapshotService<T> where T : class
{
private readonly ISnapshotRepository<T> _repository;
private readonly ILogger<SnapshotService<T>> _logger;
public SnapshotService(ISnapshotRepository<T> repository, ILogger<SnapshotService<T>> logger)
{
_repository = repository;
_logger = logger;
}
public async Task<T?> GetLatestSnapshotAsync(string aggregateId, CancellationToken cancellationToken = default)
{
return await _repository.GetLatestAsync(aggregateId, cancellationToken);
}
public async Task SaveSnapshotAsync(string aggregateId, T snapshot, long version, CancellationToken cancellationToken = default)
{
await _repository.SaveAsync(aggregateId, snapshot, version, cancellationToken);
_logger.LogInformation("Saved snapshot for aggregate {AggregateId} at version {Version}",
aggregateId, version);
}
} |
|
Сложности и подводные камни
При внедрении Event-Driven CQRS с паттерном Outbox я столкнулся с рядом неочевидных проблем, о которых редко пишут в туториалах. Давайте поговорим о производительности и масштабируемости такого решения.
Первое, с чем я столкнулся - Outbox таблица быстро разрастается в высоконагруженных системах. В одном из проектов через три месяца эксплуатации она содержала более 20 миллионов записей, что существенно снизило скорость SQL-запросов. Решение? Партиционирование таблицы по дате создания события:
SQL | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| CREATE PARTITION FUNCTION OutboxPartitionFunction (datetime2)
AS RANGE RIGHT FOR VALUES (
'2023-01-01T00:00:00.000',
'2023-02-01T00:00:00.000',
'2023-03-01T00:00:00.000'
-- и так далее
);
CREATE PARTITION SCHEME OutboxPartitionScheme
AS PARTITION OutboxPartitionFunction
TO (
[PRIMARY],
[PRIMARY],
[PRIMARY],
[PRIMARY]
);
CREATE TABLE Outbox (
-- ... определение таблицы
)
ON OutboxPartitionScheme(CreatedAt); |
|
Второй подвох - увеличение латентности при публикации событий. Асинхронная обработка Outbox происходит по расписанию, что вносит задержку. Если ваш бизнес-процесс критичен к скорости обновления read-модели, стоит рассмотреть гибридный подход: критичные события публиковать синхронно, а все остальные через Outbox. Что касается альтернативных подходов, я экспериментировал с CDC (Change Data Capture) от SQL Server. Он отслеживает изменения в таблицах и генерирует события автоматически. Плюс - не нужно писать код для Outbox. Минус - нет контроля над форматом событий, они слишком привязаны к структуре БД, а не к бизнес-модели. Еще одна альтернатива - Debezium, проект с открытым исходным кодом для CDC. Он интегрируется с Kafka и поддерживает различные СУБД. Однако, в моем опыте, его настройка сложнее, чем реализация собственного Outbox.
Для тестовой среды я обычно использую Docker Compose. Вот пример конфигурации:
YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| version: '3.8'
services:
db:
image: mcr.microsoft.com/mssql/server:2022-latest
environment:
- ACCEPT_EULA=Y
- SA_PASSWORD=YourStrong@Passw0rd
ports:
- "1433:1433"
volumes:
- sqldata:/var/opt/mssql
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
app:
build:
context: .
dockerfile: Dockerfile
depends_on:
- db
- rabbitmq
environment:
- ConnectionStrings__WriteDatabase=Server=db;Database=WriteDb;User=sa;Password=YourStrong@Passw0rd;TrustServerCertificate=true
- ConnectionStrings__ReadDatabase=Server=db;Database=ReadDb;User=sa;Password=YourStrong@Passw0rd;TrustServerCertificate=true
- RabbitMQ__Host=rabbitmq
volumes:
sqldata: |
|
Модульное тестирование CQRS-систем требует особого подхода. Я использую паттерн "Port and Adapters" (или "Гексагональная архитектура"), который позволяет изолировать бизнес-логику от инфраструктуры:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| [Fact]
public async Task PlaceOrder_ShouldCreateOrderAndStoreEvent_WhenCommandIsValid()
{
// Arrange
var mockDbContext = new Mock<AppDbContext>();
var mockEventSerializer = new Mock<IEventSerializer>();
var service = new OrderService(mockDbContext.Object, mockEventSerializer.Object);
var command = new PlaceOrderCommand("customer123", new List<OrderItemDto>
{
new("product1", 2, 10.0m)
});
// Act
await service.PlaceOrderAsync(command);
// Assert
mockDbContext.Verify(x => x.Orders.Add(It.IsAny<Order>()), Times.Once);
mockDbContext.Verify(x => x.OutboxMessages.Add(It.IsAny<OutboxMessage>()), Times.Once);
mockDbContext.Verify(x => x.SaveChangesAsync(It.IsAny<CancellationToken>()), Times.Once);
} |
|
Для интеграционного тестирования я предпочитаю использовать контейнеры и реальную базу данных, но с изолированными схемами для каждого теста.
В продакшн-среде Kubernetes дает невероятную гибкость. Я обычно разделяю сервисы записи и чтения на отдельные deployments с разными стратегиями масштабирования:
YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| apiVersion: apps/v1
kind: Deployment
metadata:
name: write-api
spec:
replicas: 3
strategy:
type: RollingUpdate
template:
spec:
containers:
- name: write-api
image: yourregistry/write-api:latest
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m" |
|
YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| apiVersion: apps/v1
kind: Deployment
metadata:
name: read-api
spec:
replicas: 5 # Больше реплик для read-сервиса, так как запросов обычно больше
strategy:
type: RollingUpdate
template:
spec:
containers:
- name: read-api
image: yourregistry/read-api:latest
resources:
requests:
memory: "256Mi"
cpu: "200m"
limits:
memory: "512Mi"
cpu: "500m" |
|
Особое внимание нужно уделить Outbox-процессору. Я обычно деплою его как отдельный сервис с меньшим количеством реплик:
YAML | 1
2
3
4
5
6
7
8
9
10
11
| apiVersion: apps/v1
kind: Deployment
metadata:
name: outbox-processor
spec:
replicas: 2 # Достаточно небольшого числа реплик
template:
spec:
containers:
- name: outbox-processor
image: yourregistry/outbox-processor:latest |
|
В одном проекте у нас неожиданно случился интересный казус - мы запустили слишком много экземпляров Outbox-процессора, и они начали конкурировать за сообщения, создавая ненужную нагрузку на базу данных. Урок: всегда используйте механизм блокировок, чтобы избежать параллельной обработки одних и тех же сообщений.
Если сравнивать наше решение с альтернативными подходами, стоит обратить внимание на Apache Kafka с Kafka Connect. Этот стек предлагает CDC-коннекторы, которые могут читать из журнала транзакций баз данных и автоматически публиковать изменения в Kafka. В чем преимущества Kafka:
1. Встроенная поддержка партиционирования и масштабирования.
2. Хранение сообщений с возможностью воспроизведения (replay).
3. Гарантированный порядок сообщений в пределах партиции.
Однако, на моем опыте, Kafka намного сложнее в настройке и обслуживании. В небольшом стартапе мы потратили почти месяц на стабилизацию Kafka-кластера. А еще сам Kafka-кластер требует значительных ресурсов - минимум 3 ноды ZooKeeper и 3 ноды Kafka для отказоустойчивой конфигурации. SQL Outbox намного проще внедрить, особенно если у вас уже есть SQL-база. Кроме того, он не требует внедрения дополнительных технологий в инфраструктуру.
Ещё одна проблема, с которой я столкнулся - контроль версий сообщений. Когда вы меняете структуру событий, вам нужно подумать о обратной совместимости. Я использую такой подход:
C# | 1
2
3
4
5
6
7
| public record OrderPlacedEventV2(
string OrderId,
decimal Total,
string CustomerId,
List<OrderItemDto> Items,
string? PromotionCode = null // Новое поле в версии 2
); |
|
И соответствующую логику десериализации:
C# | 1
2
3
4
5
6
7
| object DeserializeEvent(string type, string payload) =>
type switch
{
"OrderPlacedEvent" => JsonSerializer.Deserialize<OrderPlacedEvent>(payload)!,
"OrderPlacedEventV2" => JsonSerializer.Deserialize<OrderPlacedEventV2>(payload)!,
_ => throw new InvalidOperationException($"Unknown event type {type}")
}; |
|
Ещё одна сложность - мониторинг. В распределенной системе сложно понять, где произошла ошибка и почему. Я рекомендую внедрить распределенную трассировку (distributed tracing) с использованием OpenTelemetry:
C# | 1
2
3
4
5
6
7
| builder.Services.AddOpenTelemetry()
.WithTracing(tracerProviderBuilder =>
tracerProviderBuilder
.AddSource("MyCompany.OrderService")
.AddSqlClientInstrumentation()
.AddAspNetCoreInstrumentation()
.AddJaegerExporter()); |
|
Это позволит отслеживать путь запроса через все микросервисы и понимать, где возникают задержки или ошибки.
Одна из самых недооцененых проблем - очистка таблицы Outbox. Помните тот проект с 20 миллионами записей? Мы столкнулись с тем, что массовое удаление старых записей блокировало таблицу и приводило к проблемам с производительностю. Решение - удалять старые записи небольшими пакетами:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| public async Task CleanupOutboxAsync()
{
const int batchSize = 1000;
var threshold = DateTime.UtcNow.AddDays(-30); // Храним события 30 дней
int deleted;
do
{
using var conn = new SqlConnection(_dbConn);
deleted = await conn.ExecuteAsync(
@"WITH OldMessages AS (
SELECT TOP(@batchSize) Id
FROM Outbox
WHERE Processed = 1 AND ProcessedAt < @threshold
)
DELETE FROM Outbox WHERE Id IN (SELECT Id FROM OldMessages)",
new { batchSize, threshold });
await Task.Delay(1000); // Пауза между пакетами
}
while (deleted >= batchSize);
} |
|
Полный рабочий пример
Я создал небольшое, но полнофункциональное приложение для управления заказами, демонстрирующее все описанные концепции. Давайте рассмотрим основные компоненты и то, как они взаимодействуют. Структура решения выглядит так:
C# | 1
2
3
4
5
6
7
8
| OrderSystem/
├─ OrderSystem.Domain/ # Доменная модель и события
├─ OrderSystem.Infrastructure/ # Реализация репозиториев, Outbox
├─ OrderSystem.WriteApi/ # API для обработки команд
├─ OrderSystem.ReadApi/ # API для запросов
├─ OrderSystem.OutboxProcessor/ # Сервис обработки Outbox
├─ OrderSystem.Common/ # Общие интерфейсы и утилиты
└─ OrderSystem.Tests/ # Юнит-тесты и интеграционные тесты |
|
Центральный объект нашей системы - заказ. Вот как выглядит его модель и события:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| // OrderSystem.Domain/Entities/Order.cs
public class Order
{
public string Id { get; }
public string CustomerId { get; }
public decimal Total { get; private set; }
public List<OrderItem> Items { get; }
public OrderStatus Status { get; private set; }
// Конструктор и методы, создающие события
public OrderPlacedEvent ToPlacedEvent() =>
new(Id, CustomerId, Total, Items.Select(i => i.ToDto()).ToList());
public void UpdateStatus(OrderStatus status)
{
// Валидация перехода между статусами
Status = status;
}
}
// События
public record OrderPlacedEvent(string OrderId, string CustomerId,
decimal Total, List<OrderItemDto> Items); |
|
Для обработки команд я использую сервис, интегрированный с Outbox:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| // OrderSystem.Infrastructure/Services/OrderService.cs
public class OrderService : IOrderService
{
private readonly AppDbContext _dbContext;
public async Task<string> PlaceOrderAsync(PlaceOrderCommand command)
{
var order = new Order(Guid.NewGuid().ToString(), command.CustomerId,
command.Items.Select(i => new OrderItem(i.ProductId, i.Quantity, i.Price)).ToList());
using var transaction = await _dbContext.Database.BeginTransactionAsync();
try
{
_dbContext.Orders.Add(order);
// Сохраняем событие в Outbox
var @event = order.ToPlacedEvent();
_dbContext.OutboxMessages.Add(new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = @event.GetType().Name,
EventData = JsonSerializer.Serialize(@event),
CreatedAt = DateTime.UtcNow
});
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
return order.Id;
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
} |
|
В одном из проектов я столкнулся с проблемой восстановления после сбоя базы данных. Решение оказалось простым - хранить контрольную сумму события вместе с самим событием:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
| // Улучшеная версия OutboxMessage
public class OutboxMessage
{
// ... обычные поля
public string EventHash { get; set; } = null!;
public static string CalculateHash(string eventData)
{
using var sha = SHA256.Create();
var hash = sha.ComputeHash(Encoding.UTF8.GetBytes(eventData));
return Convert.ToBase64String(hash);
}
} |
|
Для API запросов я использую отдельные DTO и контроллеры:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| // OrderSystem.ReadApi/Controllers/OrdersController.cs
[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
private readonly IOrderReadRepository _repository;
[HttpGet("{id}")]
public async Task<ActionResult<OrderReadDto>> GetOrder(string id)
{
var order = await _repository.GetByIdAsync(id);
if (order == null)
return NotFound();
return new OrderReadDto(
order.Id,
order.CustomerId,
order.Total,
order.Status,
order.Items.Select(i => new OrderItemReadDto(
i.ProductId, i.Quantity, i.Price, i.Total)).ToList()
);
}
} |
|
Самое интересное - как это всё работает вместе:
1. Клиент отправляет POST-запрос к WriteApi для создания заказа.
2. OrderService сохраняет Order и OrderPlacedEvent в одной транзакции.
3. OutboxProcessor асинхронно читает события и публикует их в RabbitMQ.
4. Обработчик событий в ReadApi получает событие и обновляет read-модель.
5. Клиент может сразу получить подтверждение размещения заказа, а потом проверять его статус через ReadApi.
Теперь давайте подробнее рассмотрим, как работает OutboxProcessor и его взаимодействие с брокером сообщений:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| // OrderSystem.OutboxProcessor/Worker.cs
public class Worker : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<Worker> _logger;
private readonly IConfiguration _configuration;
private readonly TimeSpan _pollingInterval;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
using var scope = _serviceProvider.CreateScope();
var processor = scope.ServiceProvider.GetRequiredService<IOutboxProcessor>();
try
{
var count = await processor.ProcessPendingMessagesAsync(stoppingToken);
if (count > 0)
_logger.LogInformation("Processed {Count} outbox messages", count);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing outbox messages");
}
await Task.Delay(_pollingInterval, stoppingToken);
}
}
} |
|
Для обработки событий я также реализовал механизм повторных попыток:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| // OrderSystem.Infrastructure/Services/OutboxProcessor.cs
public async Task<int> ProcessPendingMessagesAsync(CancellationToken cancellationToken)
{
var messages = await _dbContext.OutboxMessages
.Where(m => !m.Processed && (m.RetryCount < _maxRetries) &&
(m.NextRetryTime == null || m.NextRetryTime <= DateTime.UtcNow))
.OrderBy(m => m.CreatedAt)
.Take(20)
.ToListAsync(cancellationToken);
if (!messages.Any())
return 0;
int processed = 0;
foreach (var message in messages)
{
try
{
// Публикация события
await _eventPublisher.PublishAsync(message.EventType, message.EventData);
message.Processed = true;
message.ProcessedAt = DateTime.UtcNow;
processed++;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to process message {MessageId}", message.Id);
message.Error = ex.Message;
message.RetryCount++;
message.NextRetryTime = DateTime.UtcNow.AddSeconds(Math.Pow(2, message.RetryCount));
}
}
await _dbContext.SaveChangesAsync(cancellationToken);
return processed;
} |
|
Интересный момент в моей реализации - использование шаблона "Разветвление-соединение" (Fork-Join) для параллельной обработки событий:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| public async Task ProcessBatchAsync(IReadOnlyCollection<OutboxMessage> messages)
{
var tasks = messages.Select(async message =>
{
try
{
await _eventPublisher.PublishAsync(message.EventType, message.EventData);
return (message, success: true, error: (string?)null);
}
catch (Exception ex)
{
return (message, success: false, error: ex.Message);
}
});
var results = await Task.WhenAll(tasks);
// Обновляем статусы сообщений в БД
await UpdateMessageStatusesAsync(results);
} |
|
В одном проекте мы столкнулись с проблемой восстановления порядка событий после сбоя. Решение: добавить метаданные о последовательности:
C# | 1
2
3
4
5
6
| public class OutboxMessage
{
// ... другие поля
public string? AggregateId { get; set; }
public long AggregateVersion { get; set; }
} |
|
Это позволяет гарантировать, что события для одного агрегата будут обработаны в правильном порядке, даже если произошел сбой.
Полное решение включает и средства мониторинга. Я добавил статистику по обработке Outbox:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
| // OrderSystem.OutboxProcessor/Metrics.cs
public class OutboxMetrics
{
private readonly IMetricsRoot _metrics;
private readonly Counter _processedCounter;
private readonly Counter _failedCounter;
private readonly Histogram _processingTime;
public void RecordProcessed() => _processedCounter.Increment();
public void RecordFailed() => _failedCounter.Increment();
public void RecordProcessingTime(long milliseconds) =>
_processingTime.Update(milliseconds);
} |
|
Такой подход позволяет отслеживать производительность системы и быстро выявлять проблемы.
Заключение
Работая над крупными проэктами, я заметил, что данный подход становится все более популярным именно в enterprise-секторе. Причина проста - он решает реальные проблемы согласованности данных, с которыми стакиваются разработчики распределеных систем. При этом дает гибкость и возможность независимого масштабирования. Тенденции развития этого подхода, на мой взгляд, будут включать дальнейшую автоматизацию: инструменты для генерации шаблонного кода, библиотеки для упрощения интеграции с брокерами сообщений, системы мониторинга специально для outbox-паттерна.
Мы уже видим, как облачные провайдеры внедряют сервисы, ориентированные на этот паттерн. Например, AWS Step Functions и Azure Durable Functions - хотя они не реализуют Outbox напрямую, но решают схожие проблемы надежной доставки событий.
Будущее этой архитектуры я вижу в еще большей декомпозиции - микросервисы уступят место наносервисам или даже функциям (FaaS), а согласованность между ними будет обеспечиваться именно через такие механизмы, как Outbox.
WebBrowser не поддерживает Event MouseDown и Event MouseUp Здравствуйте, у меня имеется WebBrowser control в windowsFormApp, но он не поддерживает Event... Помогите с паттерном для RegExp Есть такая строка:
<a class="title" href="http://urlik.com/123456">title 123456</a>
Нужно... Не могу разобраться с паттерном Стратегия Здравствуйте. Изучал раньше Java, сейчас хотел попробывать себя в шарпе.
Но столкнулся с одной... Регулярные выражения с переменным паттерном Мне нужно отправлять личные сообщения людям из небольшого списка на одном форуме. За один раз можно... Нужен пример работы с паттерном bridge я тут продолжаю мучать паттерны вот дополз до моста
нашол на этом сайте... Реализовать алгоритм Маерса или Укконена, приближенное сравнение строки с паттерном Доброго дня уважаемые!
Требуется реализовать алгоритм Маерса или Укконена, приближенное сравнение... Является ли срока паттерном RegEx? Есть ли какие-то стандартные средства, которыми можно проверить, является ли пользовательская... Использование стека при работе с паттерном Имеется форма. Нужно выбрать одну из фигур (треугольник, квадрат, круг/эллипс). Можно менять их... Как рисовать на форме? Пользуюсь MVP паттерном День добрый!
У меня или навык гугленья сломался или что-то еще, поэтому прошу помощи реальных... Продумать структуру программы с factory паттерном Дали задание реализовать данную программу при помощью использования фабричного паттерна.
... Нужно сделать расширяемые методы, продемонстрировать работу с паттерном singleton Нужно сделать расширяемые методы, продемонстрировать работу с паттерном singleton.
Так же нужно... Нужно сделать расширяемые методы, продемонстрировать работу с паттерном singleton Дан массив данных о студентах некоторой группы: фамилия, имя, отчество и дата рождения (день,...
|