wrpc_transport/
lib.rs

1#![allow(clippy::type_complexity)]
2#![deny(missing_docs)]
3
4//! wRPC transport abstractions, codec and framing
5//!
6//! wRPC is an RPC framework based on [WIT](http://component-model.bytecodealliance.org/design/wit.html).
7//! It follows client-server model, where peers (servers) may serve function and method calls invoked by the other peers (clients).
8//!
9//! The two main abstractions on top of which wRPC is built are:
10//! - [Invoke] - the client-side handle to a wRPC transport, allowing clients to *invoke* WIT functions over wRPC transport
11//! - [Serve] - the server-side handle to a wRPC transport, allowing servers to *serve* WIT functions over wRPC transport
12//!
13//! Implementations of [Invoke] and [Serve] define transport-specific, multiplexed bidirectional byte stream types:
14//! - [`Invoke::Incoming`] and [`Serve::Incoming`] represent the stream *incoming* from a peer.
15//! - [`Invoke::Outgoing`] and [`Serve::Outgoing`] represent the stream *outgoing* to a peer.
16
17pub mod frame;
18pub mod invoke;
19pub mod serve;
20
21mod value;
22
23pub use frame::{
24    Accept, Decoder as FrameDecoder, Encoder as FrameEncoder, Frame, FrameRef, Server,
25};
26pub use invoke::{Invoke, InvokeExt};
27pub use send_future::SendFuture;
28pub use serve::{Serve, ServeExt};
29pub use value::*;
30
31#[cfg(any(target_family = "wasm", feature = "net"))]
32pub use frame::tcp;
33#[cfg(all(unix, feature = "net"))]
34pub use frame::unix;
35
36use core::mem;
37use core::pin::{pin, Pin};
38use core::task::{Context, Poll};
39
40use bytes::BytesMut;
41use tokio::io::{AsyncRead, ReadBuf};
42use tracing::trace;
43
44/// Internal workaround trait
45///
46/// This is an internal trait used as a workaround for
47/// http://github.com/rust-lang/rust/issues/63033
48#[doc(hidden)]
49pub trait Captures<'a> {}
50
51impl<T: ?Sized> Captures<'_> for T {}
52
53/// Multiplexes streams
54///
55/// Implementations of this trait define multiplexing for underlying connections
56/// using a particular structural `path`
57pub trait Index<T> {
58    /// Index the entity using a structural `path`
59    fn index(&self, path: &[usize]) -> anyhow::Result<T>;
60}
61
62/// Buffered incoming stream used for decoding values
63pub struct Incoming<T> {
64    buffer: BytesMut,
65    inner: T,
66}
67
68impl<T: Index<T>> Index<Self> for Incoming<T> {
69    fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
70        let inner = self.inner.index(path)?;
71        Ok(Self {
72            buffer: BytesMut::default(),
73            inner,
74        })
75    }
76}
77
78impl<T: AsyncRead + Unpin> AsyncRead for Incoming<T> {
79    fn poll_read(
80        mut self: Pin<&mut Self>,
81        cx: &mut Context<'_>,
82        buf: &mut ReadBuf<'_>,
83    ) -> Poll<std::io::Result<()>> {
84        let cap = buf.remaining();
85        if cap == 0 {
86            trace!("attempt to read empty buffer");
87            return Poll::Ready(Ok(()));
88        }
89        if !self.buffer.is_empty() {
90            if self.buffer.len() > cap {
91                trace!(cap, len = self.buffer.len(), "reading part of buffer");
92                buf.put_slice(&self.buffer.split_to(cap));
93            } else {
94                trace!(cap, len = self.buffer.len(), "reading full buffer");
95                buf.put_slice(&mem::take(&mut self.buffer));
96            }
97            return Poll::Ready(Ok(()));
98        }
99        pin!(&mut self.inner).poll_read(cx, buf)
100    }
101}