Package managedwriter provides an EXPERIMENTAL thick client around the BigQuery storage API's BigQueryWriteClient. More information about this new write client may also be found in the public documentation: https://cloud.google.com/bigquery/docs/write-api
It is EXPERIMENTAL and subject to change or removal without notice. This is primarily to signal that this package may still make breaking changes to existing methods and functionality.
Currently, this client targets the BigQueryWriteClient present in the v1 endpoint. It is intended as a more feature-rich successor to the classic BigQuery streaming interface, which is exposed as the Inserter abstraction in cloud.google.com/go/bigquery and as the tabledata.insertAll method in the BigQuery v2 REST API.
Creating a Client
To start working with this package, create a client:
    ctx := context.Background()
    client, err := managedwriter.NewClient(ctx, projectID)
    if err != nil {
        // TODO: Handle error.
    }
Defining the Protocol Buffer Schema
The write functionality of BigQuery Storage requires data to be sent as protocol buffer messages encoded in the proto2 wire format. Because an encoded protocol buffer message is not self-describing, you must also provide the protocol buffer schema. This is communicated using a DescriptorProto message, defined within the protocol buffer libraries: https://pkg.go.dev/google.golang.org/protobuf/types/descriptorpb#DescriptorProto
More information about protocol buffers can be found in the proto2 language guide: https://developers.google.com/protocol-buffers/docs/proto
Details about data type conversions between BigQuery and protocol buffers can be found in the public documentation: https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
For cases where the protocol buffer is compiled from a static ".proto" definition, this process is straightforward. Instantiate an example message, then convert the descriptor into a descriptor proto:
    m := &myprotopackage.MyCompiledMessage{}
    descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
If the message uses advanced protocol buffer features like nested messages/groups, or enums, the cloud.google.com/go/bigquery/storage/managedwriter/adapt subpackage contains functionality to normalize the descriptor into a self-contained definition:
    m := &myprotopackage.MyCompiledMessage{}
    descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
    if err != nil {
        // TODO: Handle error.
    }
The adapt subpackage also contains functionality for generating a DescriptorProto using a BigQuery table's schema directly.
Constructing a ManagedStream
The ManagedStream handles management of the underlying write connection to the BigQuery Storage service. You can either create a write stream explicitly and pass its name in, or have the write stream created while setting up the ManagedStream.
It's easiest to register the protocol buffer descriptor you'll be using to send data when setting up the managed stream using the WithSchemaDescriptor option, though you can also set/change the schema as part of an append request once the ManagedStream is created.
    // Create a ManagedStream using an explicit stream identifier, either a default
    // stream or one explicitly created by CreateWriteStream.
    managedStream, err := client.NewManagedStream(ctx,
        managedwriter.WithStreamName(streamName),
        managedwriter.WithSchemaDescriptor(descriptorProto))
    if err != nil {
        // TODO: Handle error.
    }
In addition, NewManagedStream can create new streams implicitly:
    // Alternately, allow the ManagedStream to handle stream construction by supplying
    // additional options.
    tableName := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", myProject, myDataset, myTable)
    managedStream, err := client.NewManagedStream(ctx,
        managedwriter.WithDestinationTable(tableName),
        managedwriter.WithType(managedwriter.BufferedStream),
        managedwriter.WithSchemaDescriptor(descriptorProto))
    if err != nil {
        // TODO: Handle error.
    }
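The destination table string follows a fixed resource path layout, so it can help to build it in one place. The following is a minimal sketch using only the standard library. The names tableParent and defaultStreamName are hypothetical helpers introduced for illustration, not part of this package, and the "_default" stream suffix is assumed from the public Write API documentation rather than stated on this page.

```go
package main

import "fmt"

// tableParent is a hypothetical helper that formats a table resource path
// in the layout used with WithDestinationTable in the example above.
func tableParent(project, dataset, table string) string {
	return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table)
}

// defaultStreamName is a hypothetical helper that appends the default stream
// suffix to a table resource path. The suffix is an assumption taken from the
// public Write API documentation.
func defaultStreamName(parent string) string {
	return parent + "/streams/_default"
}

func main() {
	parent := tableParent("my-project", "my_dataset", "my_table")
	fmt.Println(parent)
	fmt.Println(defaultStreamName(parent))
}
```

Centralizing the formatting keeps the project, dataset, and table identifiers from being transposed at individual call sites.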
Writing Data
Use the AppendRows function to write one or more serialized proto messages to a stream. You can choose to specify an offset in the stream to handle de-duplication for user-created streams, but a "default" stream neither accepts nor reports offsets.
AppendRows returns a future-like object that blocks until the write is successful or yields an error.
    // Define a couple of messages.
    mesgs := []*myprotopackage.MyCompiledMessage{
        {
            UserName:        proto.String("johndoe"),
            EmailAddress:    proto.String("[email protected]"),
            FavoriteNumbers: []int64{1, 42, 12345},
        },
        {
            UserName:        proto.String("janesmith"),
            EmailAddress:    proto.String("[email protected]"),
            FavoriteNumbers: []int64{1, 3, 5, 7, 9},
        },
    }

    // Encode the messages into binary format.
    encoded := make([][]byte, len(mesgs))
    for k, v := range mesgs {
        b, err := proto.Marshal(v)
        if err != nil {
            // TODO: Handle error.
        }
        encoded[k] = b
    }

    // Send the rows to the service, and specify an offset for managing deduplication.
    result, err := managedStream.AppendRows(ctx, encoded, managedwriter.WithOffset(0))
    if err != nil {
        // TODO: Handle error.
    }

    // Block until the write is complete and return the result.
    returnedOffset, err := result.GetResult(ctx)
    if err != nil {
        // TODO: Handle error.
    }
Buffered Stream Management
For Buffered streams, users control when data is made visible in the destination table/stream independently of when it is written. Use FlushRows on the ManagedStream to advance the flush point ahead in the stream.
    // We've written 1500+ rows in the stream, and want to advance the flush point
    // ahead to make the first 1000 rows available.
    flushOffset, err := managedStream.FlushRows(ctx, 1000)
Pending Stream Management
Pending streams allow users to commit data from multiple streams together once the streams have been finalized, meaning they'll no longer allow further data writes.
    // First, finalize the stream we're writing into.
    totalRows, err := managedStream.Finalize(ctx)
    if err != nil {
        // TODO: Handle error.
    }

    req := &storagepb.BatchCommitWriteStreamsRequest{
        Parent:       parentName,
        WriteStreams: []string{managedStream.StreamName()},
    }

    // Using the client, we can commit data from multiple streams to the same
    // table atomically.
    resp, err := client.BatchCommitWriteStreams(ctx, req)
    if err != nil {
        // TODO: Handle error.
    }
Error Handling
Like other Google Cloud services, this API relies on common components that can provide an enhanced set of errors when communicating about the results of API interactions.
Specifically, the apierror package (https://pkg.go.dev/github.com/googleapis/gax-go/v2/apierror) provides convenience methods for extracting structured information about errors.
The BigQuery Storage API service augments applicable errors with service-specific details in the form of a StorageError message. The StorageError message is accessed via the ExtractProtoMessage method in the apierror package. Note that the StorageError message does not implement Go's error interface.
An example of accessing the structured error details:
    // By way of example, let's assume the response from an append call returns an error.
    _, err := result.GetResult(ctx)
    if err != nil {
        if apiErr, ok := apierror.FromError(err); ok {
            // We now have an instance of APIError, which directly exposes more specific
            // details about multiple failure conditions, including transport-level errors.
            storageErr := &storagepb.StorageError{}
            // ExtractProtoMessage returns a nil error when the message was found.
            if e := apiErr.Details().ExtractProtoMessage(storageErr); e == nil {
                // storageErr now contains service-specific information about the error.
                log.Printf("Received service-specific error code %s", storageErr.GetCode().String())
            }
        }
    }
Constants
DetectProjectID
const DetectProjectID = "*detect-project-id*"
DetectProjectID is a sentinel value that instructs NewClient to detect the project ID. It is given in place of the projectID argument. NewClient will use the project ID from the given credentials or the default credentials (https://developers.google.com/accounts/docs/application-default-credentials) if no credentials were provided. When providing credentials, not all options will allow NewClient to extract the project ID. Specifically, a JWT does not have the project ID encoded.
NoStreamOffset
const NoStreamOffset int64 = -1
NoStreamOffset is a sentinel value for signalling we're not tracking stream offset (e.g. a default stream which allows simultaneous append streams).
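A sentinel such as NoStreamOffset is typically checked before an offset is logged or used in arithmetic. The sketch below redeclares the documented value locally so that it compiles with the standard library alone. The names noStreamOffset and describeOffset are hypothetical and not part of this package.

```go
package main

import "fmt"

// noStreamOffset is a local copy of the documented NoStreamOffset value (-1).
const noStreamOffset int64 = -1

// describeOffset is a hypothetical helper that renders an offset for logging,
// treating the sentinel as "not tracked" rather than as a real position.
func describeOffset(offset int64) string {
	if offset == noStreamOffset {
		return "offset not tracked"
	}
	return fmt.Sprintf("offset %d", offset)
}

func main() {
	fmt.Println(describeOffset(noStreamOffset))
	fmt.Println(describeOffset(42))
}
```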
Variables
AppendClientOpenCount, AppendClientOpenRetryCount, AppendRequests, AppendRequestBytes, AppendRequestErrors, AppendRequestRows, AppendResponses, AppendResponseErrors, FlushRequests
var (
// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
// AppendRequests is a measure of the number of append requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
// AppendRequestBytes is a measure of the bytes sent as append requests.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
// AppendRequestErrors is a measure of the number of append requests that errored on send.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)
// AppendRequestRows is a measure of the number of append rows sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)
// AppendResponses is a measure of the number of append responses received.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
// AppendResponseErrors is a measure of the number of append responses received with an error attached.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)
// FlushRequests is a measure of the number of FlushRows requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
)
AppendClientOpenView, AppendClientOpenRetryView, AppendRequestsView, AppendRequestBytesView, AppendRequestErrorsView, AppendRequestRowsView, AppendResponsesView, AppendResponseErrorsView, FlushRequestsView
var (
// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenView *view.View
// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryView *