1use std::{
2 convert::{TryFrom, TryInto},
3 marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
/// A batch of queue messages whose bodies are expected to deserialize into `T`.
///
/// Wraps the raw [`MessageBatchSys`] binding. `T` is carried only as a type
/// marker; no value of `T` is stored in the batch itself.
#[derive(Debug)]
pub struct MessageBatch<T> {
    // Underlying batch object from `worker_sys`.
    inner: MessageBatchSys,
    // Remembers the body type without owning a `T`.
    phantom: PhantomData<T>,
}
19
20impl<T> MessageBatch<T> {
21 pub fn queue(&self) -> String {
23 self.inner.queue().unwrap().into()
24 }
25
26 pub fn retry_all(&self) {
28 self.inner.retry_all(JsValue::null()).unwrap();
29 }
30
31 pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
33 self.inner
34 .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
36 .unwrap();
37 }
38
39 pub fn ack_all(&self) {
41 self.inner.ack_all().unwrap();
42 }
43
44 pub fn raw_iter(&self) -> RawMessageIter {
46 let messages = self.inner.messages().unwrap();
47 RawMessageIter {
48 range: 0..messages.length(),
49 array: messages,
50 }
51 }
52}
53
54impl<T: DeserializeOwned> MessageBatch<T> {
55 pub fn messages(&self) -> Result<Vec<Message<T>>> {
57 self.iter().collect()
58 }
59
60 pub fn iter(&self) -> MessageIter<T> {
62 let messages = self.inner.messages().unwrap();
63 MessageIter {
64 range: 0..messages.length(),
65 array: messages,
66 marker: PhantomData,
67 }
68 }
69}
70
71impl<T> From<MessageBatchSys> for MessageBatch<T> {
72 fn from(value: MessageBatchSys) -> Self {
73 Self {
74 inner: value,
75 phantom: PhantomData,
76 }
77 }
78}
79
/// A single queue message together with its body already deserialized into `T`.
#[derive(Debug)]
pub struct Message<T> {
    // Underlying message object from `worker_sys`.
    inner: MessageSys,
    // Body deserialized from the raw message (see the `TryFrom<RawMessage>` impl).
    body: T,
}
86
87impl<T> Message<T> {
88 pub fn body(&self) -> &T {
90 &self.body
91 }
92
93 pub fn into_body(self) -> T {
95 self.body
96 }
97
98 pub fn raw_body(&self) -> JsValue {
100 self.inner().body().unwrap()
101 }
102}
103
104impl<T> TryFrom<RawMessage> for Message<T>
105where
106 T: DeserializeOwned,
107{
108 type Error = Error;
109
110 fn try_from(value: RawMessage) -> std::result::Result<Self, Self::Error> {
111 let body = serde_wasm_bindgen::from_value(value.body())?;
112 Ok(Self {
113 inner: value.inner,
114 body,
115 })
116 }
117}
118
/// A queue message whose body has not been deserialized.
#[derive(Debug)]
pub struct RawMessage {
    // Underlying message object from `worker_sys`.
    inner: MessageSys,
}
124
125impl RawMessage {
126 pub fn body(&self) -> JsValue {
128 self.inner.body().unwrap()
129 }
130}
131
132impl From<MessageSys> for RawMessage {
133 fn from(value: MessageSys) -> Self {
134 Self { inner: value }
135 }
136}
137
/// Private accessor trait for types that wrap a [`MessageSys`].
///
/// The blanket `MessageExt` impl in this file is bounded on this trait.
trait MessageSysInner {
    /// Borrows the wrapped `MessageSys`.
    fn inner(&self) -> &MessageSys;
}
141
142impl MessageSysInner for RawMessage {
143 fn inner(&self) -> &MessageSys {
144 &self.inner
145 }
146}
147
148impl<T> MessageSysInner for Message<T> {
149 fn inner(&self) -> &MessageSys {
150 &self.inner
151 }
152}
153
/// Options passed along when a message or a whole batch is marked for retry.
///
/// Serialized with camelCase keys, so `delay_seconds` is emitted as `delaySeconds`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
#[derive(Debug)]
pub struct QueueRetryOptions {
    // Optional delay in seconds (per the field name); `None` when not configured.
    delay_seconds: Option<u32>,
}
161
/// Builder for [`QueueRetryOptions`].
#[derive(Debug)]
pub struct QueueRetryOptionsBuilder {
    // Value copied into `QueueRetryOptions::delay_seconds` by `build`.
    delay_seconds: Option<u32>,
}
166
167impl QueueRetryOptionsBuilder {
168 pub fn new() -> Self {
170 Self {
171 delay_seconds: None,
172 }
173 }
174
175 #[must_use]
176 pub fn with_delay_seconds(mut self, delay_seconds: u32) -> Self {
178 self.delay_seconds = Some(delay_seconds);
179 self
180 }
181
182 pub fn build(self) -> QueueRetryOptions {
184 QueueRetryOptions {
185 delay_seconds: self.delay_seconds,
186 }
187 }
188}
189
/// Operations shared by the message wrappers in this file.
///
/// Implemented for every type that implements the private `MessageSysInner` trait.
pub trait MessageExt {
    /// Returns the message id as a `String`.
    fn id(&self) -> String;

    /// Returns the message timestamp as a [`Date`].
    fn timestamp(&self) -> Date;

    /// Marks the message for retry without any options.
    fn retry(&self);

    /// Marks the message for retry using the given options.
    fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions);

    /// Acknowledges the message.
    fn ack(&self);
}
206
207impl<T: MessageSysInner> MessageExt for T {
208 fn id(&self) -> String {
210 self.inner().id().unwrap().into()
211 }
212
213 fn timestamp(&self) -> Date {
215 Date::from(self.inner().timestamp().unwrap())
216 }
217
218 fn retry(&self) {
220 self.inner().retry(JsValue::null()).unwrap();
221 }
222
223 fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions) {
225 self.inner()
226 .retry(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
228 .unwrap();
229 }
230
231 fn ack(&self) {
233 self.inner().ack().unwrap();
234 }
235}
236
/// Iterator over a batch's messages that deserializes each body into `T`.
#[derive(Debug)]
pub struct MessageIter<T> {
    // Indices into `array` that have not been yielded yet.
    range: std::ops::Range<u32>,
    // JS array holding the batch's messages.
    array: Array,
    // Records the body type without storing a `T`.
    marker: PhantomData<T>,
}
243
244impl<T> std::iter::Iterator for MessageIter<T>
245where
246 T: DeserializeOwned,
247{
248 type Item = Result<Message<T>>;
249
250 fn next(&mut self) -> Option<Self::Item> {
251 let index = self.range.next()?;
252 let value = self.array.get(index);
253 let raw_message = RawMessage::from(MessageSys::from(value));
254 Some(raw_message.try_into())
255 }
256
257 #[inline]
258 fn size_hint(&self) -> (usize, Option<usize>) {
259 self.range.size_hint()
260 }
261}
262
263impl<T> std::iter::DoubleEndedIterator for MessageIter<T>
264where
265 T: DeserializeOwned,
266{
267 fn next_back(&mut self) -> Option<Self::Item> {
268 let index = self.range.next_back()?;
269 let value = self.array.get(index);
270 let raw_message = RawMessage::from(MessageSys::from(value));
271 Some(raw_message.try_into())
272 }
273}
274
// Once the index range is exhausted, `next` keeps returning `None`.
impl<T> std::iter::FusedIterator for MessageIter<T> where T: DeserializeOwned {}

// The number of remaining items is the length of the remaining index range.
impl<T> std::iter::ExactSizeIterator for MessageIter<T> where T: DeserializeOwned {}
278
/// Iterator over a batch's messages that yields them as [`RawMessage`]s,
/// without deserializing their bodies.
#[derive(Debug)]
pub struct RawMessageIter {
    // Indices into `array` that have not been yielded yet.
    range: std::ops::Range<u32>,
    // JS array holding the batch's messages.
    array: Array,
}
284
285impl std::iter::Iterator for RawMessageIter {
286 type Item = RawMessage;
287
288 fn next(&mut self) -> Option<Self::Item> {
289 let index = self.range.next()?;
290 let value = self.array.get(index);
291 Some(MessageSys::from(value).into())
292 }
293
294 #[inline]
295 fn size_hint(&self) -> (usize, Option<usize>) {
296 self.range.size_hint()
297 }
298}
299
300impl std::iter::DoubleEndedIterator for RawMessageIter {
301 fn next_back(&mut self) -> Option<Self::Item> {
302 let index = self.range.next_back()?;
303 let value = self.array.get(index);
304 Some(MessageSys::from(value).into())
305 }
306}
307
// Once the index range is exhausted, `next` keeps returning `None`.
impl std::iter::FusedIterator for RawMessageIter {}

// The number of remaining items is the length of the remaining index range.
impl std::iter::ExactSizeIterator for RawMessageIter {}
311
312#[derive(Debug, Clone)]
313pub struct Queue(EdgeQueue);
314
// SAFETY / NOTE(review): nothing in this file establishes why moving or sharing
// the wrapped JS handle across threads is sound. Presumably this relies on the
// target runtime being single-threaded — confirm before using on a threaded target.
unsafe impl Send for Queue {}
unsafe impl Sync for Queue {}
317
// Registers `Queue` as an environment binding.
impl EnvBinding for Queue {
    // Type name handed to `EnvBinding`; presumably matched against the JS
    // object's constructor name when the binding is looked up — verify in `env`.
    const TYPE_NAME: &'static str = "WorkerQueue";
}
321
322impl JsCast for Queue {
323 fn instanceof(val: &JsValue) -> bool {
324 val.is_instance_of::<Queue>()
325 }
326
327 fn unchecked_from_js(val: JsValue) -> Self {
328 Self(val.into())
329 }
330
331 fn unchecked_from_js_ref(val: &JsValue) -> &Self {
332 unsafe { &*(val as *const JsValue as *const Self) }
333 }
334}
335
336impl From<Queue> for JsValue {
337 fn from(queue: Queue) -> Self {
338 JsValue::from(queue.0)
339 }
340}
341
342impl AsRef<JsValue> for Queue {
343 fn as_ref(&self) -> &JsValue {
344 &self.0
345 }
346}
347
/// Content type attached to a message when it is sent.
///
/// Each variant serializes to a fixed lowercase string (see the `Serialize` impl).
#[derive(Clone, Copy, Debug)]
pub enum QueueContentType {
    /// Serialized as `"json"`.
    Json,
    /// Serialized as `"text"`.
    Text,
    /// Serialized as `"v8"`.
    V8,
}
357
358impl Serialize for QueueContentType {
359 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
360 where
361 S: serde::Serializer,
362 {
363 serializer.serialize_str(match self {
364 Self::Json => "json",
365 Self::Text => "text",
366 Self::V8 => "v8",
367 })
368 }
369}
370
/// Per-message options attached when sending a message.
///
/// Serialized with camelCase keys (`contentType`, `delaySeconds`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueSendOptions {
    // Content type of the message; `None` when not specified.
    content_type: Option<QueueContentType>,
    // Optional delay in seconds (per the field name); `None` when not specified.
    delay_seconds: Option<u32>,
}
377
/// Builder for a [`SendMessage`] carrying a serializable payload.
#[derive(Debug)]
pub struct MessageBuilder<T> {
    // Payload to send.
    message: T,
    // Optional delay in seconds copied into the send options by `build`.
    delay_seconds: Option<u32>,
    // Content type copied into the send options by `build`.
    content_type: QueueContentType,
}
384
385impl<T: Serialize> MessageBuilder<T> {
386 pub fn new(message: T) -> Self {
388 Self {
389 message,
390 delay_seconds: None,
391 content_type: QueueContentType::Json,
392 }
393 }
394
395 #[must_use]
396 pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
398 self.delay_seconds = Some(delay_seconds);
399 self
400 }
401
402 #[must_use]
403 pub fn content_type(mut self, content_type: QueueContentType) -> Self {
406 self.content_type = content_type;
407 self
408 }
409
410 pub fn build(self) -> SendMessage<T> {
412 SendMessage {
413 message: self.message,
414 options: Some(QueueSendOptions {
415 content_type: Some(self.content_type),
416 delay_seconds: self.delay_seconds,
417 }),
418 }
419 }
420}
421
/// Builder for a [`SendMessage`] whose payload is already a `JsValue`.
#[derive(Debug)]
pub struct RawMessageBuilder {
    // Payload to send, passed through without serialization.
    message: JsValue,
    // Optional delay in seconds copied into the send options when building.
    delay_seconds: Option<u32>,
}
427
428impl RawMessageBuilder {
429 pub fn new(message: JsValue) -> Self {
431 Self {
432 message,
433 delay_seconds: None,
434 }
435 }
436
437 #[must_use]
438 pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
440 self.delay_seconds = Some(delay_seconds);
441 self
442 }
443
444 pub fn build_with_content_type(self, content_type: QueueContentType) -> SendMessage<JsValue> {
446 SendMessage {
447 message: self.message,
448 options: Some(QueueSendOptions {
449 content_type: Some(content_type),
450 delay_seconds: self.delay_seconds,
451 }),
452 }
453 }
454}
455
/// A message ready to be sent, paired with its optional send options.
#[derive(Debug)]
pub struct SendMessage<T> {
    // Payload; either a serializable value or an already-converted `JsValue`.
    message: T,

    // Per-message options; `None` means no options are attached.
    options: Option<QueueSendOptions>,
}
471
472impl<T: Serialize> SendMessage<T> {
473 fn into_raw_send_message(self) -> Result<SendMessage<JsValue>> {
474 Ok(SendMessage {
475 message: serde_wasm_bindgen::to_value(&self.message)?,
476 options: self.options,
477 })
478 }
479}
480
481impl<T: Serialize> From<T> for SendMessage<T> {
482 fn from(message: T) -> Self {
483 Self {
484 message,
485 options: Some(QueueSendOptions {
486 content_type: Some(QueueContentType::Json),
487 delay_seconds: None,
488 }),
489 }
490 }
491}
492
/// A group of messages to be sent together, with optional batch-level options.
#[derive(Debug)]
pub struct BatchSendMessage<T> {
    // Messages in the batch, each with its own per-message options.
    body: Vec<SendMessage<T>>,
    // Batch-level options; `None` means none were configured.
    options: Option<QueueSendBatchOptions>,
}
498
/// Options applied to a whole batch send.
///
/// Serialized with camelCase keys, so `delay_seconds` is emitted as `delaySeconds`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueSendBatchOptions {
    // Optional delay in seconds (per the field name); `None` when not specified.
    delay_seconds: Option<u32>,
}
504
/// Builder for a [`BatchSendMessage`].
#[derive(Debug)]
pub struct BatchMessageBuilder<T> {
    // Messages accumulated so far, in insertion order.
    messages: Vec<SendMessage<T>>,
    // Optional batch-level delay in seconds; turned into batch options by `build`.
    delay_seconds: Option<u32>,
}
510
511impl<T> BatchMessageBuilder<T> {
512 pub fn new() -> Self {
514 Self {
515 messages: Vec::new(),
516 delay_seconds: None,
517 }
518 }
519
520 #[must_use]
521 pub fn message<U: Into<SendMessage<T>>>(mut self, message: U) -> Self {
523 self.messages.push(message.into());
524 self
525 }
526
527 #[must_use]
528 pub fn messages<U, V>(mut self, messages: U) -> Self
530 where
531 U: IntoIterator<Item = V>,
532 V: Into<SendMessage<T>>,
533 {
534 self.messages
535 .extend(messages.into_iter().map(std::convert::Into::into));
536 self
537 }
538
539 #[must_use]
540 pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
542 self.delay_seconds = Some(delay_seconds);
543 self
544 }
545
546 pub fn build(self) -> BatchSendMessage<T> {
547 BatchSendMessage {
548 body: self.messages,
549 options: self
550 .delay_seconds
551 .map(|delay_seconds| QueueSendBatchOptions {
552 delay_seconds: Some(delay_seconds),
553 }),
554 }
555 }
556}
557
558impl<T, U, V> From<U> for BatchSendMessage<T>
559where
560 U: IntoIterator<Item = V>,
561 V: Into<SendMessage<T>>,
562{
563 fn from(value: U) -> Self {
564 Self {
565 body: value.into_iter().map(std::convert::Into::into).collect(),
566 options: None,
567 }
568 }
569}
570
571impl<T: Serialize> BatchSendMessage<T> {
572 fn into_raw_batch_send_message(self) -> Result<BatchSendMessage<JsValue>> {
573 Ok(BatchSendMessage {
574 body: self
575 .body
576 .into_iter()
577 .map(SendMessage::into_raw_send_message)
578 .collect::<Result<_>>()?,
579 options: self.options,
580 })
581 }
582}
583
584impl Queue {
585 pub async fn send<T, U: Into<SendMessage<T>>>(&self, message: U) -> Result<()>
601 where
602 T: Serialize,
603 {
604 let message: SendMessage<T> = message.into();
605 let serialized_message = message.into_raw_send_message()?;
606 self.send_raw(serialized_message).await
607 }
608
609 pub async fn send_raw<T: Into<SendMessage<JsValue>>>(&self, message: T) -> Result<()> {
613 let message: SendMessage<JsValue> = message.into();
614 let options = match message.options {
615 Some(options) => serde_wasm_bindgen::to_value(&options)?,
616 None => JsValue::null(),
617 };
618
619 let fut: JsFuture = self.0.send(message.message, options)?.into();
620 fut.await.map_err(Error::from)?;
621 Ok(())
622 }
623
624 pub async fn send_batch<T: Serialize, U: Into<BatchSendMessage<T>>>(
640 &self,
641 messages: U,
642 ) -> Result<()> {
643 let messages: BatchSendMessage<T> = messages.into();
644 let serialized_messages = messages.into_raw_batch_send_message()?;
645 self.send_raw_batch(serialized_messages).await
646 }
647
648 pub async fn send_raw_batch<T: Into<BatchSendMessage<JsValue>>>(
654 &self,
655 messages: T,
656 ) -> Result<()> {
657 let messages: BatchSendMessage<JsValue> = messages.into();
658 let batch_send_options = serde_wasm_bindgen::to_value(&messages.options)?;
659
660 let messages = messages
661 .body
662 .into_iter()
663 .map(|message: SendMessage<JsValue>| {
664 let body = message.message;
665 let message_send_request = js_sys::Object::new();
666
667 js_sys::Reflect::set(&message_send_request, &"body".into(), &body)?;
668 js_sys::Reflect::set(
669 &message_send_request,
670 &"contentType".into(),
671 &serde_wasm_bindgen::to_value(
672 &message.options.as_ref().map(|o| o.content_type),
673 )?,
674 )?;
675 js_sys::Reflect::set(
676 &message_send_request,
677 &"delaySeconds".into(),
678 &serde_wasm_bindgen::to_value(
679 &message.options.as_ref().map(|o| o.delay_seconds),
680 )?,
681 )?;
682
683 Ok::<JsValue, Error>(message_send_request.into())
684 })
685 .collect::<Result<js_sys::Array>>()?;
686
687 let fut: JsFuture = self.0.send_batch(messages, batch_send_options)?.into();
688 fut.await.map_err(Error::from)?;
689 Ok(())
690 }
691}