1#![allow(clippy::type_complexity)]
2#![deny(missing_docs)]
3
4pub mod frame;
18pub mod invoke;
19pub mod serve;
20
21mod value;
22
23pub use frame::{
24 Accept, Decoder as FrameDecoder, Encoder as FrameEncoder, Frame, FrameRef, Server,
25};
26pub use invoke::{Invoke, InvokeExt};
27pub use send_future::SendFuture;
28pub use serve::{Serve, ServeExt};
29pub use value::*;
30
31#[cfg(any(target_family = "wasm", feature = "net"))]
32pub use frame::tcp;
33#[cfg(all(unix, feature = "net"))]
34pub use frame::unix;
35
36use core::mem;
37use core::pin::{pin, Pin};
38use core::task::{Context, Poll};
39
40use bytes::BytesMut;
41use tokio::io::{AsyncRead, ReadBuf};
42use tracing::trace;
43
/// Marker trait carrying a lifetime parameter and no methods.
///
/// Every type implements it for every lifetime (see the blanket impl below),
/// so adding `+ Captures<'a>` to a bound never restricts the set of accepted
/// types; it only makes `'a` appear in the bound.
///
/// NOTE(review): presumably used to force `impl Trait` return types to
/// capture `'a` — confirm against the call sites.
#[doc(hidden)]
pub trait Captures<'a> {}

// Blanket implementation: any type, sized or not, captures any lifetime.
impl<'a, T: ?Sized> Captures<'a> for T {}
52
53pub trait Index<T> {
58 fn index(&self, path: &[usize]) -> anyhow::Result<T>;
60}
61
62pub struct Incoming<T> {
64 buffer: BytesMut,
65 inner: T,
66}
67
68impl<T: Index<T>> Index<Self> for Incoming<T> {
69 fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
70 let inner = self.inner.index(path)?;
71 Ok(Self {
72 buffer: BytesMut::default(),
73 inner,
74 })
75 }
76}
77
78impl<T: AsyncRead + Unpin> AsyncRead for Incoming<T> {
79 fn poll_read(
80 mut self: Pin<&mut Self>,
81 cx: &mut Context<'_>,
82 buf: &mut ReadBuf<'_>,
83 ) -> Poll<std::io::Result<()>> {
84 let cap = buf.remaining();
85 if cap == 0 {
86 trace!("attempt to read empty buffer");
87 return Poll::Ready(Ok(()));
88 }
89 if !self.buffer.is_empty() {
90 if self.buffer.len() > cap {
91 trace!(cap, len = self.buffer.len(), "reading part of buffer");
92 buf.put_slice(&self.buffer.split_to(cap));
93 } else {
94 trace!(cap, len = self.buffer.len(), "reading full buffer");
95 buf.put_slice(&mem::take(&mut self.buffer));
96 }
97 return Poll::Ready(Ok(()));
98 }
99 pin!(&mut self.inner).poll_read(cx, buf)
100 }
101}