worker/
queue.rs

1use std::{
2    convert::{TryFrom, TryInto},
3    marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
13/// A batch of messages that are sent to a consumer Worker.
14#[derive(Debug)]
15pub struct MessageBatch<T> {
16    inner: MessageBatchSys,
17    phantom: PhantomData<T>,
18}
19
20impl<T> MessageBatch<T> {
21    /// The name of the Queue that belongs to this batch.
22    pub fn queue(&self) -> String {
23        self.inner.queue().unwrap().into()
24    }
25
26    /// Marks every message to be retried in the next batch.
27    pub fn retry_all(&self) {
28        self.inner.retry_all(JsValue::null()).unwrap();
29    }
30
31    /// Marks every message to be retried in the next batch with options.
32    pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
33        self.inner
34            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
35            .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
36            .unwrap();
37    }
38
39    /// Marks every message acknowledged in the batch.
40    pub fn ack_all(&self) {
41        self.inner.ack_all().unwrap();
42    }
43
44    /// Iterator for raw messages in the message batch. Ordering of messages is not guaranteed.
45    pub fn raw_iter(&self) -> RawMessageIter {
46        let messages = self.inner.messages().unwrap();
47        RawMessageIter {
48            range: 0..messages.length(),
49            array: messages,
50        }
51    }
52}
53
54impl<T: DeserializeOwned> MessageBatch<T> {
55    /// An array of messages in the batch. Ordering of messages is not guaranteed.
56    pub fn messages(&self) -> Result<Vec<Message<T>>> {
57        self.iter().collect()
58    }
59
60    /// Iterator for messages in the message batch. Ordering of messages is not guaranteed.
61    pub fn iter(&self) -> MessageIter<T> {
62        let messages = self.inner.messages().unwrap();
63        MessageIter {
64            range: 0..messages.length(),
65            array: messages,
66            marker: PhantomData,
67        }
68    }
69}
70
71impl<T> From<MessageBatchSys> for MessageBatch<T> {
72    fn from(value: MessageBatchSys) -> Self {
73        Self {
74            inner: value,
75            phantom: PhantomData,
76        }
77    }
78}
79
80/// A message that is sent to a consumer Worker.
81#[derive(Debug)]
82pub struct Message<T> {
83    inner: MessageSys,
84    body: T,
85}
86