worker/
queue.rs

1use std::{
2    convert::{TryFrom, TryInto},
3    marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
13/// A batch of messages that are sent to a consumer Worker.
14#[derive(Debug)]
15pub struct MessageBatch<T> {
16    inner: MessageBatchSys,
17    phantom: PhantomData<T>,
18}
19
20impl<T> MessageBatch<T> {
21    /// The name of the Queue that belongs to this batch.
22    pub fn queue(&self) -> String {
23        self.inner.queue().unwrap().into()
24    }
25
26    /// Marks every message to be retried in the next batch.
27    pub fn retry_all(&self) {
28        self.inner.retry_all(JsValue::null()).unwrap();
29    }
30
31    /// Marks every message to be retried in the next batch with options.
32    pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
33        self.inner
34            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
35            .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
36            .unwrap();
37    }
38
39    /// Marks every message acknowledged in the batch.
40    pub fn ack_all(&self) {
41        self.inner.ack_all().unwrap();
42    }
43
44    /// Iterator for raw messages in the message batch. Ordering of messages is not guaranteed.
45    pub fn raw_iter(&self) -> RawMessageIter {
46        let messages = self.inner.messages().unwrap();
47        RawMessageIter {
48            range: 0..messages.length(),
49            array: messages,
50        }
51    }
52}
53
54impl<T: DeserializeOwned> MessageBatch<T> {
55    /// An array of messages in the batch. Ordering of messages is not guaranteed.
56    pub fn messages(&self) -> Result<Vec<Message<T>>> {
57        self.iter().collect()
58    }
59
60    /// Iterator for messages in the message batch. Ordering of messages is not guaranteed.
61    pub fn iter(&self) -> MessageIter<T> {
62        let messages = self.inner.messages().unwrap();
63        MessageIter {
64            range: 0..messages.length(),
65            array: messages,
66            marker: PhantomData,
67        }
68    }
69}
70
71impl<T> From<MessageBatchSys> for MessageBatch<T> {
72    fn from(value: MessageBatchSys) -> Self {
73        Self {
74            inner: value,
75            phantom: PhantomData,
76        }
77    }
78}
79
80/// A message that is sent to a consumer Worker.
81#[derive(Debug)]
82pub struct Message<T> {
83    inner: MessageSys,
84    body: T,
85}
86
87impl<T> Message<T> {
88    /// The body of the message.
89    pub fn body(&self) -> &T {
90        &self.body
91    }
92
93    /// The body of the message.
94    pub fn into_body(self) -> T {
95        self.body
96    }
97
98    /// The raw body of the message.
99    pub fn raw_body(&self) -> JsValue {
100        self.inner().body().unwrap()
101    }
102}
103
104impl<T> TryFrom<RawMessage> for Message<T>
105where
106    T: DeserializeOwned,
107{
108    type Error = Error;
109
110    fn try_from(value: RawMessage) -> std::result::Result<Self, Self::Error> {
111        let body = serde_wasm_bindgen::from_value(value.body())?;
112        Ok(Self {
113            inner: value.inner,
114            body,
115        })
116    }
117}
118
119/// A message that is sent to a consumer Worker.
120#[derive(Debug)]
121pub struct RawMessage {
122    inner: MessageSys,
123}
124
125impl RawMessage {
126    /// The body of the message.
127    pub fn body(&self) -> JsValue {
128        self.inner.body().unwrap()
129    }
130}
131
132impl From<MessageSys> for RawMessage {
133    fn from(value: MessageSys) -> Self {
134        Self { inner: value }
135    }
136}
137
138trait MessageSysInner {
139    fn inner(&self) -> &MessageSys;
140}
141
142impl MessageSysInner for RawMessage {
143    fn inner(&self) -> &MessageSys {
144        &self.inner
145    }
146}
147
148impl<T> MessageSysInner for Message<T> {
149    fn inner(&self) -> &MessageSys {
150        &self.inner
151    }
152}
153
154#[derive(Serialize)]
155#[serde(rename_all = "camelCase")]
156#[derive(Debug)]
157/// Optional configuration when marking a message or a batch of messages for retry.
158pub struct QueueRetryOptions {
159    delay_seconds: Option<u32>,
160}
161
162#[derive(Debug)]
163pub struct QueueRetryOptionsBuilder {
164    delay_seconds: Option<u32>,
165}
166
167impl QueueRetryOptionsBuilder {
168    /// Creates a new retry options builder.
169    pub fn new() -> Self {
170        Self {
171            delay_seconds: None,
172        }
173    }
174
175    #[must_use]
176    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
177    pub fn with_delay_seconds(mut self, delay_seconds: u32) -> Self {
178        self.delay_seconds = Some(delay_seconds);
179        self
180    }
181
182    /// Build the retry options.
183    pub fn build(self) -> QueueRetryOptions {
184        QueueRetryOptions {
185            delay_seconds: self.delay_seconds,
186        }
187    }
188}
189
190pub trait MessageExt {
191    /// A unique, system-generated ID for the message.
192    fn id(&self) -> String;
193
194    /// A timestamp when the message was sent.
195    fn timestamp(&self) -> Date;
196
197    /// Marks message to be retried.
198    fn retry(&self);
199
200    /// Marks message to be retried with options.
201    fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions);
202
203    /// Marks message acknowledged.
204    fn ack(&self);
205}
206
207impl<T: MessageSysInner> MessageExt for T {
208    /// A unique, system-generated ID for the message.
209    fn id(&self) -> String {
210        self.inner().id().unwrap().into()
211    }
212
213    /// A timestamp when the message was sent.
214    fn timestamp(&self) -> Date {
215        Date::from(self.inner().timestamp().unwrap())
216    }
217
218    /// Marks message to be retried.
219    fn retry(&self) {
220        self.inner().retry(JsValue::null()).unwrap();
221    }
222
223    /// Marks message to be retried with options.
224    fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions) {
225        self.inner()
226            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
227            .retry(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
228            .unwrap();
229    }
230
231    /// Marks message acknowledged.
232    fn ack(&self) {
233        self.inner().ack().unwrap();
234    }
235}
236
237#[derive(Debug)]
238pub struct MessageIter<T> {
239    range: std::ops::Range<u32>,
240    array: Array,
241    marker: PhantomData<T>,
242}
243
244impl<T> std::iter::Iterator for MessageIter<T>
245where
246    T: DeserializeOwned,
247{
248    type Item = Result<Message<T>>;
249
250    fn next(&mut self) -> Option<Self::Item> {
251        let index = self.range.next()?;
252        let value = self.array.get(index);
253        let raw_message = RawMessage::from(MessageSys::from(value));
254        Some(raw_message.try_into())
255    }
256
257    #[inline]
258    fn size_hint(&self) -> (usize, Option<usize>) {
259        self.range.size_hint()
260    }
261}
262
263impl<T> std::iter::DoubleEndedIterator for MessageIter<T>
264where
265    T: DeserializeOwned,
266{
267    fn next_back(&mut self) -> Option<Self::Item> {
268        let index = self.range.next_back()?;
269        let value = self.array.get(index);
270        let raw_message = RawMessage::from(MessageSys::from(value));
271        Some(raw_message.try_into())
272    }
273}
274
275impl<T> std::iter::FusedIterator for MessageIter<T> where T: DeserializeOwned {}
276
277impl<T> std::iter::ExactSizeIterator for MessageIter<T> where T: DeserializeOwned {}
278
279#[derive(Debug)]
280pub struct RawMessageIter {
281    range: std::ops::Range<u32>,
282    array: Array,
283}
284
285impl std::iter::Iterator for RawMessageIter {
286    type Item = RawMessage;
287
288    fn next(&mut self) -> Option<Self::Item> {
289        let index = self.range.next()?;
290        let value = self.array.get(index);
291        Some(MessageSys::from(value).into())
292    }
293
294    #[inline]
295    fn size_hint(&self) -> (usize, Option<usize>) {
296        self.range.size_hint()
297    }
298}
299
300impl std::iter::DoubleEndedIterator for RawMessageIter {
301    fn next_back(&mut self) -> Option<Self::Item> {
302        let index = self.range.next_back()?;
303        let value = self.array.get(index);
304        Some(MessageSys::from(value).into())
305    }
306}
307
308impl std::iter::FusedIterator for RawMessageIter {}
309
310impl std::iter::ExactSizeIterator for RawMessageIter {}
311
312#[derive(Debug, Clone)]
313pub struct Queue(EdgeQueue);
314
315unsafe impl Send for Queue {}
316unsafe impl Sync for Queue {}
317
318impl EnvBinding for Queue {
319    const TYPE_NAME: &'static str = "WorkerQueue";
320}
321
322impl JsCast for Queue {
323    fn instanceof(val: &JsValue) -> bool {
324        val.is_instance_of::<Queue>()
325    }
326
327    fn unchecked_from_js(val: JsValue) -> Self {
328        Self(val.into())
329    }
330
331    fn unchecked_from_js_ref(val: &JsValue) -> &Self {
332        unsafe { &*(val as *const JsValue as *const Self) }
333    }
334}
335
336impl From<Queue> for JsValue {
337    fn from(queue: Queue) -> Self {
338        JsValue::from(queue.0)
339    }
340}
341
342impl AsRef<JsValue> for Queue {
343    fn as_ref(&self) -> &JsValue {
344        &self.0
345    }
346}
347
/// How the body of a message sent to a queue is encoded.
#[derive(Debug, Clone, Copy)]
pub enum QueueContentType {
    /// A JavaScript object that can be JSON-serialized. Messages of this type can be previewed
    /// from the Cloudflare dashboard.
    Json,
    /// A String. Messages of this type can be previewed with the List messages from the
    /// dashboard feature.
    Text,
    /// A JavaScript object that cannot be JSON-serialized but is supported by structured clone
    /// (for example Date and Map). Messages of this type cannot be previewed from the Cloudflare
    /// dashboard and will display as Base64-encoded.
    V8,
}
357
358impl Serialize for QueueContentType {
359    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
360    where
361        S: serde::Serializer,