worker/
queue.rs

1use std::{
2    convert::{TryFrom, TryInto},
3    marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
13/// A batch of messages that are sent to a consumer Worker.
14#[derive(Debug)]
15pub struct MessageBatch<T> {
16    inner: MessageBatchSys,
17    phantom: PhantomData<T>,
18}
19
20impl<T> MessageBatch<T> {
21    /// The name of the Queue that belongs to this batch.
22    pub fn queue(&self) -> String {
23        self.inner.queue().unwrap().into()
24    }
25
26    /// Marks every message to be retried in the next batch.
27    pub fn retry_all(&self) {
28        self.inner.retry_all(JsValue::null()).unwrap();
29    }
30
31    /// Marks every message to be retried in the next batch with options.
32    pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
33        self.inner
34            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
35            .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
36            .unwrap();
37    }
38
39    /// Marks every message acknowledged in the batch.
40    pub fn ack_all(&self) {
41        self.inner.ack_all().unwrap();
42    }
43
44    /// Iterator for raw messages in the message batch. Ordering of messages is not guaranteed.
45    pub fn raw_iter(&self) -> RawMessageIter {
46        let messages = self.inner.messages().unwrap();
47        RawMessageIter {
48            range: 0..messages.length(),
49            array: messages,
50        }
51    }
52}
53
54impl<T: DeserializeOwned> MessageBatch<T> {
55    /// An array of messages in the batch. Ordering of messages is not guaranteed.
56    pub fn messages(&self) -> Result<Vec<Message<T>>> {
57        self.iter().collect()
58    }
59
60    /// Iterator for messages in the message batch. Ordering of messages is not guaranteed.
61    pub fn iter(&self) -> MessageIter<T> {
62        let messages = self.inner.messages().unwrap();
63        MessageIter {
64            range: 0..messages.length(),
65            array: messages,
66            marker: PhantomData,
67        }
68    }
69}
70
71impl<T> From<MessageBatchSys> for MessageBatch<T> {
72    fn from(value: MessageBatchSys) -> Self {
73        Self {
74            inner: value,
75            phantom: PhantomData,
76        }
77    }
78}
79
80/// A message that is sent to a consumer Worker.
81#[derive(Debug)]
82pub struct Message<T> {
83    inner: MessageSys,
84    body: T,
85}
86
87impl<T> Message<T> {
88    /// The body of the message.
89    pub fn body(&self) -> &T {
90        &self.body
91    }
92
93    /// The body of the message.
94    pub fn into_body(self) -> T {
95        self.body
96    }
97
98    /// The raw body of the message.
99    pub fn raw_body(&self) -> JsValue {
100        self.inner().body().unwrap()
101    }
102}
103
104impl<T> TryFrom<RawMessage> for Message<T>
105where
106    T: DeserializeOwned,
107{
108    type Error = Error;
109
110    fn try_from(value: RawMessage) -> std::result::Result<Self, Self::Error> {
111        let body = serde_wasm_bindgen::from_value(value.body())?;
112        Ok(Self {
113            inner: value.inner,
114            body,
115        })
116    }
117}
118
119/// A message that is sent to a consumer Worker.
120#[derive(Debug)]
121pub struct RawMessage {
122    inner: MessageSys,
123}
124
125impl RawMessage {
126    /// The body of the message.
127    pub fn body(&self) -> JsValue {
128        self.inner.body().unwrap()
129    }
130}
131
132impl From<MessageSys> for RawMessage {
133    fn from(value: MessageSys) -> Self {
134        Self { inner: value }
135    }
136}
137
138trait MessageSysInner {
139    fn inner(&self) -> &MessageSys;
140}
141
142impl MessageSysInner for RawMessage {
143    fn inner(&self) -> &MessageSys {
144        &self.inner
145    }
146}
147
148impl<T> MessageSysInner for Message<T> {
149    fn inner(&self) -> &MessageSys {
150        &self.inner
151    }
152}
153
154#[derive(Serialize)]
155#[serde(rename_all = "camelCase")]
156#[derive(Debug)]
157/// Optional configuration when marking a message or a batch of messages for retry.
158pub struct QueueRetryOptions {
159    delay_seconds: Option<u32>,
160}
161
162#[derive(Debug)]
163pub struct QueueRetryOptionsBuilder {
164    delay_seconds: Option<u32>,
165}
166
167impl QueueRetryOptionsBuilder {
168    /// Creates a new retry options builder.
169    pub fn new() -> Self {
170        Self {
171            delay_seconds: None,
172        }
173    }
174
175    #[must_use]
176    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
177    pub fn with_delay_seconds(mut self, delay_seconds: u32) -> Self {
178        self.delay_seconds = Some(delay_seconds);
179        self
180    }
181
182    /// Build the retry options.
183    pub fn build(self) -> QueueRetryOptions {
184        QueueRetryOptions {
185            delay_seconds: self.delay_seconds,
186        }
187    }
188}
189
190pub trait MessageExt {
191    /// A unique, system-generated ID for the message.
192    fn id(&self) -> String;
193
194    /// A timestamp when the message was sent.
195    fn timestamp(&self) -> Date;
196
197    /// Marks message to be retried.
198    fn retry(&self);
199
200    /// Marks message to be retried with options.
201    fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions);
202
203    /// Marks message acknowledged.
204    fn ack(&self);
205}
206
207impl<T: MessageSysInner> MessageExt for T {
208    /// A unique, system-generated ID for the message.
209    fn id(&self) -> String {
210        self.inner().id().unwrap().into()
211    }
212
213    /// A timestamp when the message was sent.
214    fn timestamp(&self) -> Date {
215        Date::from(self.inner().timestamp().unwrap())
216    }
217
218    /// Marks message to be retried.
219    fn retry(&self) {
220        self.inner().retry(JsValue::null()).unwrap();
221    }
222
223    /// Marks message to be retried with options.
224    fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions) {
225        self.inner()
226            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
227            .retry(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
228            .unwrap();
229    }
230
231    /// Marks message acknowledged.
232    fn ack(&self) {
233        self.inner().ack().unwrap();
234    }
235}
236
237#[derive(Debug)]
238pub struct MessageIter<T> {
239    range: std::ops::Range<u32>,
240    array: Array,
241    marker: PhantomData<T>,