Expose queue internals #74
Summary of Changes

Hello @pitabwire, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the flexibility of the messaging system by exposing the underlying NATS and JetStream client objects through a new `As` method.
Code Review
This pull request introduces an As method to the Queue and Topic interfaces, allowing access to the underlying NATS-specific implementations. This is a valuable addition for users who need to access provider-specific features. The PR also includes several beneficial refactorings, such as using any instead of interface{}, improving performance with RWMutex and buffer pooling, and fixing a bug in timeout handling.
My review focuses on a few critical areas. I've identified a race condition in the new locking mechanism for jetstreamConsumer, a potential panic in the As methods due to unsafe type assertions, and an issue where errors are being swallowed in natsConsumer.ReceiveMessages. Addressing these points will significantly improve the robustness and correctness of the changes.
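For readers unfamiliar with the escape-hatch pattern, an `As` method in the gocloud.dev style lets callers optionally unwrap provider-specific types without the public API depending on them. A minimal self-contained sketch (the `Queue` and `natsConn` types here are illustrative stand-ins, not this repo's actual API):

```go
package main

import "fmt"

// natsConn is an illustrative stand-in; in this PR the real targets are
// the NATS and JetStream client objects.
type natsConn struct{ url string }

// Queue exposes an escape hatch: As reports whether i was filled with an
// underlying provider-specific value.
type Queue interface {
	As(i any) bool
}

type natsQueue struct{ conn *natsConn }

func (q *natsQueue) As(i any) bool {
	p, ok := i.(**natsConn)
	if !ok {
		return false // caller asked for a type this provider cannot supply
	}
	*p = q.conn
	return true
}

func main() {
	var q Queue = &natsQueue{conn: &natsConn{url: "nats://localhost:4222"}}
	var nc *natsConn
	if q.As(&nc) {
		fmt.Println("unwrapped:", nc.url)
	}
}
```

Callers that don't need provider features never touch `As`; callers that do can reach the raw client while still compiling against the portable interface.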
```go
func (jc *jetstreamConsumer) setupActiveBatch(ctx context.Context, batchCount int, batchTimeout time.Duration) (jetstream.MessageBatch, error) {
	// Fast path: check if we already have an active batch without write lock
	if batch := jc.getActiveBatch(); batch != nil {
		return batch, nil
	}

	// Check for context cancellation before expensive operations
	if err := ctx.Err(); err != nil {
		return nil, errorutil.Wrap(err, "context canceled while setting up batch")
	}

	// Acquire write lock for batch creation
	jc.mu.Lock()
	// Double-check after acquiring write lock
	if jc.activeBatch != nil {
		batch := jc.activeBatch
		jc.mu.Unlock()
		return batch, nil
	}
	jc.mu.Unlock()

	// Perform fetch outside the lock to avoid blocking other goroutines
	batch, err := jc.consumer.Fetch(batchCount, jetstream.FetchMaxWait(batchTimeout))
	if err != nil {
		// Map connection-related errors
		if errors.Is(err, nats.ErrConnectionClosed) || errors.Is(err, nats.ErrConnectionDraining) {
			return nil, errorutil.Wrap(err, "connection issue while setting up batch")
		}
		return nil, errorutil.Wrap(err, "failed to setup fetch from consumer")
	}

	jc.setActiveBatch(batch)
	return batch, nil
}
```
The refactoring of setupActiveBatch to use double-checked locking introduces a race condition. By unlocking before the jc.consumer.Fetch call (line 256), multiple goroutines can pass the checks and execute Fetch concurrently. This can lead to one goroutine's fetched batch overwriting another's when setActiveBatch is called, causing message loss and unpredictable behavior.
The previous implementation, which held a sync.Mutex for the duration of the function, was safe. I recommend reverting to a simpler and safer locking strategy where the write-lock is held from the second check through the assignment of jc.activeBatch to prevent this race.
```go
func (jc *jetstreamConsumer) setupActiveBatch(ctx context.Context, batchCount int, batchTimeout time.Duration) (jetstream.MessageBatch, error) {
	// Fast path: check if we already have an active batch with a read lock.
	if batch := jc.getActiveBatch(); batch != nil {
		return batch, nil
	}

	// Acquire a write lock to create the batch.
	jc.mu.Lock()
	defer jc.mu.Unlock()

	// Double-check after acquiring the write lock in case another goroutine created it.
	if jc.activeBatch != nil {
		return jc.activeBatch, nil
	}

	// Check for context cancellation before the blocking call.
	if err := ctx.Err(); err != nil {
		return nil, errorutil.Wrap(err, "context canceled while setting up batch")
	}

	batch, err := jc.consumer.Fetch(batchCount, jetstream.FetchMaxWait(batchTimeout))
	if err != nil {
		if errors.Is(err, nats.ErrConnectionClosed) || errors.Is(err, nats.ErrConnectionDraining) {
			return nil, errorutil.Wrap(err, "connection issue while setting up batch")
		}
		return nil, errorutil.Wrap(err, "failed to setup fetch from consumer")
	}

	jc.activeBatch = batch
	return jc.activeBatch, nil
}
```
connections/jetstream.go
```go
if p, ok := i.(*jetstream.JetStream); ok {
	*p = jc.connector.Connection().(*jetstreamConnection).jetStream
	return true
```
The type assertion jc.connector.Connection().(*jetstreamConnection) is not safe and will cause a panic if the underlying Connection type is not *jetstreamConnection. This could happen if the connection is wrapped or if a different implementation is used in the future. This makes the implementation fragile and tightly coupled.
To make this more robust, consider adding a method to the Connection interface to safely access the underlying jetstream.JetStream object, or use a type switch to handle different possible concrete types gracefully.
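One way to sketch the type-switch approach, using placeholder types (`jetStreamHandle`, `asJetStream`, and the field names here are illustrative stand-ins for this repo's real types): the interface-to-concrete conversion is guarded so an unknown `Connection` implementation simply returns false instead of panicking.

```go
package main

import "fmt"

// Placeholder stand-ins for jetstream.JetStream and the repo's connection types.
type jetStreamHandle struct{ name string }

type jetstreamConnection struct{ js *jetStreamHandle }

// Connection mimics the interface returned by connector.Connection().
type Connection interface{}

// asJetStream unwraps a Connection via a type switch rather than a bare
// type assertion, so unexpected concrete types fail gracefully.
func asJetStream(c Connection, i any) bool {
	p, ok := i.(**jetStreamHandle)
	if !ok {
		return false // caller passed an unsupported target type
	}
	switch conn := c.(type) {
	case *jetstreamConnection:
		*p = conn.js
		return true
	default:
		return false // unknown implementation: no panic, just false
	}
}

func main() {
	conn := &jetstreamConnection{js: &jetStreamHandle{name: "js"}}
	var js *jetStreamHandle
	fmt.Println(asJetStream(conn, &js)) // succeeds: known concrete type
	var wrong *int
	fmt.Println(asJetStream(conn, &wrong)) // fails safely: wrong target type
}
```

With this shape, wrapping `Connection` in a decorator or swapping implementations degrades `As` to returning false rather than crashing the process.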
connections/plain.go
```go
if p, ok := i.(**nats.Conn); ok {
	*p = q.connector.Connection().(*plainConnection).natsConnection
	return true
```
The type assertion q.connector.Connection().(*plainConnection) is unsafe and could lead to a panic if the Connection interface is implemented by a different concrete type or a wrapper. This tight coupling makes the code fragile.
A safer approach would be to avoid direct type assertions on interface values. Consider adding a method to the Connection interface to safely retrieve the underlying *nats.Conn.
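The interface-accessor alternative can be sketched like this (the `Raw` method name and the `natsConn` stand-in are assumptions for illustration, not this repo's API): each `Connection` implementation reports its own underlying client, returning nil when it has none, so callers never assert on a concrete type.

```go
package main

import "fmt"

// natsConn is a placeholder for *nats.Conn.
type natsConn struct{ url string }

// Connection gains an accessor so unwrapping never needs an unsafe type
// assertion on the concrete implementation. (Method name is hypothetical.)
type Connection interface {
	Raw() *natsConn // nil when the provider has no plain NATS connection
}

type plainConnection struct{ nc *natsConn }

func (p *plainConnection) Raw() *natsConn { return p.nc }

// otherConnection represents a wrapper or alternative implementation.
type otherConnection struct{}

func (o *otherConnection) Raw() *natsConn { return nil }

// as unwraps via the interface method, handling nil gracefully.
func as(c Connection, i any) bool {
	p, ok := i.(**natsConn)
	if !ok {
		return false
	}
	if nc := c.Raw(); nc != nil {
		*p = nc
		return true
	}
	return false
}

func main() {
	var nc *natsConn
	fmt.Println(as(&plainConnection{nc: &natsConn{url: "nats://x"}}, &nc)) // succeeds
	fmt.Println(as(&otherConnection{}, &nc))                               // fails safely
}
```

The trade-off is a slightly wider `Connection` interface, but every implementation is forced to answer the question explicitly instead of the `As` method guessing at concrete types.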
```go
if err != nil {
	// Not an error if we timeout or context is cancelled
	if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
		// Just return what we have so far
		return messages, nil
	}
	if len(messages) > 0 {
		return messages, nil
	}
	return nil, errorutil.Wrap(err, "error receiving message")
}
```
When q.consumer.NextMsg returns an error (that isn't a timeout), the error is swallowed if len(messages) > 0. The function returns (messages, nil), hiding a potentially critical error like a closed connection from the caller. According to the gocloud.dev/pubsub/driver documentation, ReceiveBatch can return both messages and an error to indicate that the source is unhealthy.
You should return the received messages along with the error to correctly signal the problem to the caller. This allows the client to process the valid messages while also being aware of the underlying issue.
Suggested change:

```go
if err != nil {
	if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
		// It's not an error if we time out; just return what we have.
		return messages, nil
	}
	// For other errors, return them with any messages we have.
	// The caller can process the messages and still see the error.
	return messages, errorutil.Wrap(err, "error receiving message")
}
```
Please reference any Issue related to this Pull Request. Example: Fixes #1. See here for tips on writing a good Pull Request description. However, don't overthink it; ship faster.