Skip to content

Commit

Permalink
feat: allow sending non String payload with execute
Browse files Browse the repository at this point in the history
  • Loading branch information
loispostula committed Jul 24, 2024
1 parent 736ccc1 commit 9ea6373
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 61 deletions.
116 changes: 116 additions & 0 deletions src/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use http_body_util::BodyExt;

use bytes::Bytes;
use http_body::Frame;
use snafu::{Backtrace, GenerateImplicitData};
use std::pin::Pin;
use std::task::{Context, Poll};

type BoxBody = http_body_util::combinators::BoxBody<Bytes, crate::Error>;
type BoxError = Box<dyn std::error::Error + Send + Sync>;

fn boxed<B>(body: B) -> BoxBody
where
B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
B::Error: Into<BoxError>,
{
try_downcast(body).unwrap_or_else(|body| {
body.map_err(|e| crate::Error::Other {
source: e.into(),
backtrace: Backtrace::generate(),
})
.boxed()
})
}

fn try_downcast<T, K>(k: K) -> Result<T, K>
where
T: 'static,
K: Send + 'static,
{
let mut k = Some(k);
if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
Ok(k.take().unwrap())
} else {
Err(k.unwrap())
}
}

// Define octocrab Body
#[derive(Debug)]
pub struct OctoBody(BoxBody);

impl OctoBody {
/// Create a new `Body` that wraps another [`http_body::Body`].
pub fn new<B>(body: B) -> Self
where
B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
B::Error: Into<BoxError>,
{
try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
}
/// Create an empty body.
pub fn empty() -> Self {
Self::new(http_body_util::Empty::new())
}
}

impl Default for OctoBody {
fn default() -> Self {
Self::empty()
}
}

// Implement standard Bodiesque casting
impl From<()> for OctoBody {
fn from(_: ()) -> Self {
Self::empty()
}
}

impl From<String> for OctoBody {
fn from(buf: String) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}

impl From<Vec<u8>> for OctoBody {
fn from(buf: Vec<u8>) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}

impl From<Bytes> for OctoBody {
fn from(buf: Bytes) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}

impl From<&'static str> for OctoBody {
fn from(buf: &'static str) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}

impl http_body::Body for OctoBody {
type Data = Bytes;
type Error = crate::Error;

#[inline]
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.0).poll_frame(cx)
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.0.size_hint()
}

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
}
41 changes: 22 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
#![cfg_attr(test, recursion_limit = "512")]

mod api;
mod body;
mod error;
mod from_response;
mod page;
Expand All @@ -191,6 +192,7 @@ pub mod models;
pub mod params;
pub mod service;

use body::OctoBody;
use chrono::{DateTime, Utc};
use http::{HeaderMap, HeaderValue, Method, Uri};
use http_body_util::combinators::BoxBody;
Expand Down Expand Up @@ -422,7 +424,7 @@ impl<Config, Auth> OctocrabBuilder<NoSvc, Config, Auth, NotLayerReady> {

impl<Svc, Config, Auth, B> OctocrabBuilder<Svc, Config, Auth, LayerReady>
where
Svc: Service<Request<String>, Response = Response<B>> + Send + 'static,
Svc: Service<Request<OctoBody>, Response = Response<B>> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
Expand Down Expand Up @@ -467,7 +469,7 @@ impl<Svc, Auth, LayerState> OctocrabBuilder<Svc, NoConfig, Auth, LayerState> {

impl<Svc, B, LayerState> OctocrabBuilder<Svc, NoConfig, AuthState, LayerState>
where
Svc: Service<Request<String>, Response = Response<B>> + Send + 'static,
Svc: Service<Request<OctoBody>, Response = Response<B>> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + Sync + 'static,
Expand Down Expand Up @@ -584,8 +586,8 @@ impl OctocrabBuilder<NoSvc, DefaultOctocrabBuilderConfig, NoAuth, NotLayerReady>
#[cfg(feature = "retry")]
pub fn set_connector_retry_service<S>(
&self,
connector: hyper_util::client::legacy::Client<S, String>,
) -> Retry<RetryConfig, hyper_util::client::legacy::Client<S, String>> {
connector: hyper_util::client::legacy::Client<S, OctoBody>,
) -> Retry<RetryConfig, hyper_util::client::legacy::Client<S, OctoBody>> {
let retry_layer = RetryLayer::new(self.config.retry_config.clone());

retry_layer.layer(connector)
Expand All @@ -610,7 +612,7 @@ impl OctocrabBuilder<NoSvc, DefaultOctocrabBuilderConfig, NoAuth, NotLayerReady>
/// Build a [`Client`] instance with the current [`Service`] stack.
#[cfg(feature = "default-client")]
pub fn build(self) -> Result<Octocrab> {
let client: hyper_util::client::legacy::Client<_, String> = {
let client: hyper_util::client::legacy::Client<_, OctoBody> = {
#[cfg(all(not(feature = "opentls"), not(feature = "rustls")))]
let mut connector = hyper::client::conn::http1::HttpConnector::new();

Expand Down Expand Up @@ -646,7 +648,7 @@ impl OctocrabBuilder<NoSvc, DefaultOctocrabBuilderConfig, NoAuth, NotLayerReady>

#[cfg(feature = "tracing")]
let client = TraceLayer::new_for_http()
.make_span_with(|req: &Request<String>| {
.make_span_with(|req: &Request<OctoBody>| {
tracing::debug_span!(
"HTTP",
http.method = %req.method(),
Expand All @@ -657,7 +659,7 @@ impl OctocrabBuilder<NoSvc, DefaultOctocrabBuilderConfig, NoAuth, NotLayerReady>
otel.status_code = tracing::field::Empty,
)
})
.on_request(|_req: &Request<String>, _span: &Span| {
.on_request(|_req: &Request<OctoBody>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(
Expand Down Expand Up @@ -913,8 +915,8 @@ pub enum AuthState {
}

pub type OctocrabService = Buffer<
BoxService<http::Request<String>, http::Response<BoxBody<Bytes, Error>>, BoxError>,
http::Request<String>,
BoxService<http::Request<OctoBody>, http::Response<BoxBody<Bytes, Error>>, BoxError>,
http::Request<OctoBody>,
>;

/// The GitHub API client.
Expand Down Expand Up @@ -954,7 +956,7 @@ impl Octocrab {
/// Creates a new `Octocrab`.
fn new<S>(service: S, auth_state: AuthState) -> Self
where
S: Service<Request<String>, Response = Response<BoxBody<Bytes, crate::Error>>>
S: Service<Request<OctoBody>, Response = Response<BoxBody<Bytes, crate::Error>>>
+ Send
+ 'static,
S::Future: Send + 'static,
Expand Down Expand Up @@ -1363,7 +1365,7 @@ impl Octocrab {
&self,
mut builder: Builder,
body: Option<&B>,
) -> Result<http::Request<String>> {
) -> Result<http::Request<OctoBody>> {
// Since Octocrab doesn't require streamable bodies(aka, file upload) because it is serde::Serialize),
// we can just use String body, since it is both http_body::Body(required by Hyper::Client), and Clone(required by BoxService).

Expand All @@ -1372,14 +1374,14 @@ impl Octocrab {

if let Some(body) = body {
builder = builder.header(http::header::CONTENT_TYPE, "application/json");
let request = builder
.body(serde_json::to_string(body).context(SerdeSnafu)?)
.context(HttpSnafu)?;
let serialized = serde_json::to_string(body).context(SerdeSnafu)?;
let body: OctoBody = serialized.into();
let request = builder.body(body).context(HttpSnafu)?;
Ok(request)
} else {
Ok(builder
.header(http::header::CONTENT_LENGTH, "0")
.body(String::new())
.body(OctoBody::empty())
.context(HttpSnafu)?)
}
}
Expand Down Expand Up @@ -1442,7 +1444,7 @@ impl Octocrab {
.method(http::Method::POST)
.uri(uri);
let response = self
.send(request.body("{}".to_string()).context(HttpSnafu)?)
.send(request.body("{}".into()).context(HttpSnafu)?)
.await?;
let _status = response.status();

Expand Down Expand Up @@ -1470,7 +1472,7 @@ impl Octocrab {
/// Send the given request to the underlying service
pub async fn send(
&self,
request: Request<String>,
request: Request<OctoBody>,
) -> Result<http::Response<BoxBody<Bytes, crate::Error>>> {
let mut svc = self.client.clone();
let response: Response<BoxBody<Bytes, crate::Error>> = svc
Expand All @@ -1494,11 +1496,12 @@ impl Octocrab {
}

/// Execute the given `request` using octocrab's Client.
pub async fn execute(
pub(crate) async fn execute(
&self,
request: http::Request<String>,
request: http::Request<impl Into<OctoBody>>,
) -> Result<http::Response<BoxBody<Bytes, crate::Error>>> {
let (mut parts, body) = request.into_parts();
let body: OctoBody = body.into();
// Saved request that we can retry later if necessary
let auth_header: Option<HeaderValue> = match self.auth_state {
AuthState::None => None,
Expand Down
50 changes: 8 additions & 42 deletions src/service/middleware/retry.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,33 @@
use futures_util::future;
use http::{Request, Response};
use hyper_util::client::legacy::Error;
use tower::retry::Policy;

use crate::body::OctoBody;

#[derive(Clone)]
pub enum RetryConfig {
None,
Simple(usize),
}

impl<B> Policy<Request<String>, Response<B>, Error> for RetryConfig {
impl<B> Policy<Request<OctoBody>, Response<B>, Error> for RetryConfig {
type Future = futures_util::future::Ready<Self>;

fn retry(
&self,
_req: &Request<String>,
result: Result<&Response<B>, &Error>,
_req: &Request<OctoBody>,
_result: Result<&Response<B>, &Error>,
) -> Option<Self::Future> {
match self {
RetryConfig::None => None,
RetryConfig::Simple(count) => match result {
Ok(response) => {
if response.status().is_server_error() || response.status() == 429 {
if *count > 0 {
Some(future::ready(RetryConfig::Simple(count - 1)))
} else {
None
}
} else {
None
}
}
Err(_) => {
if *count > 0 {
Some(future::ready(RetryConfig::Simple(count - 1)))
} else {
None
}
}
},
RetryConfig::Simple(_count) => None,
}
}

fn clone_request(&self, req: &Request<String>) -> Option<Request<String>> {
fn clone_request(&self, _req: &Request<OctoBody>) -> Option<Request<OctoBody>> {
match self {
RetryConfig::None => None,
_ => {
// `Request` can't be cloned
let mut new_req = Request::builder()
.uri(req.uri())
.method(req.method())
.version(req.version());
for (name, value) in req.headers() {
new_req = new_req.header(name, value);
}

let body = req.body().clone();
let new_req = new_req.body(body).expect(
"This should never panic, as we are cloning a components from existing request",
);

Some(new_req)
}
_ => None,
}
}
}

0 comments on commit 9ea6373

Please sign in to comment.