Skip to content

QueueCacheMissException: Item not found in cache. #8306

@wassim-k

Description

@wassim-k

We've encounted QueueCacheMissException in production where it happens infrequently and at seemingly random times.
I've managed to replicate it in the test code below, by basically taking the default configuration for streams and dividing it by 300s in order to maintain the original ratio.

[Fact]
public async Task Test()
{
    using var fixture = new ClusterFixture();
    var client = fixture.SiloHost.Services.GetRequiredService<IClusterClient>();
    var logger = fixture.SiloHost.Services.GetRequiredService<ILoggerFactory>().CreateLogger("Logger");
    var streamProvider = client.GetStreamProvider("SimpleMemoryStreamProvider");
    var stream = streamProvider.GetStream<int>("TestStream", Guid.Empty);

    foreach (var i in Enumerable.Range(6, 6))
    {
        logger.LogInformation("Sending {Value}...", i);
        await Task.Delay(TimeSpan.FromSeconds(i));
        await stream.OnNextAsync(i);
    }
}

[ImplicitStreamSubscription("TestStream")]
public class TestGrain : IGrainBase, IGrainWithGuidKey
{
    private readonly ILogger<TestGrain> _logger;
    private readonly IGrainContext _grainContext;

    public TestGrain(ILogger<TestGrain> logger, IGrainContext grainContext)
    {
        _logger = logger;
        _grainContext = grainContext;
    }

    public IGrainContext GrainContext => _grainContext;

    public async Task OnActivateAsync(CancellationToken token)
    {
        var streamProvider = this.GetStreamProvider("SimpleMemoryStreamProvider");
        var stream = streamProvider.GetStream<int>("TestStream", this.GetPrimaryKey());

        await stream.SubscribeAsync(
            (value, token) =>
            {
                _logger.LogInformation("Received {Value}", value);
                return Task.CompletedTask;
            },
            ex =>
            {
                _logger.LogError(ex, "A stream error has occurred.");
                return Task.CompletedTask;
            });
    }
}

public class ClusterFixture : IDisposable
{
    public ClusterFixture()
    {
        var builder = new TestClusterBuilder(1);

        builder.AddSiloBuilderConfigurator<TestSiloConfig>();

        Cluster = builder.Build();
        Cluster.Deploy();
        SiloHost = ((InProcessSiloHandle)Cluster.Primary).SiloHost;
    }

    public TestCluster Cluster { get; private set; }

    public IHost SiloHost { get; private set; }

    public void Dispose()
    {
        Cluster.StopAllSilos();
        Cluster.Dispose();
        GC.SuppressFinalize(this);
    }

    private class TestSiloConfig : ISiloConfigurator
    {
        public void Configure(ISiloBuilder hostBuilder)
        {
            hostBuilder
                .ConfigureLogging(logger => logger.AddDebug())
                .ConfigureServices(services =>
                {
                    services
                        .Configure<StreamCacheEvictionOptions>("SimpleMemoryStreamProvider", options =>
                        {
                            options.DataMaxAgeInCache = TimeSpan.FromSeconds(6);
                            options.DataMinTimeInCache = TimeSpan.FromSeconds(1);
                        });
                });

            hostBuilder
                .AddMemoryGrainStorageAsDefault()
                .AddMemoryGrainStorage(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME)
                .AddMemoryStreams<DefaultMemoryMessageBodySerializer>("SimpleMemoryStreamProvider", configure =>
                {
                    configure.ConfigurePullingAgent(ob => ob.Configure(options =>
                    {
                        options.StreamInactivityPeriod = TimeSpan.FromSeconds(6);
                    }));

                    configure.ConfigureStreamPubSub(Orleans.Streams.StreamPubSubType.ImplicitOnly);
                });
        }
    }
}
23:46:08:475	Logger: Information: Sending 6...
23:46:14:483	Logger: Information: Sending 7...
23:46:14:734	TestGrain: Information: Received 6
23:46:21:495	Logger: Information: Sending 8...
23:46:29:492	Logger: Information: Sending 9...
23:46:29:492	TestGrain: Error: A stream error has occurred.
23:46:29:492	
23:46:29:492	Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceToken: SeqNum=638111979680907519, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0]
23:46:29:492	   at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:29:492	   at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:29:492	   at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:29:492	   at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:29:492	   at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:29:492	TestGrain: Information: Received 8
23:46:38:491	Logger: Information: Sending 10...
23:46:43:738	The thread 0x3ec8 has exited with code 0 (0x0).
23:46:48:498	Logger: Information: Sending 11...
23:46:48:498	TestGrain: Error: A stream error has occurred.
23:46:48:498	
23:46:48:498	Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0]
23:46:48:498	   at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:48:498	   at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:48:498	   at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:48:498	   at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:48:498	   at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:48:498	TestGrain: Information: Received 10

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions