Change feed
React to container changes with IItemChangeFeedProcessor and the AspNetCore hosted service.
Cosmos DB exposes a change feed — a persistent log of every create / update on a container. Cosmos Repository wraps the change-feed processor so you can:
- Raise events when items change.
- Replicate data into another container (perhaps with a different partitioning strategy).
- Replicate data into another store (SQL, search index, message bus, …).
Listening to changes
Two pieces wire change-feed processing into your app: the container configuration that opts an IItem into change-feed monitoring, and a processor that handles each change.
1. Configure the container
Call WithChangeFeedMonitoring on the container builder for any IItem you want to track:

    WebApplicationBuilder builder = WebApplication.CreateBuilder(args);

    builder.Services.AddCosmosRepository(options =>
    {
        options.DatabaseId = "change-feed-demo";
        options.ContainerPerItemType = true;

        options.ContainerBuilder.Configure<Book>(containerOptions =>
        {
            containerOptions.WithContainer(BookDemoConstants.Container);
            containerOptions.WithPartitionKey(BookDemoConstants.PartitionKey);
            containerOptions.WithChangeFeedMonitoring();
        });
    });

WithChangeFeedMonitoring(Action<ChangeFeedOptions>?) accepts an optional configuration callback exposing:
| Property | Default | Notes |
|---|---|---|
| InstanceName | "default" | Lets multiple processor instances cooperate. |
| ProcessorName | "cosmos-repository-pattern-processor" | Name written to the lease container. |
| PollInterval | null | How often to poll for new changes. |
| StartTime | null | Start the feed from a UTC DateTime (its Kind must be DateTimeKind.Utc). |
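
StartTime is the option most likely to cause trouble, because a DateTime produced by parsing or by a plain constructor call usually has Kind Unspecified or Local rather than Utc. A minimal sketch of a normalising helper follows, using only the .NET base class library. ChangeFeedStartTime and ToUtc are hypothetical names and not part of Cosmos Repository, and treating Unspecified values as already being UTC is an assumption that may not suit every application.

```csharp
using System;

public static class ChangeFeedStartTime
{
    // Returns a DateTime whose Kind is DateTimeKind.Utc.
    public static DateTime ToUtc(DateTime value) => value.Kind switch
    {
        DateTimeKind.Utc => value,                          // already UTC, returned unchanged
        DateTimeKind.Local => value.ToUniversalTime(),      // converted from machine-local time
        _ => DateTime.SpecifyKind(value, DateTimeKind.Utc), // Unspecified: assumed to be UTC already
    };
}

// Hypothetical usage inside the WithChangeFeedMonitoring callback,
// assuming StartTime is a nullable DateTime as the table suggests:
//
//   changeFeedOptions.StartTime =
//       ChangeFeedStartTime.ToUtc(DateTime.Parse("2024-01-01T00:00:00"));
```

Normalising at the point of assignment keeps the UTC requirement in one place rather than relying on every caller to construct the value correctly.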
2. Implement a processor
Each IItem that opts into the change feed needs a processor implementing IItemChangeFeedProcessor<TItem>:

    public class BookChangeFeedProcessor : IItemChangeFeedProcessor<Book>
    {
        private readonly ILogger<BookChangeFeedProcessor> _logger;
        private readonly IRepository<BookByIdReference> _bookByIdReferenceRepository;

        public BookChangeFeedProcessor(
            ILogger<BookChangeFeedProcessor> logger,
            IRepository<BookByIdReference> bookByIdReferenceRepository)
        {
            _logger = logger;
            _bookByIdReferenceRepository = bookByIdReferenceRepository;
        }

        public async ValueTask HandleAsync(Book book, CancellationToken cancellationToken)
        {
            _logger.LogInformation("Change detected for book with ID: {BookId}", book.Id);

            if (!book.HasBeenUpdated)
            {
                await _bookByIdReferenceRepository.CreateAsync(
                    new BookByIdReference(book.Id, book.Category),
                    cancellationToken);
            }

            _logger.LogInformation("Processed change for book with ID: {BookId}", book.Id);
        }
    }

3. Host the processor in ASP.NET Core
The Microsoft.Azure.CosmosRepository.AspNetCore package wires up an IHostedService that scans for IItemChangeFeedProcessor<TItem> registrations and starts them with the host:

    using Microsoft.Azure.CosmosRepository.AspNetCore.Extensions;

    builder.Services
        .AddCosmosRepositoryItemChangeFeedProcessors(typeof(BookChangeFeedProcessor).Assembly)
        .AddCosmosRepositoryChangeFeedHostedService();

AddCosmosRepositoryItemChangeFeedProcessors registers every concrete IItemChangeFeedProcessor<> in the supplied assemblies as singletons. AddCosmosRepositoryChangeFeedHostedService registers the CosmosRepositoryChangeFeedHostedService that owns the processor lifecycle.
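
To make the assembly scan more concrete, here is a rough sketch of how concrete implementations of an open generic interface can be discovered with reflection. It is an illustration of the general technique, not the library's actual code: the interface is a local stand-in with the same shape as IItemChangeFeedProcessor<TItem>, and Book, BookProcessor, AbstractProcessor, and ProcessorScanner are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// Local stand-in mirroring the shape of the library interface.
public interface IItemChangeFeedProcessor<TItem>
{
    ValueTask HandleAsync(TItem item, CancellationToken cancellationToken);
}

public sealed class Book { }

// A concrete processor: the scan should find this one.
public sealed class BookProcessor : IItemChangeFeedProcessor<Book>
{
    public ValueTask HandleAsync(Book item, CancellationToken cancellationToken) => default;
}

// An abstract processor: the scan should skip it because it cannot be instantiated.
public abstract class AbstractProcessor : IItemChangeFeedProcessor<Book>
{
    public abstract ValueTask HandleAsync(Book item, CancellationToken cancellationToken);
}

public static class ProcessorScanner
{
    // Yields (closed interface, implementation) pairs for every concrete
    // IItemChangeFeedProcessor<> implementation in the given assembly.
    public static IEnumerable<(Type Service, Type Implementation)> Scan(Assembly assembly) =>
        from type in assembly.GetTypes()
        where type.IsClass && !type.IsAbstract
        from service in type.GetInterfaces()
        where service.IsGenericType
              && service.GetGenericTypeDefinition() == typeof(IItemChangeFeedProcessor<>)
        select (service, type);
}
```

Each resulting pair could then be handed to a container, for example with services.AddSingleton(pair.Service, pair.Implementation), which matches the singleton lifetime described above.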
Lifecycle
- On startup, the hosted service starts each IContainerChangeFeedProcessor (one per change-feed-enabled container).
- Each processor pulls changes from Cosmos and dispatches them to your HandleAsync implementation.
- On shutdown, the hosted service stops every processor cleanly.
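
The lifecycle above can be sketched with a few stand-in types. This is a simplified model under stated assumptions rather than the hosted service's real implementation: the interface below copies only the StartAsync() and StopAsync() members named in this document, RecordingProcessor and ChangeFeedHostSketch are hypothetical, and starting the processors concurrently is a choice made for the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Stand-in for the library interface, reduced to its start/stop members.
public interface IContainerChangeFeedProcessor
{
    Task StartAsync();
    Task StopAsync();
}

// Hypothetical processor that only records whether it is running.
public sealed class RecordingProcessor : IContainerChangeFeedProcessor
{
    public bool IsRunning { get; private set; }

    public Task StartAsync() { IsRunning = true; return Task.CompletedTask; }

    public Task StopAsync() { IsRunning = false; return Task.CompletedTask; }
}

// Simplified host: starts every processor on startup and stops every one on shutdown.
public sealed class ChangeFeedHostSketch
{
    private readonly IReadOnlyList<IContainerChangeFeedProcessor> _processors;

    public ChangeFeedHostSketch(IEnumerable<IContainerChangeFeedProcessor> processors) =>
        _processors = processors.ToList();

    // Completes once all processors have started.
    public Task StartAsync() => Task.WhenAll(_processors.Select(p => p.StartAsync()));

    // Completes once all processors have stopped.
    public Task StopAsync() => Task.WhenAll(_processors.Select(p => p.StopAsync()));
}
```

In a real application the host's StartAsync and StopAsync would be driven by the generic host, so application code normally never calls them directly.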
Reference
- IItemChangeFeedProcessor<TItem>: ValueTask HandleAsync(TItem item, CancellationToken).
- IContainerChangeFeedProcessor: exposes StartAsync(), StopAsync(), and the ItemTypes it handles.
- CosmosRepositoryChangeFeedHostedService (AspNetCore package).
Working samples: