-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Open
Labels
Needs: investigation 🔍Issue that needs investigationIssue that needs investigation
Milestone
Description
We've encountered QueueCacheMissException in production, where it happens infrequently and at seemingly random times.
I've managed to replicate it in the test code below, basically by taking the default stream configuration and dividing each time span by 300, so that the original ratios between them are maintained.
/// <summary>
/// Reproduction: publishes the integers 6 through 11 to the "TestStream" stream,
/// waiting as many seconds as the value about to be sent before each publish.
/// </summary>
[Fact]
public async Task Test()
{
    using var fixture = new ClusterFixture();
    var services = fixture.SiloHost.Services;
    var client = services.GetRequiredService<IClusterClient>();
    var logger = services.GetRequiredService<ILoggerFactory>().CreateLogger("Logger");
    var stream = client
        .GetStreamProvider("SimpleMemoryStreamProvider")
        .GetStream<int>("TestStream", Guid.Empty);

    // Each value doubles as the delay, in seconds, that precedes its own publish.
    for (var value = 6; value < 12; value++)
    {
        logger.LogInformation("Sending {Value}...", value);
        await Task.Delay(TimeSpan.FromSeconds(value));
        await stream.OnNextAsync(value);
    }
}
/// <summary>
/// Grain carrying an implicit subscription to "TestStream" that logs every item
/// and every stream error handed to it.
/// </summary>
[ImplicitStreamSubscription("TestStream")]
public class TestGrain : IGrainBase, IGrainWithGuidKey
{
    private readonly ILogger<TestGrain> _logger;

    public TestGrain(ILogger<TestGrain> logger, IGrainContext grainContext)
    {
        _logger = logger;
        GrainContext = grainContext;
    }

    /// <summary>The grain context supplied at construction.</summary>
    public IGrainContext GrainContext { get; }

    /// <summary>
    /// Attaches the logging handlers to the "TestStream" stream keyed by this grain's primary key.
    /// </summary>
    /// <param name="token">Activation cancellation token; not used by this method.</param>
    public async Task OnActivateAsync(CancellationToken token)
    {
        var stream = this
            .GetStreamProvider("SimpleMemoryStreamProvider")
            .GetStream<int>("TestStream", this.GetPrimaryKey());

        await stream.SubscribeAsync(
            // The second callback argument is unused; it is named so that it does not
            // hide the method's own CancellationToken parameter.
            (item, sequenceToken) =>
            {
                _logger.LogInformation("Received {Value}", item);
                return Task.CompletedTask;
            },
            error =>
            {
                _logger.LogError(error, "A stream error has occurred.");
                return Task.CompletedTask;
            });
    }
}
public class ClusterFixture : IDisposable
{
/// <summary>
/// Builds and deploys a test cluster configured by <see cref="TestSiloConfig"/>,
/// then captures the host of its primary silo.
/// </summary>
public ClusterFixture()
{
    // NOTE(review): the argument 1 is presumably the initial silo count; confirm against TestClusterBuilder.
    var clusterBuilder = new TestClusterBuilder(1);
    clusterBuilder.AddSiloBuilderConfigurator<TestSiloConfig>();

    Cluster = clusterBuilder.Build();
    Cluster.Deploy();

    // The primary silo handle is cast to InProcessSiloHandle to reach its SiloHost.
    var primary = (InProcessSiloHandle)Cluster.Primary;
    SiloHost = primary.SiloHost;
}
/// <summary>The test cluster built and deployed by the constructor.</summary>
public TestCluster Cluster { get; }

/// <summary>The host taken from the cluster's primary silo handle.</summary>
public IHost SiloHost { get; }
/// <summary>Stops all silos, disposes the cluster, and suppresses finalization of this fixture.</summary>
public void Dispose()
{
    var cluster = Cluster;
    cluster.StopAllSilos();
    cluster.Dispose();

    GC.SuppressFinalize(this);
}
/// <summary>
/// Silo configuration for the repro: debug logging, memory grain storage, and a memory
/// stream provider named "SimpleMemoryStreamProvider" with cache-eviction and
/// inactivity timings expressed in seconds.
/// </summary>
private class TestSiloConfig : ISiloConfigurator
{
    public void Configure(ISiloBuilder hostBuilder)
    {
        hostBuilder
            .ConfigureLogging(logging => logging.AddDebug())
            .ConfigureServices(services =>
                services.Configure<StreamCacheEvictionOptions>(
                    "SimpleMemoryStreamProvider",
                    eviction =>
                    {
                        eviction.DataMaxAgeInCache = TimeSpan.FromSeconds(6);
                        eviction.DataMinTimeInCache = TimeSpan.FromSeconds(1);
                    }));

        hostBuilder
            .AddMemoryGrainStorageAsDefault()
            .AddMemoryGrainStorage(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME)
            .AddMemoryStreams<DefaultMemoryMessageBodySerializer>(
                "SimpleMemoryStreamProvider",
                streams =>
                {
                    streams.ConfigurePullingAgent(agent => agent.Configure(pulling =>
                    {
                        pulling.StreamInactivityPeriod = TimeSpan.FromSeconds(6);
                    }));

                    // Only implicit subscriptions are used for this provider.
                    streams.ConfigureStreamPubSub(Orleans.Streams.StreamPubSubType.ImplicitOnly);
                });
    }
}
}
23:46:08:475 Logger: Information: Sending 6...
23:46:14:483 Logger: Information: Sending 7...
23:46:14:734 TestGrain: Information: Received 6
23:46:21:495 Logger: Information: Sending 8...
23:46:29:492 Logger: Information: Sending 9...
23:46:29:492 TestGrain: Error: A stream error has occurred.
23:46:29:492
23:46:29:492 Orleans.Streams.QueueCacheMissException: Item not found in cache. Requested: [EventSequenceToken: SeqNum=638111979680907519, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0]
23:46:29:492 at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:29:492 at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:29:492 at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:29:492 at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:29:492 at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:29:492 TestGrain: Information: Received 8
23:46:38:491 Logger: Information: Sending 10...
23:46:43:738 The thread 0x3ec8 has exited with code 0 (0x0).
23:46:48:498 Logger: Information: Sending 11...
23:46:48:498 TestGrain: Error: A stream error has occurred.
23:46:48:498
23:46:48:498 Orleans.Streams.QueueCacheMissException: Item not found in cache. Requested: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0]
23:46:48:498 at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:48:498 at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:48:498 at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:48:498 at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:48:498 at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:48:498 TestGrain: Information: Received 10
flash2048, Bohdandn, larsfjerm and intens-pavlas
Metadata
Metadata
Assignees
Labels
Needs: investigation 🔍Issue that needs investigationIssue that needs investigation