1use std::{
2 convert::{TryFrom, TryInto},
3 marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
13#[derive(Debug)]
15pub struct MessageBatch<T> {
16 inner: MessageBatchSys,
17 phantom: PhantomData<T>,
18}
19
20impl<T> MessageBatch<T> {
21 pub fn queue(&self) -> String {
23 self.inner.queue().unwrap().into()
24 }
25
26 pub fn retry_all(&self) {
28 self.inner.retry_all(JsValue::null()).unwrap();
29 }
30
31 pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
33 self.inner
34 .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
36 .unwrap();
37 }
38
39 pub fn ack_all(&self) {
41 self.inner.ack_all().unwrap();
42 }
43
44 pub fn raw_iter(&self) -> RawMessageIter {
46 let messages = self.inner.messages().unwrap();
47 RawMessageIter {
48 range: 0..messages.length(),
49 array: messages,
50 }
51 }
52}
53
54impl<T: DeserializeOwned> MessageBatch<T> {
55 pub fn messages(&self) -> Result<Vec<Message<T>>> {
57 self.iter().collect()
58 }
59
60 pub fn iter(&self) -> MessageIter<T> {
62 let messages = self.inner.messages().unwrap();
63 MessageIter {
64 range: 0..messages.length(),
65 array: messages,
66 marker: PhantomData,
67 }
68 }
69}
70
71impl<T> From<MessageBatchSys> for MessageBatch<T> {
72 fn from(value: MessageBatchSys) -> Self {
73 Self {
74 inner: value,
75 phantom: PhantomData,
76 }
77 }
78}
79
80#[derive(Debug)]
82pub struct Message<T> {
83 inner: MessageSys,
84 body: T,
85}
86
87impl<T> Message<T> {
88 pub fn body(&self) -> &T {
90 &self.body
91 }
92
93 pub fn into_body(self) -> T {
95 self.body
96 }
97
98 pub fn raw_body(&self) -> JsValue {
100 self.inner().body().unwrap()
101 }
102}
103
104impl<T> TryFrom<RawMessage> for Message<T>
105where
106 T: DeserializeOwned,
107{
108 type Error = Error;
109
110 fn try_from(value: RawMessage) -> std::result::Result<Self, Self::Error> {
111 let body = serde_wasm_bindgen::from_value(value.body())?;
112 Ok(Self {
113 inner: value.inner,
114 body,
115 })
116 }
117}
118
119#[derive(Debug)]
121pub struct RawMessage {
122 inner: MessageSys,
123}
124
125impl RawMessage {
126 pub fn body(&self) -> JsValue {
128 self.inner.body().unwrap()
129 }
130}
131
132impl From<MessageSys> for RawMessage {
133 fn from(value: MessageSys) -> Self {
134 Self { inner: value }
135 }
136}
137
138trait MessageSysInner {
139 fn inner(&self) -> &MessageSys;
140}
141
142impl MessageSysInner for RawMessage {
143 fn inner(&self) -> &MessageSys {
144 &self.inner
145 }
146}
147
148impl<T> MessageSysInner for Message<T> {
149 fn inner(&self) -> &MessageSys {
150 &self.inner
151 }
152}
153
154#[derive(Serialize)]
155#[serde(rename_all = "camelCase")]
156#[derive(Debug)]
157pub struct QueueRetryOptions {
159 delay_seconds: Option<u32>,
160}
161
162#[derive(Debug)]
163pub struct QueueRetryOptionsBuilder {
164 delay_seconds: Option<u32>,
165}
166
167impl QueueRetryOptionsBuilder {
168 pub fn new() -> Self {
170 Self {
171 delay_seconds: None,
172 }
173 }
174
175 #[must_use]
176 pub fn with_delay_seconds(mut self, delay_seconds: u32) -> Self {
178 self.delay_seconds = Some(delay_seconds);
179 self
180 }
181
182 pub fn build(self) -> QueueRetryOptions {
184 QueueRetryOptions {
185 delay_seconds: self.delay_seconds,
186 }
187 }
188}
189
190pub trait MessageExt {
191 fn id(&self) -> String;
193
194 fn timestamp(&self) -> Date;
196
197 fn retry(&self);
199
200 fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions);
202
203 fn ack(&self);
205}
206