
Conversation

@pitabwire
Owner

Please reference any Issue related to this Pull Request. Example: Fixes #1.

See here for tips on writing a good Pull Request description. However, don't overthink it; ship faster.

@gemini-code-assist

Summary of Changes

Hello @pitabwire, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the flexibility of the messaging system by exposing the underlying NATS and JetStream client objects through a new As(any) method on the Queue and Topic interfaces. This change empowers advanced users to leverage specific client features directly. Alongside this, the PR delivers notable performance and concurrency improvements in message handling, refines error wrapping, and updates type declarations to modern Go standards, ensuring a more robust and efficient system.

Highlights

  • Exposed Internals via As Method: Introduced an As(any) method to the Queue and Topic interfaces, allowing users to access the underlying NATS and JetStream client objects directly. This provides greater flexibility for advanced use cases not covered by the generic gocloud.dev/pubsub interfaces (see the usage sketch after this list).
  • Concurrency and Performance Improvements: Refactored jetstreamConsumer's locking mechanism from sync.Mutex to sync.RWMutex and optimized setupActiveBatch for better concurrency. The encodeV1Message function now utilizes sync.Pool for bytes.Buffer to reduce memory allocations during message encoding. The natsConsumer.ReceiveMessages method has been improved for more efficient message fetching, including a short poll for subsequent messages.
  • Type Modernization: Updated various interface{} type assertions and parameters to any across the codebase, aligning with Go 1.18+ best practices for improved readability and maintainability.
  • Corrected Timeout Handling: Fixed an issue in plainConnection.CreateSubscription where batchFetchTimeout was incorrectly using opts.ConsumerConfig.MaxRequestExpires instead of opts.ReceiveWaitTimeOut.
  • Comprehensive Testing for As Methods: Added a new test file, connections/as_test.go, with extensive test cases to ensure the correct functionality and behavior of the newly introduced As methods across different connection types (plain NATS, JetStream) and components (topics, consumers).
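
For context on the first highlight above, here is a minimal usage sketch of the new As method. It assumes the As(any) semantics visible in the review snippets further down (a **nats.Conn target for plain NATS, a *jetstream.JetStream target for JetStream); the asCapable interface and inspectClients function are illustrative stand-ins, not names from this repository.

package example

import (
    "fmt"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

// asCapable mirrors the As method this PR adds to Queue and Topic.
type asCapable interface {
    As(i any) bool
}

// inspectClients shows how a caller might reach the underlying client objects.
func inspectClients(q asCapable) {
    var nc *nats.Conn
    if q.As(&nc) {
        // Plain NATS connection exposed; provider-specific APIs are now reachable.
        fmt.Println("connected to:", nc.ConnectedUrl())
    }

    var js jetstream.JetStream
    if q.As(&js) {
        // JetStream context exposed.
        fmt.Println("jetstream available:", js != nil)
    }
}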

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | /gemini review | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | /gemini help | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist bot left a comment


Code Review

This pull request introduces an As method to the Queue and Topic interfaces, allowing access to the underlying NATS-specific implementations. This is a valuable addition for users who need to access provider-specific features. The PR also includes several beneficial refactorings, such as using any instead of interface{}, improving performance with RWMutex and buffer pooling, and fixing a bug in timeout handling.

My review focuses on a few critical areas. I've identified a race condition in the new locking mechanism for jetstreamConsumer, a potential panic in the As methods due to unsafe type assertions, and an issue where errors are being swallowed in natsConsumer.ReceiveMessages. Addressing these points will significantly improve the robustness and correctness of the changes.

Comment on lines 237 to 269
func (jc *jetstreamConsumer) setupActiveBatch(ctx context.Context, batchCount int, batchTimeout time.Duration) (jetstream.MessageBatch, error) {
    // Fast path: check if we already have an active batch without a write lock.
    if batch := jc.getActiveBatch(); batch != nil {
        return batch, nil
    }

    // Check for context cancellation before expensive operations.
    if err := ctx.Err(); err != nil {
        return nil, errorutil.Wrap(err, "context canceled while setting up batch")
    }

    // Acquire write lock for batch creation.
    jc.mu.Lock()
    // Double-check after acquiring the write lock.
    if jc.activeBatch != nil {
        batch := jc.activeBatch
        jc.mu.Unlock()
        return batch, nil
    }
    jc.mu.Unlock()

    // Perform the fetch outside the lock to avoid blocking other goroutines.
    // Use Fetch to block for extended periods; this provides better
    // behaviour when there are no messages available.
    batch, err := jc.consumer.Fetch(batchCount, jetstream.FetchMaxWait(batchTimeout))
    if err != nil {
        // Map connection-related errors.
        if errors.Is(err, nats.ErrConnectionClosed) || errors.Is(err, nats.ErrConnectionDraining) {
            return nil, errorutil.Wrap(err, "connection issue while setting up batch")
        }
        return nil, errorutil.Wrap(err, "failed to setup fetch from consumer")
    }

    jc.setActiveBatch(batch)
    return batch, nil
}


critical

The refactoring of setupActiveBatch to use double-checked locking introduces a race condition. By unlocking before the jc.consumer.Fetch call (line 256), multiple goroutines can pass the checks and execute Fetch concurrently. This can lead to one goroutine's fetched batch overwriting another's when setActiveBatch is called, causing message loss and unpredictable behavior.

The previous implementation, which held a sync.Mutex for the duration of the function, was safe. I recommend reverting to a simpler and safer locking strategy where the write-lock is held from the second check through the assignment of jc.activeBatch to prevent this race.

func (jc *jetstreamConsumer) setupActiveBatch(ctx context.Context, batchCount int, batchTimeout time.Duration) (jetstream.MessageBatch, error) {
	// Fast path: check if we already have an active batch with a read lock.
	if batch := jc.getActiveBatch(); batch != nil {
		return batch, nil
	}

	// Acquire a write lock to create the batch.
	jc.mu.Lock()
	defer jc.mu.Unlock()

	// Double-check after acquiring the write lock in case another goroutine created it.
	if jc.activeBatch != nil {
		return jc.activeBatch, nil
	}

	// Check for context cancellation before the blocking call.
	if err := ctx.Err(); err != nil {
		return nil, errorutil.Wrap(err, "context canceled while setting up batch")
	}

	batch, err := jc.consumer.Fetch(batchCount, jetstream.FetchMaxWait(batchTimeout))
	if err != nil {
		if errors.Is(err, nats.ErrConnectionClosed) || errors.Is(err, nats.ErrConnectionDraining) {
			return nil, errorutil.Wrap(err, "connection issue while setting up batch")
		}
		return nil, errorutil.Wrap(err, "failed to setup fetch from consumer")
	}

	jc.activeBatch = batch
	return jc.activeBatch, nil
}

Comment on lines 187 to 189
if p, ok := i.(*jetstream.JetStream); ok {
    *p = jc.connector.Connection().(*jetstreamConnection).jetStream
    return true


high

The type assertion jc.connector.Connection().(*jetstreamConnection) is not safe and will cause a panic if the underlying Connection type is not *jetstreamConnection. This could happen if the connection is wrapped or if a different implementation is used in the future. This makes the implementation fragile and tightly coupled.

To make this more robust, consider adding a method to the Connection interface to safely access the underlying jetstream.JetStream object, or use a type switch to handle different possible concrete types gracefully.
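
As a rough sketch of the type-switch option (a sketch only, assuming the assertion above lives in the consumer's As(any) method; any name not visible in the diff is an assumption):

func (jc *jetstreamConsumer) As(i any) bool {
    p, ok := i.(*jetstream.JetStream)
    if !ok {
        return false
    }
    // Handle the known concrete connection type explicitly and fail
    // gracefully for anything else instead of panicking on a bare assertion.
    switch conn := jc.connector.Connection().(type) {
    case *jetstreamConnection:
        *p = conn.jetStream
        return true
    default:
        return false
    }
}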

Comment on lines 195 to 197
if p, ok := i.(**nats.Conn); ok {
    *p = q.connector.Connection().(*plainConnection).natsConnection
    return true


high

The type assertion q.connector.Connection().(*plainConnection) is unsafe and could lead to a panic if the Connection interface is implemented by a different concrete type or a wrapper. This tight coupling makes the code fragile.

A safer approach would be to avoid direct type assertions on interface values. Consider adding a method to the Connection interface to safely retrieve the underlying *nats.Conn.
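
Short of changing the Connection interface, even a comma-ok assertion removes the panic. A minimal sketch, assuming the surrounding As(any) method and a *natsQueue receiver (the receiver type is not visible in this diff):

func (q *natsQueue) As(i any) bool {
    p, ok := i.(**nats.Conn)
    if !ok {
        return false
    }
    // Comma-ok assertion: report false for wrapped or alternative
    // Connection implementations rather than panicking.
    conn, ok := q.connector.Connection().(*plainConnection)
    if !ok {
        return false
    }
    *p = conn.natsConnection
    return true
}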

Comment on lines 286 to 294
if err != nil {
    // Not an error if we timeout or context is cancelled
    if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
        // Just return what we have so far
        return messages, nil
    }
    if len(messages) > 0 {
        return messages, nil
    }
    return nil, errorutil.Wrap(err, "error receiving message")
}


high

When q.consumer.NextMsg returns an error (that isn't a timeout), the error is swallowed if len(messages) > 0. The function returns (messages, nil), hiding a potentially critical error like a closed connection from the caller. According to the gocloud.dev/pubsub/driver documentation, ReceiveBatch can return both messages and an error to indicate that the source is unhealthy.

You should return the received messages along with the error to correctly signal the problem to the caller. This allows the client to process the valid messages while also being aware of the underlying issue.

Suggested change
if err != nil {
    if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
        // It's not an error if we time out; just return what we have.
        return messages, nil
    }
    // For other errors, return them with any messages we have.
    // The caller can process the messages and still see the error.
    return messages, errorutil.Wrap(err, "error receiving message")
}

@pitabwire merged commit 9bc8403 into main on Dec 18, 2025
7 checks passed
@pitabwire deleted the expose-queue-internals branch on December 18, 2025 at 07:50