Skip to content

Commit e823bf9

Browse files
authored
feat: make possible to use ClickHouse HTTP interface (PostHog#27273)
1 parent 90057e4 commit e823bf9

File tree

10 files changed

+121
-12
lines changed

10 files changed

+121
-12
lines changed

posthog/clickhouse/client/connection.py

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
from enum import Enum
33
from functools import cache
44

5+
from clickhouse_connect import get_client
6+
from clickhouse_connect.driver import Client as HttpClient, httputil
57
from clickhouse_driver import Client as SyncClient
68
from clickhouse_pool import ChPool
79
from django.conf import settings
@@ -19,7 +21,96 @@ class Workload(Enum):
1921
_default_workload = Workload.ONLINE
2022

2123

22-
def get_pool(workload: Workload, team_id=None, readonly=False):
24+
class ProxyClient:
    """
    Adapter exposing a `clickhouse_driver.Client`-compatible ``execute`` API on
    top of a clickhouse_connect (HTTP) client, so HTTP connections can be used
    everywhere a native TCP client is expected.

    Note: the ``external_tables`` and ``types_check`` parameters are accepted
    for signature compatibility but are currently ignored.
    """

    def __init__(self, client: "HttpClient"):
        # Annotation is a string so the class can be defined without
        # clickhouse_connect imported at evaluation time (behavior unchanged).
        self._client = client

    def execute(
        self,
        query,
        params=None,
        with_column_types=False,
        external_tables=None,
        query_id=None,
        settings=None,
        types_check=False,
        columnar=False,
    ):
        """
        Run `query` over HTTP and mimic clickhouse_driver's return shapes.

        Returns:
            - the number of written rows for write statements (summary reports
              ``written_rows`` > 0),
            - ``(rows, [(name, type), ...])`` when ``with_column_types`` is set,
            - otherwise just the rows.
        """
        if query_id:
            # Build a copy: the original code did `settings["query_id"] = query_id`,
            # which crashed when settings was None (the default) and mutated the
            # caller's dict when it wasn't.
            settings = {**(settings or {}), "query_id": query_id}
        result = self._client.query(query=query, parameters=params, settings=settings, column_oriented=columnar)

        # we must play with result summary here
        written_rows = int(result.summary.get("written_rows", 0))
        if written_rows > 0:
            return written_rows
        if with_column_types:
            column_types_driver_format = list(zip(result.column_names, result.column_types))
            return result.result_set, column_types_driver_format
        return result.result_set

    # Implement methods for session management: https://peps.python.org/pep-0343/
    # so ProxyClient can be used in all places a clickhouse_driver.Client is.
    # There is no per-query resource to release; the HTTP pool manager owns the
    # connections.
    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass
58+
59+
60+
# Process-wide connection pool manager shared by every HTTP client built in
# get_http_client below; clickhouse_connect hands pooled connections out of it.
_clickhouse_http_pool_mgr = httputil.get_pool_manager(
    maxsize=settings.CLICKHOUSE_CONN_POOL_MAX,  # max number of open connection per pool
    block=True,  # makes the maxsize limit per pool, keeps connections
    num_pools=12,  # number of pools
)
65+
66+
67+
def get_http_client(**overrides):
    """
    Build a ClickHouse HTTP client (wrapped in ProxyClient) from Django
    settings; any keyword `overrides` take precedence over the defaults.
    """
    options = dict(
        host=settings.CLICKHOUSE_HOST,
        database=settings.CLICKHOUSE_DATABASE,
        secure=settings.CLICKHOUSE_SECURE,
        username=settings.CLICKHOUSE_USER,
        password=settings.CLICKHOUSE_PASSWORD,
        ca_cert=settings.CLICKHOUSE_CA,
        verify=settings.CLICKHOUSE_VERIFY,
        settings={"mutations_sync": "1"} if settings.TEST else {},
        # Without this, OPTIMIZE table and other queries will regularly run into timeouts
        send_receive_timeout=30 if settings.TEST else 999_999_999,
        # beware, this makes each query to run in a separate session - no temporary tables will work
        autogenerate_session_id=True,
        # all HTTP clients share the module-level pool manager
        pool_mgr=_clickhouse_http_pool_mgr,
    )
    options.update(overrides)
    return ProxyClient(get_client(**options))
84+
85+
86+
def get_client_from_pool(workload: Workload = Workload.DEFAULT, team_id=None, readonly=False):
    """
    Return a ClickHouse client appropriate for the given workload.

    With CLICKHOUSE_USE_HTTP enabled an HTTP client is built (its connection
    pool is managed by the clickhouse_connect library); otherwise a native
    client is checked out of our own pool via get_pool().
    """
    if not settings.CLICKHOUSE_USE_HTTP:
        return get_pool(workload=workload, team_id=team_id, readonly=readonly).get_client()

    per_team = settings.CLICKHOUSE_PER_TEAM_SETTINGS
    if team_id is not None and str(team_id) in per_team:
        return get_http_client(**per_team[str(team_id)])

    # Note that `readonly` does nothing if the relevant vars are not set!
    if readonly and settings.READONLY_CLICKHOUSE_USER is not None and settings.READONLY_CLICKHOUSE_PASSWORD:
        return get_http_client(
            username=settings.READONLY_CLICKHOUSE_USER,
            password=settings.READONLY_CLICKHOUSE_PASSWORD,
        )

    # DEFAULT falls back to the process-wide default workload.
    wants_offline = workload == Workload.OFFLINE or (
        workload == Workload.DEFAULT and _default_workload == Workload.OFFLINE
    )
    if wants_offline and settings.CLICKHOUSE_OFFLINE_CLUSTER_HOST is not None:
        return get_http_client(host=settings.CLICKHOUSE_OFFLINE_CLUSTER_HOST, verify=False)

    return get_http_client()
111+
112+
113+
def get_pool(workload: Workload = Workload.DEFAULT, team_id=None, readonly=False):
23114
"""
24115
Returns the right connection pool given a workload.
25116

posthog/clickhouse/client/execute.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from clickhouse_driver import Client as SyncClient
1212
from django.conf import settings as app_settings
1313

14-
from posthog.clickhouse.client.connection import Workload, get_pool
14+
from posthog.clickhouse.client.connection import Workload, get_client_from_pool
1515
from posthog.clickhouse.client.escape import substitute_params
1616
from posthog.clickhouse.query_tagging import get_query_tag_value, get_query_tags
1717
from posthog.errors import wrap_query_error
@@ -121,7 +121,7 @@ def sync_execute(
121121
if get_query_tag_value("id") == "posthog.tasks.tasks.process_query_task":
122122
workload = Workload.ONLINE
123123

124-
with sync_client or get_pool(workload, team_id, readonly).get_client() as client:
124+
with sync_client or get_client_from_pool(workload, team_id, readonly) as client:
125125
start_time = perf_counter()
126126

127127
prepared_sql, prepared_args, tags = _prepare_query(client=client, query=query, args=args, workload=workload)
@@ -137,6 +137,7 @@ def sync_execute(
137137
settings = {
138138
**core_settings,
139139
"log_comment": json.dumps(tags, separators=(",", ":")),
140+
"query_id": query_id,
140141
}
141142

142143
try:

posthog/clickhouse/client/test/test_execute_async.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,12 +358,12 @@ def test_client_strips_comments_from_request(self):
358358
# request routing information for debugging purposes
359359
self.assertIn(f"/* user_id:{self.user_id} request:1 */", first_query)
360360

361-
@patch("posthog.clickhouse.client.execute.get_pool")
362-
def test_offline_workload_if_personal_api_key(self, mock_get_pool):
361+
@patch("posthog.clickhouse.client.execute.get_client_from_pool")
362+
def test_offline_workload_if_personal_api_key(self, mock_get_client):
363363
from posthog.clickhouse.query_tagging import tag_queries
364364

365365
with self.capture_select_queries():
366366
tag_queries(kind="request", id="1", access_method="personal_api_key")
367367
sync_execute("select 1")
368368

369-
self.assertEqual(mock_get_pool.call_args[0][0], Workload.OFFLINE)
369+
self.assertEqual(mock_get_client.call_args[0][0], Workload.OFFLINE)

posthog/clickhouse/migrations/0064_materialize_elements_chain.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from infi.clickhouse_orm import migrations
22

3-
from posthog.clickhouse.client.connection import ch_pool
3+
from posthog.clickhouse.client.connection import get_client_from_pool
44
from posthog.settings import CLICKHOUSE_CLUSTER
55

66

@@ -22,7 +22,7 @@
2222

2323

2424
def add_columns_to_required_tables(_):
25-
with ch_pool.get_client() as client:
25+
with get_client_from_pool() as client:
2626
client.execute(ADD_COLUMNS_SHARDED_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
2727

2828
client.execute(ADD_COLUMNS_EVENTS.format(table="events", cluster=CLICKHOUSE_CLUSTER))

posthog/clickhouse/migrations/0072_materialize_elements_chain_ids.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from infi.clickhouse_orm import migrations
22

3-
from posthog.clickhouse.client.connection import ch_pool
3+
from posthog.clickhouse.client.connection import get_client_from_pool
44
from posthog.settings import CLICKHOUSE_CLUSTER
55

66

@@ -16,7 +16,7 @@
1616

1717

1818
def add_columns_to_required_tables(_):
19-
with ch_pool.get_client() as client:
19+
with get_client_from_pool() as client:
2020
client.execute(DROP_COLUMNS_SHARDED_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
2121
client.execute(ADD_COLUMNS_SHARDED_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
2222

posthog/clickhouse/migrations/0078_add_soft_delete_column_on_events.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from infi.clickhouse_orm import migrations
22

3-
from posthog.clickhouse.client.connection import ch_pool
3+
from posthog.clickhouse.client.connection import get_client_from_pool
44
from posthog.settings import CLICKHOUSE_CLUSTER
55

66

@@ -21,7 +21,7 @@
2121

2222

2323
def add_columns_to_required_tables(_):
24-
with ch_pool.get_client() as client:
24+
with get_client_from_pool() as client:
2525
client.execute(DROP_COLUMNS_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
2626
client.execute(DROP_COLUMNS_EVENTS.format(table="events", cluster=CLICKHOUSE_CLUSTER))
2727
client.execute(ADD_COLUMNS_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))

posthog/settings/data_stores.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ def postgres_config(host: str) -> dict:
148148
CLICKHOUSE_OFFLINE_CLUSTER_HOST: str | None = os.getenv("CLICKHOUSE_OFFLINE_CLUSTER_HOST", None)
149149
CLICKHOUSE_USER: str = os.getenv("CLICKHOUSE_USER", "default")
150150
CLICKHOUSE_PASSWORD: str = os.getenv("CLICKHOUSE_PASSWORD", "")
151+
CLICKHOUSE_USE_HTTP: str = get_from_env("CLICKHOUSE_USE_HTTP", False, type_cast=str_to_bool)
151152
CLICKHOUSE_DATABASE: str = CLICKHOUSE_TEST_DB if TEST else os.getenv("CLICKHOUSE_DATABASE", "default")
152153
CLICKHOUSE_CLUSTER: str = os.getenv("CLICKHOUSE_CLUSTER", "posthog")
153154
CLICKHOUSE_CA: str | None = os.getenv("CLICKHOUSE_CA", None)

requirements-dev.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ googleapis-common-protos==1.60.0
167167
# via
168168
# -c requirements.txt
169169
# opentelemetry-exporter-otlp-proto-grpc
170+
greenlet==3.1.1
171+
# via
172+
# -c requirements.txt
173+
# sqlalchemy
170174
grpcio==1.63.2
171175
# via
172176
# -c requirements.txt

requirements.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ boto3==1.28.16
1212
brotli==1.1.0
1313
celery==5.3.4
1414
celery-redbeat==2.1.1
15+
clickhouse-connect==0.8.11
1516
clickhouse-driver==0.2.7
1617
clickhouse-pool==0.5.3
1718
conditional-cache==1.2

requirements.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ celery-redbeat==2.1.1
8080
# via -r requirements.in
8181
certifi==2019.11.28
8282
# via
83+
# clickhouse-connect
8384
# httpcore
8485
# httpx
8586
# requests
@@ -110,6 +111,8 @@ click-plugins==1.1.1
110111
# via celery
111112
click-repl==0.3.0
112113
# via celery
114+
clickhouse-connect==0.8.11
115+
# via -r requirements.in
113116
clickhouse-driver==0.2.7
114117
# via
115118
# -r requirements.in
@@ -270,6 +273,8 @@ googleapis-common-protos==1.60.0
270273
# via
271274
# google-api-core
272275
# grpcio-status
276+
greenlet==3.1.1
277+
# via sqlalchemy
273278
grpcio==1.63.2
274279
# via
275280
# -r requirements.in
@@ -378,6 +383,8 @@ lxml==4.9.4
378383
# toronado
379384
# xmlsec
380385
# zeep
386+
lz4==4.3.3
387+
# via clickhouse-connect
381388
lzstring==1.0.4
382389
# via -r requirements.in
383390
makefun==1.15.2
@@ -562,6 +569,7 @@ python3-saml==1.12.0
562569
pytz==2023.3
563570
# via
564571
# -r requirements.in
572+
# clickhouse-connect
565573
# clickhouse-driver
566574
# dlt
567575
# infi-clickhouse-orm
@@ -776,6 +784,7 @@ uritemplate==4.1.1
776784
urllib3==1.26.18
777785
# via
778786
# botocore
787+
# clickhouse-connect
779788
# geoip2
780789
# google-auth
781790
# pdpyras
@@ -811,6 +820,8 @@ yarl==1.18.3
811820
# via aiohttp
812821
zeep==4.2.1
813822
# via simple-salesforce
823+
zstandard==0.23.0
824+
# via clickhouse-connect
814825
zstd==1.5.5.1
815826
# via -r requirements.in
816827
zxcvbn==4.4.28

0 commit comments

Comments
 (0)