1use std::{
2 convert::{TryFrom, TryInto},
3 marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
13#[derive(Debug)]
15pub struct MessageBatch<T> {
16 inner: MessageBatchSys,
17 phantom: PhantomData<T>,
18}
19
20impl<T> MessageBatch<T> {
21 pub fn queue(&self) -> String {
23 self.inner.queue().unwrap().into()
24 }
25
26 pub fn retry_all(&self) {
28 self.inner.retry_all(JsValue::null()).unwrap();
29 }
30
31 pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
33 self.inner
34 .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
36 .unwrap();
37 }
38
39 pub fn ack_all(&self) {
41 self.inner.ack_all().unwrap();
42 }
43
44 pub fn raw_iter(&self) -> RawMessageIter {
46 let messages = self.inner.messages().unwrap();
47 RawMessageIter {
48 range: 0..messages.length(),
49 array: messages,
50 }
51 }
52}
53
54impl<T: DeserializeOwned> MessageBatch<T> {
55 pub fn messages(&self) -> Result<Vec<Message<T>>> {
57 self.iter().collect()
58 }
59
60 pub fn iter(&self) -> MessageIter<T> {
62 let messages = self.inner.messages().unwrap();
63 MessageIter {
64 range: 0..messages.length(),
65 array: messages,
66 marker: PhantomData,
67 }
68 }
69}
70
71impl<T> From<MessageBatchSys> for MessageBatch<T> {
72 fn from(value: MessageBatchSys) -> Self {
73 Self {
74 inner: value,
75 phantom: PhantomData,
76 }
77 }
78}
79
80#[derive(Debug)]
82pub struct Message<T> {
83 inner: MessageSys,
84 body: T,
85}
86
87impl<T> Message<T> {
88 pub fn body(&self) -> &T {
90 &self.body
91 }
92
93 pub fn into_body(self) -> T {
95 self.body
96 }
97
98 pub fn raw_body(&self) -> JsValue {
100 self.inner().body().unwrap()
101 }
102}
103
104impl<T> TryFrom<RawMessage> for Message<T>
105where
106 T: DeserializeOwned,
107{
108 type Error = Error;
109
110 fn try_from(value: RawMessage) -> std::result::Result<Self, Self::Error> {
111 let body = serde_wasm_bindgen::from_value(value.body())?;
112 Ok(Self {
113 inner: value.inner,
114 body,
115 })
116 }
117}
118
119#[derive(Debug)]
121pub struct RawMessage {
122 inner: MessageSys,
123}
124
125impl RawMessage {
126 pub fn body(&self) -> JsValue {
128 self.inner.body().unwrap()
129 }
130}
131
132impl From<MessageSys> for RawMessage {
133 fn from(value: MessageSys) -> Self {
134 Self { inner: value }
135 }
136}
137
138trait MessageSysInner {
139 fn inner(&self) -> &MessageSys;
140}
141
142impl MessageSysInner for RawMessage {
143 fn inner(&self) -> &MessageSys {
144 &self.inner
145 }
146}
147
148impl<T> MessageSysInner for Message<T> {
149 fn inner(&self) -> &MessageSys {
150 &self.inner
151 }
152}
153
154#[derive(Serialize)]
155#[serde(rename_all = "camelCase")]
156#[derive(Debug)]
157pub struct QueueRetryOptions {
159 delay_seconds: Option<u32>,
160}
161
162#[derive(Debug)]
163pub struct QueueRetryOptionsBuilder {
164 delay_seconds: Option<u32>,
165}
166
167impl QueueRetryOptionsBuilder {
168 pub fn new() -> Self {
170 Self {
171 delay_seconds: None,
172 }
173 }
174
175 #[must_use]
176 pub fn with_delay_seconds(mut self, delay_seconds: u32) -> Self {
178 self.delay_seconds = Some(delay_seconds);
179 self
180 }
181
182 pub fn build(self) -> QueueRetryOptions {
184 QueueRetryOptions {
185 delay_seconds: self.delay_seconds,
186 }
187 }
188}
189
190pub trait MessageExt {
191 fn id(&self) -> String;
193
194 fn timestamp(&self) -> Date;
196
197 fn retry(&self);
199
200 fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions);
202
203 fn ack(&self);
205}
206
207impl<T: MessageSysInner> MessageExt for T {
208 fn id(&self) -> String {
210 self.inner().id().unwrap().into()
211 }
212
213 fn timestamp(&self) -> Date {
215 Date::from(self.inner().timestamp().unwrap())
216 }
217
218 fn retry(&self) {
220 self.inner().retry(JsValue::null()).unwrap();
221 }
222
223 fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions) {
225 self.inner()
226 .retry(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
228 .unwrap();
229 }
230
231 fn ack(&self) {
233 self.inner().ack().unwrap();
234 }
235}
236
237#[derive(Debug)]
238pub struct MessageIter<T> {
239 range: std::ops::Range<u32>,
240 array: Array,
241 marker: PhantomData<T>,
242}
243
244impl<T> std::iter::Iterator for MessageIter<T>
245where
246 T: DeserializeOwned,
247{
248 type Item = Result<Message<T>>;
249
250 fn next(&mut self) -> Option<Self::Item> {
251 let index = self.range.next()?;
252 let value = self.array.get(index);
253 let raw_message = RawMessage::from(MessageSys::from(value));
254 Some(raw_message.try_into())
255 }
256
257 #[inline]
258 fn size_hint(&self) -> (usize, Option<usize>) {
259 self.range.size_hint()
260 }
261}
262
263impl<T> std::iter::DoubleEndedIterator for MessageIter<T>
264where
265 T: DeserializeOwned,
266{
267 fn next_back(&mut self) -> Option<Self::Item> {
268 let index = self.range.next_back()?;
269 let value = self.array.get(index);
270 let raw_message = RawMessage::from(MessageSys::from(value));
271 Some(raw_message.try_into())
272 }
273}
274
275impl<T> std::iter::FusedIterator for MessageIter<T> where T: DeserializeOwned {}
276
277impl<T> std::iter::ExactSizeIterator for MessageIter<T> where T: DeserializeOwned {}
278
279#[derive(Debug)]
280pub struct RawMessageIter {
281 range: std::ops::Range<u32>,
282 array: Array,
283}
284
285impl std::iter::Iterator for RawMessageIter {
286 type Item = RawMessage;
287
288 fn next(&mut self) -> Option<Self::Item> {
289 let index = self.range.next()?;
290 let value = self.array.get(index);
291 Some(MessageSys::from(value).into())
292 }
293
294 #[inline]
295 fn size_hint(&self) -> (usize, Option<usize>) {
296 self.range.size_hint()
297 }
298}
299
300impl std::iter::DoubleEndedIterator for RawMessageIter {
301 fn next_back(&mut self) -> Option<Self::Item> {
302 let index = self.range.next_back()?;
303 let value = self.array.get(index);
304 Some(MessageSys::from(value).into())
305 }
306}
307
308impl std::iter::FusedIterator for RawMessageIter {}
309
310impl std::iter::ExactSizeIterator for RawMessageIter {}
311
312#[derive(Debug, Clone)]
313pub struct Queue(EdgeQueue);
314
315unsafe impl Send for Queue {}
316unsafe impl Sync for Queue {}
317
318impl EnvBinding for Queue {
319 const TYPE_NAME: &'static str = "WorkerQueue";
320}
321
322impl JsCast for Queue {
323 fn instanceof(val: &JsValue) -> bool {
324 val.is_instance_of::<Queue>()
325 }
326
327 fn unchecked_from_js(val: JsValue) -> Self {
328 Self(val.into())
329 }
330
331 fn unchecked_from_js_ref(val: &JsValue) -> &Self {
332 unsafe { &*(val as *const JsValue as *const Self) }
333 }
334}
335
336impl From<Queue> for JsValue {
337 fn from(queue: Queue) -> Self {
338 JsValue::from(queue.0)
339 }
340}
341
342impl AsRef<JsValue> for Queue {
343 fn as_ref(&self) -> &JsValue {
344 &self.0
345 }
346}
347
348#[derive(Clone, Copy, Debug)]
349pub enum QueueContentType {
350 Json,
352 Text,
354 V8,
356}
357
358impl Serialize for QueueContentType {
359 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
360 where
361 S: serde::Serializer,
362 {
363 serializer.serialize_str(match self {
364 Self::Json => "json",
365 Self::Text => "text",
366 Self::V8 => "v8",
367 })
368 }
369}
370
371#[derive(Debug, Serialize)]
372#[serde(rename_all = "camelCase")]
373pub struct QueueSendOptions {
374 content_type: Option<QueueContentType>,
375 delay_seconds: Option<u32>,
376}
377
378#[derive(Debug)]
379pub struct MessageBuilder<T> {
380 message: T,
381 delay_seconds: Option<u32>,
382 content_type: QueueContentType,
383}
384
385impl<T: Serialize> MessageBuilder<T> {
386 pub fn new(message: T) -> Self {
388 Self {
389 message,
390 delay_seconds: None,
391 content_type: QueueContentType::Json,