22from enum import Enum
33from functools import cache
44
5+ from clickhouse_connect import get_client
6+ from clickhouse_connect .driver import Client as HttpClient , httputil
57from clickhouse_driver import Client as SyncClient
68from clickhouse_pool import ChPool
79from django .conf import settings
@@ -19,7 +21,96 @@ class Workload(Enum):
1921_default_workload = Workload .ONLINE
2022
2123
22- def get_pool (workload : Workload , team_id = None , readonly = False ):
class ProxyClient:
    """Adapter giving an HTTP client an ``execute`` method with the keyword
    set that callers of ``clickhouse_driver.Client.execute`` use.

    The wrapped object only needs a ``query(query=, parameters=, settings=,
    column_oriented=)`` method whose result exposes ``summary``,
    ``column_names``, ``column_types`` and ``result_set``.
    """

    def __init__(self, client: "HttpClient"):
        self._client = client

    def execute(
        self,
        query,
        params=None,
        with_column_types=False,
        external_tables=None,
        query_id=None,
        settings=None,
        types_check=False,
        columnar=False,
    ):
        """Run ``query`` through the wrapped client.

        Args:
            query: SQL text, forwarded as ``query``.
            params: Query parameters, forwarded as ``parameters``.
            with_column_types: When true, also return ``(name, type)`` pairs.
            external_tables: Accepted for signature compatibility; ignored.
            query_id: Optional id, sent as the ``query_id`` setting.
            settings: Optional per-query settings mapping. Never mutated.
            types_check: Accepted for signature compatibility; ignored.
            columnar: Forwarded as ``column_oriented``.

        Returns:
            The number of written rows when the result summary reports a
            positive ``written_rows``; otherwise the result set, paired with
            the column types when ``with_column_types`` is true.
        """
        if query_id:
            # `settings` defaults to None, so assigning into it would raise
            # TypeError. Build a new dict instead, which also keeps the
            # caller's mapping untouched.
            settings = {**(settings or {}), "query_id": query_id}
        result = self._client.query(query=query, parameters=params, settings=settings, column_oriented=columnar)

        # The summary is the only place that tells us whether rows were
        # written; in that case the row count is returned instead of data.
        written_rows = int(result.summary.get("written_rows", 0))
        if written_rows > 0:
            return written_rows
        if with_column_types:
            column_types_driver_format = list(zip(result.column_names, result.column_types))
            return result.result_set, column_types_driver_format
        return result.result_set

    # Implement methods for session management: https://peps.python.org/pep-0343/
    # so ProxyClient can be used in all places a clickhouse_driver.Client is.
    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Nothing is released here; the wrapped client is left as it is.
        pass
58+
59+
# Single pool manager handed to every client built by get_http_client, so
# HTTP connections are shared between clients instead of opened per client.
_clickhouse_http_pool_mgr = httputil.get_pool_manager(
    num_pools=12,  # how many pools the manager keeps
    maxsize=settings.CLICKHOUSE_CONN_POOL_MAX,  # cap on open connections in each pool
    block=True,  # enforce the cap per pool and keep the connections around
)
65+
66+
def get_http_client(**overrides):
    """Create a ProxyClient around a freshly built HTTP client.

    Defaults are read from Django settings. Every keyword in ``overrides``
    replaces the default of the same name, and any other keyword is passed
    straight through to ``get_client``.
    """
    client_kwargs = {
        "host": settings.CLICKHOUSE_HOST,
        "database": settings.CLICKHOUSE_DATABASE,
        "secure": settings.CLICKHOUSE_SECURE,
        "username": settings.CLICKHOUSE_USER,
        "password": settings.CLICKHOUSE_PASSWORD,
        "ca_cert": settings.CLICKHOUSE_CA,
        "verify": settings.CLICKHOUSE_VERIFY,
        "settings": {"mutations_sync": "1"} if settings.TEST else {},
        # A practically unlimited timeout outside tests: OPTIMIZE table and
        # other queries otherwise regularly run into timeouts.
        "send_receive_timeout": 30 if settings.TEST else 999_999_999,
        # Beware: every query then runs in its own session, so temporary
        # tables will not work.
        "autogenerate_session_id": True,
        "pool_mgr": _clickhouse_http_pool_mgr,
    }
    # Caller-supplied values win over the defaults above.
    client_kwargs.update(overrides)
    http_client = get_client(**client_kwargs)
    return ProxyClient(http_client)
84+
85+
def get_client_from_pool(workload: Workload = Workload.DEFAULT, team_id=None, readonly=False):
    """
    Pick the client to use for the given workload.

    With settings.CLICKHOUSE_USE_HTTP off, a client is taken from the pool
    returned by get_pool. With it on, an HTTP client is built instead; the
    HTTP connection pool is managed by a library. The first matching rule
    wins: per-team settings, then read-only credentials, then the offline
    cluster host, then the defaults.
    """
    if not settings.CLICKHOUSE_USE_HTTP:
        return get_pool(workload=workload, team_id=team_id, readonly=readonly).get_client()

    if team_id is not None:
        team_key = str(team_id)
        if team_key in settings.CLICKHOUSE_PER_TEAM_SETTINGS:
            return get_http_client(**settings.CLICKHOUSE_PER_TEAM_SETTINGS[team_key])

    # Note that `readonly` does nothing if the relevant vars are not set!
    if readonly and settings.READONLY_CLICKHOUSE_USER is not None and settings.READONLY_CLICKHOUSE_PASSWORD:
        return get_http_client(
            username=settings.READONLY_CLICKHOUSE_USER,
            password=settings.READONLY_CLICKHOUSE_PASSWORD,
        )

    # Offline is either requested explicitly or inherited from the module default.
    wants_offline = workload == Workload.OFFLINE or (
        workload == Workload.DEFAULT and _default_workload == Workload.OFFLINE
    )
    if wants_offline and settings.CLICKHOUSE_OFFLINE_CLUSTER_HOST is not None:
        return get_http_client(host=settings.CLICKHOUSE_OFFLINE_CLUSTER_HOST, verify=False)

    return get_http_client()
111+
112+
113+ def get_pool (workload : Workload = Workload .DEFAULT , team_id = None , readonly = False ):
23114 """
24115 Returns the right connection pool given a workload.
25116
0 commit comments