worker/
queue.rs

1use std::{
2    convert::{TryFrom, TryInto},
3    marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
13/// A batch of messages that are sent to a consumer Worker.
14#[derive(Debug)]
15pub struct MessageBatch<T> {
16    inner: MessageBatchSys,
17    phantom: PhantomData<T>,
18}
19
20impl<T> MessageBatch<T> {
21    /// The name of the Queue that belongs to this batch.
22    pub fn queue(&self) -> String {
23        self.inner.queue().unwrap().into()
24    }
25
26    /// Marks every message to be retried in the next batch.
27    pub fn retry_all(&self) {
28        self.inner.retry_all(JsValue::null()).unwrap();
29    }
30
31    /// Marks every message to be retried in the next batch with options.
32    pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
33        self.inner
34            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
35            .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
36            .unwrap();
37    }
38
39    /// Marks every message acknowledged in the batch.
40    pub fn ack_all(&self) {
41        self.inner.ack_all().unwrap();
42    }
43
44    /// Iterator for raw messages in the message batch. Ordering of messages is not guaranteed.
45    pub fn raw_iter(&self) -> RawMessageIter {
46        let messages = self.inner.messages().unwrap();
47        RawMessageIter {
48            range: 0..messages.length(),
49            array: messages,
50        }
51    }
52}
53
54impl<T: DeserializeOwned> MessageBatch<T> {
55    /// An array of messages in the batch. Ordering of messages is not guaranteed.
56    pub fn messages(&self) -> Result<Vec<Message<T>>> {
57        self.iter().collect()
58    }
59
60    /// Iterator for messages in the message batch. Ordering of messages is not guaranteed.
61    pub fn iter(&self) -> MessageIter<T> {
62        let messages = self.inner.messages().unwrap();
63        MessageIter {
64            range: 0..messages.length(),
65            array: messages,
66            marker: PhantomData,
67        }
68    }
69}
70
71impl<T> From<MessageBatchSys> for MessageBatch<T> {
72    fn from(value: MessageBatchSys) -> Self {
73        Self {
74            inner: value,
75            phantom: PhantomData,
76        }
77    }
78}
79
80/// A message that is sent to a consumer Worker.
81#[derive(Debug)]
82pub struct Message<T> {
83    inner: MessageSys,
84    body: T,
85}
86
87impl<T> Message<T> {
88    /// The body of the message.
89    pub fn body(&self) -> &T {
90        &self.body
91    }
92
93    /// The body of the message.
94    pub fn into_body(self) -> T {
95        self.body
96    }
97
98    /// The raw body of the message.
99    pub fn raw_body(&self) -> JsValue {
100        self.inner().body().unwrap()
101    }
102}
103
104impl<T> TryFrom<RawMessage> for Message<T>
105where
106    T: DeserializeOwned,
107{
108    type Error = Error;
109
110    fn try_from(value: RawMessage) -> std::result::Result<Self, Self::Error> {
111        let body = serde_wasm_bindgen::from_value(value.body())?;
112        Ok(Self {
113            inner: value.inner,
114            body,
115        })
116    }
117}
118
119/// A message that is sent to a consumer Worker.
120#[derive(Debug)]
121pub struct RawMessage {
122    inner: MessageSys,
123}
124
125impl RawMessage {
126    /// The body of the message.
127    pub fn body(&self) -> JsValue {
128        self.inner.body().unwrap()
129    }
130}
131
132impl From<MessageSys> for RawMessage {
133    fn from(value: MessageSys) -> Self {
134        Self { inner: value }
135    }
136}
137
138trait MessageSysInner {
139    fn inner(&self) -> &MessageSys;
140}
141
142impl MessageSysInner for RawMessage {
143    fn inner(&self) -> &MessageSys {
144        &self.inner
145    }
146}
147
148impl<T> MessageSysInner for Message<T> {
149    fn inner(&self) -> &MessageSys {
150        &self.inner
151    }
152}
153
154#[derive(Serialize)]
155#[serde(rename_all = "camelCase")]
156#[derive(Debug)]
157/// Optional configuration when marking a message or a batch of messages for retry.
158pub struct QueueRetryOptions {
159    delay_seconds: Option<u32>,
160}
161
162#[derive(Debug)]
163pub struct QueueRetryOptionsBuilder {
164    delay_seconds: Option<u32>,
165}
166
167impl QueueRetryOptionsBuilder {
168    /// Creates a new retry options builder.
169    pub fn new() -> Self {
170        Self {
171            delay_seconds: None,
172        }
173    }
174
175    #[must_use]
176    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
177    pub fn with_delay_seconds(mut self, delay_seconds: u32) -> Self {
178        self.delay_seconds = Some(delay_seconds);
179        self
180    }
181
182    /// Build the retry options.
183    pub fn build(self) -> QueueRetryOptions {
184        QueueRetryOptions {
185            delay_seconds: self.delay_seconds,
186        }
187    }
188}
189
190pub trait MessageExt {
191    /// A unique, system-generated ID for the message.
192    fn id(&self) -> String;
193
194    /// A timestamp when the message was sent.
195    fn timestamp(&self) -> Date;
196
197    /// Marks message to be retried.
198    fn retry(&self);
199
200    /// Marks message to be retried with options.
201    fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions);
202
203    /// Marks message acknowledged.
204    fn ack(&self);
205}
206
207impl<T: MessageSysInner> MessageExt for T {
208    /// A unique, system-generated ID for the message.
209    fn id(&self) -> String {
210        self.inner().id().unwrap().into()
211    }
212
213    /// A timestamp when the message was sent.
214    fn timestamp(&self) -> Date {
215        Date::from(self.inner().timestamp().unwrap())
216    }
217
218    /// Marks message to be retried.
219    fn retry(&self) {
220        self.inner().retry(JsValue::null()).unwrap();
221    }
222
223    /// Marks message to be retried with options.
224    fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions) {
225        self.inner()
226            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
227            .retry(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
228            .unwrap();
229    }
230
231    /// Marks message acknowledged.
232    fn ack(&self) {
233        self.inner().ack().unwrap();
234    }
235}
236
237#[derive(Debug)]
238pub struct MessageIter<T> {
239    range: std::ops::Range<u32>,
240    array: Array,
241    marker: PhantomData<T>,
242}
243
244impl<T> std::iter::Iterator for MessageIter<T>
245where
246    T: DeserializeOwned,
247{
248    type Item = Result<Message<T>>;
249
250    fn next(&mut self) -> Option<Self::Item> {
251        let index = self.range.next()?;
252        let value = self.array.get(index);
253        let raw_message = RawMessage::from(MessageSys::from(value));
254        Some(raw_message.try_into())
255    }
256
257    #[inline]
258    fn size_hint(&self) -> (usize, Option<usize>) {
259        self.range.size_hint()
260    }
261}
262
263impl<T> std::iter::DoubleEndedIterator for MessageIter<T>
264where
265    T: DeserializeOwned,
266{
267    fn next_back(&mut self) -> Option<Self::Item> {
268        let index = self.range.next_back()?;
269        let value = self.array.get(index);
270        let raw_message = RawMessage::from(MessageSys::from(value));
271        Some(raw_message.try_into())
272    }
273}
274
275impl<T> std::iter::FusedIterator for MessageIter<T> where T: DeserializeOwned {}
276
277impl<T> std::iter::ExactSizeIterator for MessageIter<T> where T: DeserializeOwned {}
278
279#[derive(Debug)]
280pub struct RawMessageIter {
281    range: std::ops::Range<u32>,
282    array: Array,
283}
284
285impl std::iter::Iterator for RawMessageIter {
286    type Item = RawMessage;
287
288    fn next(&mut self) -> Option<Self::Item> {
289        let index = self.range.next()?;
290        let value = self.array.get(index);
291        Some(MessageSys::from(value).into())
292    }
293
294    #[inline]
295    fn size_hint(&self) -> (usize, Option<usize>) {
296        self.range.size_hint()
297    }
298}
299
300impl std::iter::DoubleEndedIterator for RawMessageIter {
301    fn next_back(&mut self) -> Option<Self::Item> {
302        let index = self.range.next_back()?;
303        let value = self.array.get(index);
304        Some(MessageSys::from(value).into())
305    }
306}
307
308impl std::iter::FusedIterator for RawMessageIter {}
309
310impl std::iter::ExactSizeIterator for RawMessageIter {}
311
312#[derive(Debug, Clone)]
313pub struct Queue(EdgeQueue);
314
315unsafe impl Send for Queue {}
316unsafe impl Sync for Queue {}
317
318impl EnvBinding for Queue {
319    const TYPE_NAME: &'static str = "WorkerQueue";
320}
321
322impl JsCast for Queue {
323    fn instanceof(val: &JsValue) -> bool {
324        val.is_instance_of::<Queue>()
325    }
326
327    fn unchecked_from_js(val: JsValue) -> Self {
328        Self(val.into())
329    }
330
331    fn unchecked_from_js_ref(val: &JsValue) -> &Self {
332        unsafe { &*(val as *const JsValue as *const Self) }
333    }
334}
335
336impl From<Queue> for JsValue {
337    fn from(queue: Queue) -> Self {
338        JsValue::from(queue.0)
339    }
340}
341
342impl AsRef<JsValue> for Queue {
343    fn as_ref(&self) -> &JsValue {
344        &self.0
345    }
346}
347
348#[derive(Clone, Copy, Debug)]
349pub enum QueueContentType {
350    /// Send a JavaScript object that can be JSON-serialized. This content type can be previewed from the Cloudflare dashboard.
351    Json,
352    /// Send a String. This content type can be previewed with the List messages from the dashboard feature.
353    Text,
354    /// Send a JavaScript object that cannot be JSON-serialized but is supported by structured clone (for example Date and Map). This content type cannot be previewed from the Cloudflare dashboard and will display as Base64-encoded.
355    V8,
356}
357
358impl Serialize for QueueContentType {
359    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
360    where
361        S: serde::Serializer,
362    {
363        serializer.serialize_str(match self {
364            Self::Json => "json",
365            Self::Text => "text",
366            Self::V8 => "v8",
367        })
368    }
369}
370
371#[derive(Debug, Serialize)]
372#[serde(rename_all = "camelCase")]
373pub struct QueueSendOptions {
374    content_type: Option<QueueContentType>,
375    delay_seconds: Option<u32>,
376}
377
378#[derive(Debug)]
379pub struct MessageBuilder<T> {
380    message: T,
381    delay_seconds: Option<u32>,
382    content_type: QueueContentType,
383}
384
385impl<T: Serialize> MessageBuilder<T> {
386    /// Creates a new message builder. The message must be `serializable`.
387    pub fn new(message: T) -> Self {
388        Self {
389            message,
390            delay_seconds: None,
391            content_type: QueueContentType::Json,
392        }
393    }
394
395    #[must_use]
396    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
397    pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
398        self.delay_seconds = Some(delay_seconds);
399        self
400    }
401
402    #[must_use]
403    /// The content type of the message.
404    /// Default is `QueueContentType::Json`.
405    pub fn content_type(mut self, content_type: QueueContentType) -> Self {
406        self.content_type = content_type;
407        self
408    }
409
410    /// Build the message.
411    pub fn build(self) -> SendMessage<T> {
412        SendMessage {
413            message: self.message,
414            options: Some(QueueSendOptions {
415                content_type: Some(self.content_type),
416                delay_seconds: self.delay_seconds,
417            }),
418        }
419    }
420}
421
422#[derive(Debug)]
423pub struct RawMessageBuilder {
424    message: JsValue,
425    delay_seconds: Option<u32>,
426}
427
428impl RawMessageBuilder {
429    /// Creates a new raw message builder. The message must be a `JsValue`.
430    pub fn new(message: JsValue) -> Self {
431        Self {
432            message,
433            delay_seconds: None,
434        }
435    }
436
437    #[must_use]
438    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
439    pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
440        self.delay_seconds = Some(delay_seconds);
441        self
442    }
443
444    /// Build the message with a content type.
445    pub fn build_with_content_type(self, content_type: QueueContentType) -> SendMessage<JsValue> {
446        SendMessage {
447            message: self.message,
448            options: Some(QueueSendOptions {
449                content_type: Some(content_type),
450                delay_seconds: self.delay_seconds,
451            }),
452        }
453    }
454}
455
456/// A wrapper type used for sending message.
457///
458/// This type can't be constructed directly.
459///
460/// It should be constructed using the `MessageBuilder`, `RawMessageBuilder` or by calling `.into()` on a struct that is `serializable`.
461#[derive(Debug)]
462pub struct SendMessage<T> {
463    /// The body of the message.
464    ///
465    /// Can be either a serializable struct or a `JsValue`.
466    message: T,
467
468    /// Options to apply to the current message, including content type and message delay settings.
469    options: Option<QueueSendOptions>,
470}
471
472impl<T: Serialize> SendMessage<T> {
473    fn into_raw_send_message(self) -> Result<SendMessage<JsValue>> {
474        Ok(SendMessage {
475            message: serde_wasm_bindgen::to_value(&self.message)?,
476            options: self.options,
477        })
478    }
479}
480
481impl<T: Serialize> From<T> for SendMessage<T> {
482    fn from(message: T) -> Self {
483        Self {
484            message,
485            options: Some(QueueSendOptions {
486                content_type: Some(QueueContentType::Json),
487                delay_seconds: None,
488            }),
489        }
490    }
491}
492
493#[derive(Debug)]
494pub struct BatchSendMessage<T> {
495    body: Vec<SendMessage<T>>,
496    options: Option<QueueSendBatchOptions>,
497}
498
499#[derive(Debug, Serialize)]
500#[serde(rename_all = "camelCase")]
501pub struct QueueSendBatchOptions {
502    delay_seconds: Option<u32>,
503}
504
505#[derive(Debug)]
506pub struct BatchMessageBuilder<T> {
507    messages: Vec<SendMessage<T>>,
508    delay_seconds: Option<u32>,
509}
510
511impl<T> BatchMessageBuilder<T> {
512    /// Creates a new batch message builder.
513    pub fn new() -> Self {
514        Self {
515            messages: Vec::new(),
516            delay_seconds: None,
517        }
518    }
519
520    #[must_use]
521    /// Adds a message to the batch.
522    pub fn message<U: Into<SendMessage<T>>>(mut self, message: U) -> Self {
523        self.messages.push(message.into());
524        self
525    }
526
527    #[must_use]
528    /// Adds messages to the batch.
529    pub fn messages<U, V>(mut self, messages: U) -> Self
530    where
531        U: IntoIterator<Item = V>,
532        V: Into<SendMessage<T>>,
533    {
534        self.messages
535            .extend(messages.into_iter().map(std::convert::Into::into));
536        self
537    }
538
539    #[must_use]
540    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
541    pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
542        self.delay_seconds = Some(delay_seconds);
543        self
544    }
545
546    pub fn build(self) -> BatchSendMessage<T> {
547        BatchSendMessage {
548            body: self.messages,
549            options: self
550                .delay_seconds
551                .map(|delay_seconds| QueueSendBatchOptions {
552                    delay_seconds: Some(delay_seconds),
553                }),
554        }
555    }
556}
557
558impl<T, U, V> From<U> for BatchSendMessage<T>
559where
560    U: IntoIterator<Item = V>,
561    V: Into<SendMessage<T>>,
562{
563    fn from(value: U) -> Self {
564        Self {
565            body: value.into_iter().map(std::convert::Into::into).collect(),
566            options: None,
567        }
568    }
569}
570
571impl<T: Serialize> BatchSendMessage<T> {
572    fn into_raw_batch_send_message(self) -> Result<BatchSendMessage<JsValue>> {
573        Ok(BatchSendMessage {
574            body: self
575                .body
576                .into_iter()
577                .map(SendMessage::into_raw_send_message)
578                .collect::<Result<_>>()?,
579            options: self.options,
580        })
581    }
582}
583
584impl Queue {
585    /// Sends a message to the Queue.
586    ///
587    /// Accepts a struct that is `serializable`.
588    ///
589    /// If message options are needed use the `MessageBuilder` to create the message.
590    ///
591    /// ## Example
592    /// ```no_run
593    /// #[derive(Serialize)]
594    /// pub struct MyMessage {
595    ///     my_data: u32,
596    /// }
597    ///
598    /// queue.send(MyMessage{ my_data: 1}).await?;
599    /// ```
600    pub async fn send<T, U: Into<SendMessage<T>>>(&self, message: U) -> Result<()>
601    where
602        T: Serialize,
603    {
604        let message: SendMessage<T> = message.into();
605        let serialized_message = message.into_raw_send_message()?;
606        self.send_raw(serialized_message).await
607    }
608
609    /// Sends a raw `JsValue` to the Queue.
610    ///
611    /// Use the `RawMessageBuilder` to create the message.
612    pub async fn send_raw<T: Into<SendMessage<JsValue>>>(&self, message: T) -> Result<()> {
613        let message: SendMessage<JsValue> = message.into();
614        let options = match message.options {
615            Some(options) => serde_wasm_bindgen::to_value(&options)?,
616            None => JsValue::null(),
617        };
618
619        let fut: JsFuture = self.0.send(message.message, options)?.into();
620        fut.await.map_err(Error::from)?;
621        Ok(())
622    }
623
624    /// Sends a batch of messages to the Queue.
625    ///
626    /// Accepts an iterator that produces structs that are `serializable`.
627    ///
628    /// If message options are needed use the `BatchMessageBuilder` to create the batch.
629    ///
630    /// ## Example
631    /// ```no_run
632    /// #[derive(Serialize)]
633    /// pub struct MyMessage {
634    ///     my_data: u32,
635    /// }
636    ///
637    /// queue.send_batch(vec![MyMessage{ my_data: 1}]).await?;
638    /// ```
639    pub async fn send_batch<T: Serialize, U: Into<BatchSendMessage<T>>>(
640        &self,
641        messages: U,
642    ) -> Result<()> {
643        let messages: BatchSendMessage<T> = messages.into();
644        let serialized_messages = messages.into_raw_batch_send_message()?;
645        self.send_raw_batch(serialized_messages).await
646    }