Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion django/db/backends/base/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from django.db.backends.signals import connection_created
from django.db.backends.utils import debug_transaction
from django.db.transaction import TransactionManagementError
from django.db.utils import DatabaseErrorWrapper
from django.db.utils import DatabaseErrorWrapper, ProgrammingError
from django.utils.asyncio import async_unsafe
from django.utils.functional import cached_property

Expand Down Expand Up @@ -271,6 +271,10 @@ def check_settings(self):
def ensure_connection(self):
"""Guarantee that a connection to the database is established."""
if self.connection is None:
if self.in_atomic_block and self.closed_in_transaction:
raise ProgrammingError(
"Cannot open a new connection in an atomic block."
)
with self.wrap_database_errors:
self.connect()

Expand Down
145 changes: 122 additions & 23 deletions django/db/backends/postgresql/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from django.core.exceptions import ImproperlyConfigured
from django.db import DatabaseError as WrappedDatabaseError
from django.db import connections
from django.db.backends.base.base import BaseDatabaseWrapper
from django.db.backends.base.base import NO_DB_ALIAS, BaseDatabaseWrapper
from django.db.backends.utils import CursorDebugWrapper as BaseCursorDebugWrapper
from django.utils.asyncio import async_unsafe
from django.utils.functional import cached_property
Expand Down Expand Up @@ -86,6 +86,24 @@ def _get_varchar_column(data):
return "varchar(%(max_length)s)" % data


def ensure_timezone(connection, ops, timezone_name):
conn_timezone_name = connection.info.parameter_status("TimeZone")
if timezone_name and conn_timezone_name != timezone_name:
with connection.cursor() as cursor:
cursor.execute(ops.set_time_zone_sql(), [timezone_name])
return True
return False


def ensure_role(connection, ops, role_name):
if role_name:
with connection.cursor() as cursor:
sql = ops.compose_sql("SET ROLE %s", [role_name])
cursor.execute(sql)
return True
return False


class DatabaseWrapper(BaseDatabaseWrapper):
vendor = "postgresql"
display_name = "PostgreSQL"
Expand Down Expand Up @@ -179,6 +197,53 @@ class DatabaseWrapper(BaseDatabaseWrapper):
ops_class = DatabaseOperations
# PostgreSQL backend-specific attributes.
_named_cursor_idx = 0
_connection_pools = {}

@property
def pool(self):
pool_options = self.settings_dict["OPTIONS"].get("pool")
if self.alias == NO_DB_ALIAS or not pool_options:
return None

if self.alias not in self._connection_pools:
if self.settings_dict.get("CONN_MAX_AGE", 0) != 0:
raise ImproperlyConfigured(
"Pooling doesn't support persistent connections."
)
Comment on lines +209 to +212

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partly found my answer here :)

# Set the default options.
if pool_options is True:
pool_options = {}

try:
from psycopg_pool import ConnectionPool
except ImportError as err:
raise ImproperlyConfigured(
"Error loading psycopg_pool module.\nDid you install psycopg[pool]?"
) from err

connect_kwargs = self.get_connection_params()
# Ensure we run in autocommit, Django properly sets it later on.
connect_kwargs["autocommit"] = True
enable_checks = self.settings_dict["CONN_HEALTH_CHECKS"]
pool = ConnectionPool(
kwargs=connect_kwargs,
open=False, # Do not open the pool during startup.
configure=self._configure_connection,
check=ConnectionPool.check_connection if enable_checks else None,
**pool_options,
)
# setdefault() ensures that multiple threads don't set this in
# parallel. Since we do not open the pool during it's init above,
# this means that at worst during startup multiple threads generate
# pool objects and the first to set it wins.
self._connection_pools.setdefault(self.alias, pool)

return self._connection_pools[self.alias]

def close_pool(self):
if self.pool:
self.pool.close()
del self._connection_pools[self.alias]

def get_database_version(self):
"""
Expand Down Expand Up @@ -221,6 +286,11 @@ def get_connection_params(self):

conn_params.pop("assume_role", None)
conn_params.pop("isolation_level", None)

pool_options = conn_params.pop("pool", None)
if pool_options and not is_psycopg3:
raise ImproperlyConfigured("Database pooling requires psycopg >= 3")

server_side_binding = conn_params.pop("server_side_binding", None)
conn_params.setdefault(
"cursor_factory",
Expand Down Expand Up @@ -272,7 +342,12 @@ def get_new_connection(self, conn_params):
f"Invalid transaction isolation level {isolation_level_value} "
f"specified. Use one of the psycopg.IsolationLevel values."
)
connection = self.Database.connect(**conn_params)
if self.pool:
# If nothing else has opened the pool, open it now.
self.pool.open()
connection = self.pool.getconn()
else:
connection = self.Database.connect(**conn_params)
if set_isolation_level:
connection.isolation_level = self.isolation_level
if not is_psycopg3:
Expand All @@ -285,36 +360,52 @@ def get_new_connection(self, conn_params):
return connection

def ensure_timezone(self):
# Close the pool so new connections pick up the correct timezone.
self.close_pool()
if self.connection is None:
return False
conn_timezone_name = self.connection.info.parameter_status("TimeZone")
timezone_name = self.timezone_name
if timezone_name and conn_timezone_name != timezone_name:
with self.connection.cursor() as cursor:
cursor.execute(self.ops.set_time_zone_sql(), [timezone_name])
return True
return False

def ensure_role(self):
if new_role := self.settings_dict["OPTIONS"].get("assume_role"):
with self.connection.cursor() as cursor:
sql = self.ops.compose_sql("SET ROLE %s", [new_role])
cursor.execute(sql)
return True
return False
return ensure_timezone(self.connection, self.ops, self.timezone_name)

def init_connection_state(self):
super().init_connection_state()
def _configure_connection(self, connection):
# This function is called from init_connection_state and from the
# psycopg pool itself after a connection is opened. Make sure that
# whatever is done here does not access anything on self aside from
# variables.

# Commit after setting the time zone.
commit_tz = self.ensure_timezone()
commit_tz = ensure_timezone(connection, self.ops, self.timezone_name)
# Set the role on the connection. This is useful if the credential used
# to login is not the same as the role that owns database resources. As
# can be the case when using temporary or ephemeral credentials.
commit_role = self.ensure_role()
role_name = self.settings_dict["OPTIONS"].get("assume_role")
commit_role = ensure_role(connection, self.ops, role_name)

return commit_role or commit_tz

def _close(self):
if self.connection is not None:
# `wrap_database_errors` only works for `putconn` as long as there
# is no `reset` function set in the pool because it is deferred
# into a thread and not directly executed.
with self.wrap_database_errors:
if self.pool:
# Ensure the correct pool is returned. This is a workaround
# for tests so a pool can be changed on setting changes
# (e.g. USE_TZ, TIME_ZONE).
self.connection._pool.putconn(self.connection)
# Connection can no longer be used.
self.connection = None
else:
return self.connection.close()

if (commit_role or commit_tz) and not self.get_autocommit():
self.connection.commit()
def init_connection_state(self):
super().init_connection_state()

if self.connection is not None and not self.pool:
commit = self._configure_connection(self.connection)

if commit and not self.get_autocommit():
self.connection.commit()

@async_unsafe
def create_cursor(self, name=None):
Expand Down Expand Up @@ -396,6 +487,8 @@ def check_constraints(self, table_names=None):
cursor.execute("SET CONSTRAINTS ALL DEFERRED")

def is_usable(self):
if self.connection is None:
return False
try:
# Use a psycopg cursor directly, bypassing Django's utilities.
with self.connection.cursor() as cursor:
Expand All @@ -405,6 +498,12 @@ def is_usable(self):
else:
return True

def close_if_health_check_failed(self):
if self.pool:
# The pool only returns healthy connections.
return
return super().close_if_health_check_failed()

@contextmanager
def _nodb_cursor(self):
cursor = None
Expand Down
5 changes: 5 additions & 0 deletions django/db/backends/postgresql/creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def _clone_test_db(self, suffix, verbosity, keepdb=False):
# CREATE DATABASE ... WITH TEMPLATE ... requires closing connections
# to the template database.
self.connection.close()
self.connection.close_pool()

source_database_name = self.connection.settings_dict["NAME"]
target_database_name = self.get_test_db_clone_settings(suffix)["NAME"]
Expand All @@ -84,3 +85,7 @@ def _clone_test_db(self, suffix, verbosity, keepdb=False):
except Exception as e:
self.log("Got an error cloning the test database: %s" % e)
sys.exit(2)

def _destroy_test_db(self, test_database_name, verbosity):
self.connection.close_pool()
return super()._destroy_test_db(test_database_name, verbosity)
32 changes: 23 additions & 9 deletions django/db/backends/postgresql/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,29 @@ class DatabaseFeatures(BaseDatabaseFeatures):
test_now_utc_template = "STATEMENT_TIMESTAMP() AT TIME ZONE 'UTC'"
insert_test_table_with_defaults = "INSERT INTO {} DEFAULT VALUES"

django_test_skips = {
"opclasses are PostgreSQL only.": {
"indexes.tests.SchemaIndexesNotPostgreSQLTests."
"test_create_index_ignores_opclasses",
},
"PostgreSQL requires casting to text.": {
"lookup.tests.LookupTests.test_textfield_exact_null",
},
}
@cached_property
def django_test_skips(self):
skips = {
"opclasses are PostgreSQL only.": {
"indexes.tests.SchemaIndexesNotPostgreSQLTests."
"test_create_index_ignores_opclasses",
},
"PostgreSQL requires casting to text.": {
"lookup.tests.LookupTests.test_textfield_exact_null",
},
}
if self.connection.settings_dict["OPTIONS"].get("pool"):
skips.update(
{
"Pool does implicit health checks": {
"backends.base.test_base.ConnectionHealthChecksTests."
"test_health_checks_enabled",
"backends.base.test_base.ConnectionHealthChecksTests."
"test_set_autocommit_health_checks_enabled",
},
}
)
return skips

@cached_property
def django_test_expected_failures(self):
Expand Down
25 changes: 25 additions & 0 deletions docs/ref/databases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,31 @@ database configuration in :setting:`DATABASES`::
},
}

.. _postgresql-pool:

Connection pool
---------------

.. versionadded:: 5.1

To use a connection pool with `psycopg`_, you can either set ``"pool"`` in the
:setting:`OPTIONS` part of your database configuration in :setting:`DATABASES`
to be a dict to be passed to :class:`~psycopg:psycopg_pool.ConnectionPool`, or
to ``True`` to use the ``ConnectionPool`` defaults::

DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
# ...
"OPTIONS": {
"pool": True,
},
},
}

This option requires ``psycopg[pool]`` or :pypi:`psycopg-pool` to be installed
and is ignored with ``psycopg2``.

.. _database-server-side-parameters-binding:

Server-side parameters binding
Expand Down
3 changes: 3 additions & 0 deletions docs/releases/5.1.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ Database backends
to allow specifying :ref:`pragma options <sqlite-init-command>` to set upon
connection.

* ``"pool"`` option is now supported in :setting:`OPTIONS` on PostgreSQL to
allow using :ref:`connection pools <postgresql-pool>`.

Decorators
~~~~~~~~~~

Expand Down
Loading