20
20
import com .google .cloud .bigquery .TableDefinition ;
21
21
import com .google .cloud .bigquery .TableId ;
22
22
import com .google .cloud .bigquery .TableInfo ;
23
+ import com .google .cloud .bigquery .ViewDefinition ;
23
24
import com .google .common .collect .ImmutableList ;
24
25
import com .google .common .collect .ImmutableMap ;
25
26
import com .google .common .collect .ImmutableSet ;
33
34
import io .trino .spi .connector .ConnectorTableHandle ;
34
35
import io .trino .spi .connector .ConnectorTableMetadata ;
35
36
import io .trino .spi .connector .ConnectorTableProperties ;
37
+ import io .trino .spi .connector .ConnectorTransactionHandle ;
36
38
import io .trino .spi .connector .Constraint ;
37
39
import io .trino .spi .connector .ConstraintApplicationResult ;
40
+ import io .trino .spi .connector .InMemoryRecordSet ;
38
41
import io .trino .spi .connector .LimitApplicationResult ;
39
42
import io .trino .spi .connector .NotFoundException ;
40
43
import io .trino .spi .connector .ProjectionApplicationResult ;
44
+ import io .trino .spi .connector .RecordCursor ;
41
45
import io .trino .spi .connector .SchemaTableName ;
42
46
import io .trino .spi .connector .SchemaTablePrefix ;
47
+ import io .trino .spi .connector .SystemTable ;
43
48
import io .trino .spi .connector .TableNotFoundException ;
44
49
import io .trino .spi .expression .ConnectorExpression ;
45
50
import io .trino .spi .predicate .TupleDomain ;
51
+ import io .trino .spi .type .Type ;
52
+ import io .trino .spi .type .VarcharType ;
46
53
47
54
import javax .inject .Inject ;
48
55
49
56
import java .util .List ;
50
57
import java .util .Map ;
51
58
import java .util .Optional ;
52
59
import java .util .Set ;
60
+ import java .util .function .Function ;
53
61
54
62
import static com .google .cloud .bigquery .TableDefinition .Type .TABLE ;
55
63
import static com .google .cloud .bigquery .TableDefinition .Type .VIEW ;
@@ -66,6 +74,7 @@ public class BigQueryMetadata
66
74
static final int NUMERIC_DATA_TYPE_PRECISION = 38 ;
67
75
static final int NUMERIC_DATA_TYPE_SCALE = 9 ;
68
76
static final String INFORMATION_SCHEMA = "information_schema" ;
77
+ private static final String VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX = "$view_definition" ;
69
78
70
79
private final BigQueryClient bigQueryClient ;
71
80
private final String projectId ;
@@ -160,6 +169,32 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
160
169
return new ConnectorTableMetadata (schemaTableName , columns );
161
170
}
162
171
172
+ @ Override
173
+ public Optional <SystemTable > getSystemTable (ConnectorSession session , SchemaTableName tableName )
174
+ {
175
+ if (isViewDefinitionSystemTable (tableName )) {
176
+ return getViewDefinitionSystemTable (tableName , getViewDefinitionSourceTableName (tableName ));
177
+ }
178
+ return Optional .empty ();
179
+ }
180
+
181
+ private Optional <SystemTable > getViewDefinitionSystemTable (SchemaTableName viewDefinitionTableName , SchemaTableName sourceTableName )
182
+ {
183
+ TableInfo tableInfo = getBigQueryTable (sourceTableName );
184
+ if (tableInfo == null || !(tableInfo .getDefinition () instanceof ViewDefinition )) {
185
+ throw new TableNotFoundException (viewDefinitionTableName );
186
+ }
187
+
188
+ List <ColumnMetadata > columns = ImmutableList .of (new ColumnMetadata ("query" , VarcharType .VARCHAR ));
189
+ List <Type > types = columns .stream ()
190
+ .map (ColumnMetadata ::getType )
191
+ .collect (toImmutableList ());
192
+ Optional <String > query = Optional .ofNullable (((ViewDefinition ) tableInfo .getDefinition ()).getQuery ());
193
+ Iterable <List <Object >> propertyValues = ImmutableList .of (ImmutableList .of (query .orElse ("NULL" )));
194
+
195
+ return Optional .of (createSystemTable (new ConnectorTableMetadata (sourceTableName , columns ), constraint -> new InMemoryRecordSet (types , propertyValues ).cursor ()));
196
+ }
197
+
163
198
@ Override
164
199
public Map <String , ColumnHandle > getColumnHandles (ConnectorSession session , ConnectorTableHandle tableHandle )
165
200
{
@@ -311,4 +346,41 @@ private static boolean containSameElements(Iterable<? extends ColumnHandle> firs
311
346
{
312
347
return ImmutableSet .copyOf (first ).equals (ImmutableSet .copyOf (second ));
313
348
}
349
+
350
+ private static boolean isViewDefinitionSystemTable (SchemaTableName table )
351
+ {
352
+ return table .getTableName ().endsWith (VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX ) &&
353
+ (table .getTableName ().length () > VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX .length ());
354
+ }
355
+
356
+ private static SchemaTableName getViewDefinitionSourceTableName (SchemaTableName table )
357
+ {
358
+ return new SchemaTableName (
359
+ table .getSchemaName (),
360
+ table .getTableName ().substring (0 , table .getTableName ().length () - VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX .length ()));
361
+ }
362
+
363
+ private static SystemTable createSystemTable (ConnectorTableMetadata metadata , Function <TupleDomain <Integer >, RecordCursor > cursor )
364
+ {
365
+ return new SystemTable ()
366
+ {
367
+ @ Override
368
+ public Distribution getDistribution ()
369
+ {
370
+ return Distribution .SINGLE_COORDINATOR ;
371
+ }
372
+
373
+ @ Override
374
+ public ConnectorTableMetadata getTableMetadata ()
375
+ {
376
+ return metadata ;
377
+ }
378
+
379
+ @ Override
380
+ public RecordCursor cursor (ConnectorTransactionHandle transactionHandle , ConnectorSession session , TupleDomain <Integer > constraint )
381
+ {
382
+ return cursor .apply (constraint );
383
+ }
384
+ };
385
+ }
314
386
}
0 commit comments