worker/
queue.rs

1use std::{
2    convert::{TryFrom, TryInto},
3    marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
13/// A batch of messages that are sent to a consumer Worker.
14#[derive(Debug)]
15pub struct MessageBatch<T> {
16    inner: MessageBatchSys,
17    phantom: PhantomData<T>,
18}
19
20impl<T> MessageBatch<T> {
21    /// The name of the Queue that belongs to this batch.
22    pub fn queue(&self) -> String {
23        self.inner.queue().unwrap().into()
24    }
25
26    /// Marks every message to be retried in the next batch.
27    pub fn retry_all(&self) {
28        self.inner.retry_all(JsValue::null()).unwrap();
29    }
30
31    /// Marks every message to be retried in the next batch with options.
32    pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
33        self.inner
34            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
35            .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
36            .unwrap();
37    }
38
39    /// Marks every message acknowledged in the batch.
40    pub fn ack_all(&self) {
41        self.inner.ack_all().unwrap();
42    }
43
44    /// Iterator for raw messages in the message batch. Ordering of messages is not guaranteed.
45    pub fn raw_iter(&self) -> RawMessageIter {
46        let messages = self.inner.messages().unwrap();
47        RawMessageIter {
48            range: 0..messages.length(),
49            array: messages,
50        }
51    }
52}
53
54impl<T: DeserializeOwned> MessageBatch<T> {
55    /// An array of messages in the batch. Ordering of messages is not guaranteed.
56    pub fn messages(&self) -> Result<Vec<Message<T>>> {
57        self.iter().collect()
58    }
59
60    /// Iterator for messages in the message batch. Ordering of messages is not guaranteed.
61    pub fn iter(&self) -> MessageIter<T> {
62        let messages = self.inner.messages().unwrap();
63        MessageIter {
64            range: 0..messages.length(),
65            array: messages,
66            marker: PhantomData,
67        }
68    }
69}
70
71impl<T> From<MessageBatchSys> for MessageBatch<T> {
72    fn from(value: MessageBatchSys) -> Self {
73        Self {
74            inner: value,
75            phantom: PhantomData,
76        }
77    }
78}
79
80/// A message that is sent to a consumer Worker.
81#[derive(Debug)]
82pub struct Message<T> {
83    inner: MessageSys,
84    body: T,
85}
86
87impl<T> Message<T> {
88    /// The body of the message.
89    pub fn body(&self) -> &T {
90        &self.body
91    }
92
93    /// The body of the message.
94    pub fn into_body(self) -> T {
95        self.body
96    }
97
98    /// The raw body of the message.
99    pub fn raw_body(&self) -> JsValue {
100        self.inner().body().unwrap()
101    }
102}
103
104impl<T> TryFrom<RawMessage> for Message<T>
105where
106    T: DeserializeOwned,
107{
108    type Error = Error;
109
110    fn try_from(value: RawMessage) -> std::result::Result<Self, Self::Error> {
111        let body = serde_wasm_bindgen::from_value(value.body())?;
112        Ok(Self {
113            inner: value.inner,
114            body,
115        })
116    }
117}
118
119/// A message that is sent to a consumer Worker.
120#[derive(Debug)]
121pub struct RawMessage {