Merged
24 commits
279d5fe
refactor: factor out types and dataclasses
cfm Sep 27, 2025
a0e903b
feat: scaffold event types and handlers
cfm Sep 27, 2025
ae800d3
refactor(all_source_replies): factor out utils.save_reply()
cfm Oct 2, 2025
3014e0c
feat: handle happy path for "reply_sent" event
cfm Oct 2, 2025
1b66d3a
feat(EventResult): "status" field provides tuple of code and optional…
cfm Oct 10, 2025
c3cad2c
refactor: narrow dictionary types so that {<uuid>: None} indicates de…
cfm Oct 10, 2025
f6da148
feat: handle "item_deleted" event
cfm Oct 10, 2025
b2f2f8f
refactor(EventHandler): require (and document) explicit registration …
cfm Oct 10, 2025
e6dd637
feat(EventHandler): cache IDs of seen events in Redis to enforce idem…
cfm Oct 11, 2025
991dffe
feat(/data): order events by snowflake ID
cfm Oct 14, 2025
31e0999
Have mypy lint with Python 3.12 as the base version
legoktm Oct 14, 2025
f379217
fix(EventHandler): namespace idempotence (cache) keys to prevent cros…
cfm Oct 16, 2025
4ca3bca
docs: expand reference to #3918
cfm Oct 16, 2025
05c240a
chore: keep more-standard reference to os.path
cfm Oct 16, 2025
3e8de14
refactor(save_reply): raise InvalidUUID instead of uuid.UUID()'s gene…
cfm Oct 16, 2025
a0b7c3e
docs(EventResult): emphasize deletion case
cfm Oct 17, 2025
c295740
refactor(/data): perform top-level deserialization of all events first
cfm Oct 17, 2025
66d7f8e
feat(EventHandler): mark progress at the start of processing to preve…
cfm Oct 17, 2025
444bf7a
feat(/data): cap BatchRequest.events at EVENTS_MAX
cfm Oct 17, 2025
c8ee0f4
refactor(Event): validate in dataclass __post_init__()
cfm Oct 17, 2025
0a35036
test(test_api2_item_deleted): round trip
cfm Oct 17, 2025
9e1bf85
test(test_api2_reply_sent): round trip, including download and decryp…
cfm Oct 17, 2025
2cccf5b
docs: specify single-round-trip-consistency
cfm Oct 17, 2025
9d13690
Merge pull request #7672 from freedomofpress/7665-batched-events
legoktm Oct 17, 2025
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -115,7 +115,7 @@ no_implicit_optional = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
warn_unused_configs = true
python_version = "3.8"
python_version = "3.12"
plugins = "sqlmypy"

[[tool.mypy.overrides]]
45 changes: 8 additions & 37 deletions securedrop/journalist_app/api.py
@@ -1,29 +1,28 @@
import collections.abc
import json
from datetime import datetime, timezone
from os import path
from typing import Set, Tuple, Union
from uuid import UUID

import flask
import werkzeug
from db import db
from flask import Blueprint, abort, jsonify, request
from journalist_app import utils
from journalist_app.api2.shared import save_reply
from journalist_app.sessions import session
from models import (
InvalidUsernameException,
InvalidUUID,
Journalist,
LoginThrottledException,
Reply,
SeenReply,
Source,
Submission,
WrongPasswordException,
)
from sqlalchemy import Column
from sqlalchemy.exc import IntegrityError
from store import NotEncrypted, Storage
from store import NotEncrypted
from two_factor import OtpSecretInvalid, OtpTokenInvalid
from werkzeug.exceptions import default_exceptions

@@ -224,43 +223,15 @@ def all_source_replies(source_uuid: str) -> Tuple[flask.Response, int]:
if not data["reply"]:
abort(400, "reply should not be empty")

source.interaction_count += 1
try:
filename = Storage.get_default().save_pre_encrypted_reply(
source.filesystem_id,
source.interaction_count,
source.journalist_filename,
data["reply"],
)
except NotEncrypted:
return jsonify({"message": "You must encrypt replies client side"}), 400

# issue #3918
filename = path.basename(filename)

reply = Reply(session.get_user(), source, filename, Storage.get_default())

reply_uuid = data.get("uuid", None)
if reply_uuid is not None:
# check that it is parseable
try:
UUID(reply_uuid)
except ValueError:
abort(400, "'uuid' was not a valid UUID")
reply.uuid = reply_uuid

try:
db.session.add(reply)
seen_reply = SeenReply(reply=reply, journalist=session.get_user())
db.session.add(seen_reply)
db.session.add(source)
db.session.commit()
reply = save_reply(source, data)
except IntegrityError as e:
db.session.rollback()
if "UNIQUE constraint failed: replies.uuid" in str(e):
abort(409, "That UUID is already in use.")
else:
raise e
except NotEncrypted:
return jsonify({"message": "You must encrypt replies client side"}), 400
except InvalidUUID:
abort(400, "reply does not have a valid UUID")

return (
jsonify(
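The hunk above replaces the inline reply-saving logic with the shared `save_reply()` helper and, per commit 3e8de14, surfaces a domain-specific `InvalidUUID` instead of `uuid.UUID()`'s generic `ValueError`. A minimal sketch of that validation step — the helper name `validate_reply_uuid` is hypothetical; only the `InvalidUUID` exception name comes from the diff:

```python
from uuid import UUID


class InvalidUUID(ValueError):
    """Raised when a client-supplied reply UUID cannot be parsed
    (stand-in for the exception imported from models above)."""


def validate_reply_uuid(reply_uuid: str) -> str:
    """Hypothetical helper: parse a client-supplied UUID, raising
    InvalidUUID rather than uuid.UUID()'s bare ValueError."""
    try:
        UUID(reply_uuid)
    except ValueError as exc:
        raise InvalidUUID(f"{reply_uuid!r} is not a valid UUID") from exc
    return reply_uuid
```

The route can then map `InvalidUUID` to a single `abort(400, ...)`, as the rewritten `except` clauses do, instead of validating inline.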
37 changes: 37 additions & 0 deletions securedrop/journalist_app/api2/README.md
@@ -61,3 +61,40 @@ else Out of date
Server ->> Client: MetadataResponse
end
```

### Batched events from client

```mermaid
sequenceDiagram
participant Client
participant Server

Note over Client: Global version abcdef
Note over Server: Global version abcdef

Client ->> Client: reply_sent {id: X, uuid: Y, source: Z, ...}
Client -->> Server: POST /api/v2/metadata<br>BatchRequest
alt Already processed:
Server ->> Server: look up status of event {id: X}
Note over Server: Return status of event {id: X},<br>in addition to anything else requested.
Server ->> Client: BatchResponse
else
Server ->> Server: process "reply_sent" event for reply {uuid: Y}
Note over Server: Return new item {uuid: Y} and updated source {uuid: Z},<br>in addition to anything else requested.
Note over Server: Global version uvwxyz
Server ->> Client: BatchResponse
Note over Client: Global version uvwxyz
end
```
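To make the diagram concrete, here is what a `reply_sent` batch exchange might look like on the wire. This is an illustrative sketch only: the field names (`kind`, `sources`, `items`, `journalists`) and all IDs are assumptions inferred from the diagram and handler code, not the schema defined in `journalist_app/api2/types.py`:

```python
import json

# Hypothetical BatchRequest payload; all values are illustrative.
batch_request = {
    "events": [
        {
            "id": 7243988726580253,  # snowflake ID, also the idempotence key
            "kind": "reply_sent",    # field name assumed
            "uuid": "6c2b3a1e-0000-4000-8000-000000000001",    # reply UUID (Y)
            "source": "97a3d3cd-0000-4000-8000-000000000002",  # source UUID (Z)
        }
    ],
    # Additionally-requested objects, as with the read-only MetadataRequest:
    "sources": [],
    "items": [],
    "journalists": [],
}

body = json.dumps(batch_request)  # POSTed to /api/v2/data
```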

This diagram implies single-round-trip consistency. To make that expectation
explicit:

1. If the server $S$ currently has exactly one active client $C$; and

2. $C$ submits a valid `BatchRequest` $BR$ with $n$ events $\{E_1, \dots, E_n\}$; and

3. $S$ accepts $BR$ as valid and successfully processes all $E_i$; then

4. $C$'s index SHOULD match $S$'s index without a subsequent synchronization.
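The property above can be sketched with a toy model (not the real server types): if the `BatchResponse` carries every source and item the events changed — with `None` signaling deletion, per commit c3cad2c — then merging it into the client's index reproduces the server's index in one round trip:

```python
def apply_response(index: dict, response: dict) -> dict:
    """Merge a toy BatchResponse (uuid -> version, None = deleted)
    into a toy client index (uuid -> version)."""
    merged = dict(index)
    for uuid, version in response.items():
        if version is None:  # None indicates deletion
            merged.pop(uuid, None)
        else:
            merged[uuid] = version
    return merged


server_index = {"item-1": "v2", "item-2": "v1"}  # server state after processing
response = {"item-1": "v2", "item-3": None}      # every changed/deleted object
client_index = apply_response(
    {"item-1": "v1", "item-2": "v1", "item-3": "v9"}, response
)
assert client_index == server_index  # no follow-up synchronization needed
```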
111 changes: 55 additions & 56 deletions securedrop/journalist_app/api2/__init__.py
@@ -1,28 +1,30 @@
import hashlib
from dataclasses import asdict, dataclass, field
from typing import (
Any,
Dict,
Mapping,
NewType,
Optional,
Set,
)
from dataclasses import asdict
from typing import Mapping, Optional

from flask import Blueprint, abort, json, jsonify, request
from journalist_app.api2.events import EventHandler
from journalist_app.api2.types import (
BatchRequest,
BatchResponse,
Event,
Index,
Version,
)
from journalist_app.sessions import session
from models import EagerQuery, Journalist, Reply, Source, Submission, eager_query
from redis import Redis
from sdconfig import SecureDropConfig
from sqlalchemy.inspection import inspect
from sqlalchemy.orm.exc import MultipleResultsFound
from werkzeug.wrappers.response import Response

blp = Blueprint("api2", __name__, url_prefix="/api/v2")

EVENTS_MAX = 50
PREFIX_MAX_LEN = inspect(Source).columns["uuid"].type.length


Version = NewType("Version", str)


def json_version(d: Mapping) -> Version:
"""
Calculate the version (BLAKE2s digest) of the normalized JSON representation
@@ -37,22 +39,6 @@ def json_version(d: Mapping) -> Version:
return Version(hashlib.blake2s(b).hexdigest())


# TODO: generic UUID[T] in Python 3.12
SourceUUID = NewType("SourceUUID", str)
ItemUUID = NewType("ItemUUID", str)
JournalistUUID = NewType("JournalistUUID", str)


@dataclass
class Index:
# Source metadata, optionally filtered by `source_prefix`:
sources: Dict[SourceUUID, Version] = field(default_factory=dict)
items: Dict[ItemUUID, Version] = field(default_factory=dict)

# Non-source metadata (always returned):
journalists: Dict[JournalistUUID, Version] = field(default_factory=dict)


@blp.get("/index")
@blp.get("/index/<string:source_prefix>")
def index(source_prefix: Optional[str] = None) -> Response:
@@ -99,43 +85,56 @@ def index(source_prefix: Optional[str] = None) -> Response:
return response.make_conditional(request)


@dataclass
class MetadataRequest:
# Source metadata:
sources: Set[SourceUUID] = field(default_factory=set)
items: Set[ItemUUID] = field(default_factory=set)

# Non-source metadata:
journalists: Set[JournalistUUID] = field(default_factory=set)


@dataclass
class MetadataResponse:
# Source metadata:
sources: Dict[SourceUUID, Any] = field(default_factory=dict)
items: Dict[ItemUUID, Any] = field(default_factory=dict)

# Non-source metadata:
journalists: Dict[JournalistUUID, Any] = field(default_factory=dict)


@blp.post("/metadata")
def metadata() -> Response:
@blp.post("/data") # read-write BatchRequest
@blp.post("/metadata") # DEPRECATED: read-only MetadataRequest
def data() -> Response:
"""
Return the ``MetadataResponse`` requested in the ``MetadataRequest``. The
Return the ``BatchResponse`` requested in the ``BatchRequest``. The
client MAY choose an arbitrary list of objects with each request, e.g. from
a shard retrieved from ``/index/<source_prefix>``.

NB. Returning sources is O(1) from the eagerly-loaded ``all_sources()``.
Returning items is O(2), since we have to search both the ``Submission`` and
the ``Reply`` tables for the set of all item UUIDs.
The client MAY include a list of ``Event``s for the server to process over
arbitrary sources and items. Ordering is guaranteed within a given
``BatchRequest``. Sources and items changed by one or more events will be
returned in their most-recent state in the ``BatchResponse`` whether or not
they were explicitly requested in the ``BatchRequest``.

NB. Reading sources (without any side effects from processing events) is
O(1) from the eagerly-loaded ``all_sources()``. Reading items is O(2),
since we have to search both the ``Submission`` and the ``Reply`` tables for
the set of all item UUIDs.
"""
try:
requested = MetadataRequest(**request.json) # type: ignore
requested = BatchRequest(**request.json) # type: ignore
except (TypeError, ValueError) as exc:
abort(422, f"malformed request; {exc}")

response = MetadataResponse()
response = BatchResponse()

if requested.events:
if len(requested.events) > EVENTS_MAX:
abort(429, f"a BatchRequest MUST NOT include more than {EVENTS_MAX} events")

try:
events = [Event(**d) for d in requested.events]
except (TypeError, ValueError) as e:
abort(400, f"invalid event: {e}")

# Don't set up the EventHandler, connect to Redis, etc., unless we have
# events to process.
config = SecureDropConfig.get_current()
handler = EventHandler(
session=session, redis=Redis(decode_responses=True, **config.REDIS_KWARGS)
)

# Process events in snowflake order.
for event in sorted(events, key=lambda e: e.id):
result = handler.process(event)
for uuid, source in result.sources.items():
response.sources[uuid] = source.to_api_v2() if source is not None else None
for uuid, item in result.items.items():
response.items[uuid] = item.to_api_v2() if item is not None else None
response.events[result.event_id] = result.status

if requested.sources:
source_query: EagerQuery = eager_query("Source")