Skip to content

Commit e3eab78

Browse files
authored
feat: use SSE in forc-publish for long-running uploads (#7178)
This pull request introduces changes to enhance the functionality of the `forc-publish` plugin, including improvements to error handling, support for streaming server responses, and updates to dependencies. The most significant changes include adding a new error variant, updating the `upload` method to handle streaming responses, and modifying dependencies to include `futures-util` and additional features for `reqwest`. ### Enhancements to Error Handling: * Added a new `ServerError` variant to the `Error` enum in `forc-plugins/forc-publish/src/error.rs` to handle server-related errors. ### Improvements to `upload` Method: * Updated the `upload` method in `forc-publish/src/forc_pub_client.rs` to process streaming server responses using `futures-util::StreamExt`. This includes handling chunked data, parsing JSON responses, and printing event updates. [[1]](diffhunk://#diff-c706a808124a73081593a5e34fb999aa1a446f97d97da1657ddb4b3c55d24371R43) [[2]](diffhunk://#diff-c706a808124a73081593a5e34fb999aa1a446f97d97da1657ddb4b3c55d24371L53-R97) ### Dependency Updates: * Added `futures-util` as a dependency in `Cargo.toml` and `forc-plugins/forc-publish/Cargo.toml` to support streaming operations. [[1]](diffhunk://#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R157) [[2]](diffhunk://#diff-2bd34a7b1fa0f1850ad25c6e080d410b2df98c16b6cd75445a138d04e617311aL16-R17) * Enabled the `stream` feature for `reqwest` in `forc-plugins/forc-publish/Cargo.toml` to facilitate streaming server responses. ### Minor Adjustments: * Added empty `println!` calls in `main.rs` to improve output formatting during error handling and publishing operations. [[1]](diffhunk://#diff-2c9e261f1e02238f33a9403387f40481b5b372475c0de3782146b614210675b3R32) [[2]](diffhunk://#diff-2c9e261f1e02238f33a9403387f40481b5b372475c0de3782146b614210675b3R52) ![May-16-2025 03-56-49](https://github.com/user-attachments/assets/f3fd0a36-a3e1-41df-9039-b0d489d9e3b5)
1 parent a428376 commit e3eab78

File tree

6 files changed

+93
-14
lines changed

6 files changed

+93
-14
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ filecheck = "0.5"
154154
flate2 = "1.0"
155155
fs_extra = "1.2"
156156
futures = { version = "0.3", default-features = false }
157+
futures-util = "0.3"
157158
gag = "1.0"
158159
gimli = "0.31"
159160
git2 = "0.19"

forc-plugins/forc-publish/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ clap = { workspace = true, features = ["derive", "env"] }
1313
flate2.workspace = true
1414
forc-tracing.workspace = true
1515
forc-util.workspace = true
16-
reqwest = { workspace = true, features = ["json"] }
16+
futures-util.workspace = true
17+
reqwest = { workspace = true, features = ["json", "stream"] }
1718
semver = { workspace = true, features = ["serde"] }
1819
serde = { workspace = true, features = ["derive"] }
1920
serde_json.workspace = true

forc-plugins/forc-publish/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ pub enum Error {
2828

2929
#[error("Forc.toml not found in the current directory")]
3030
ForcTomlNotFound,
31+
32+
#[error("Server error")]
33+
ServerError,
3134
}
3235

3336
#[derive(Deserialize)]

forc-plugins/forc-publish/src/forc_pub_client.rs

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::error::Error;
22
use crate::error::Result;
3+
use reqwest::StatusCode;
34
use semver::Version;
45
use serde::{Deserialize, Serialize};
56
use std::fs;
@@ -39,6 +40,8 @@ impl ForcPubClient {
3940

4041
/// Uploads the given file to the server
4142
pub async fn upload<P: AsRef<Path>>(&self, file_path: P, forc_version: &str) -> Result<Uuid> {
43+
use futures_util::StreamExt;
44+
use std::io::{stdout, Write};
4245
let url = self
4346
.uri
4447
.join(&format!("upload_project?forc_version={}", forc_version))?;
@@ -50,16 +53,55 @@ impl ForcPubClient {
5053
.header("Content-Type", "application/gzip")
5154
.body(file_bytes)
5255
.send()
53-
.await?;
54-
55-
let status = response.status();
56+
.await;
5657

57-
if status.is_success() {
58-
// Extract `upload_id` from the response if available
59-
let upload_response: UploadResponse = response.json().await?;
60-
Ok(upload_response.upload_id)
58+
if let Ok(response) = response {
59+
let mut stream = response.bytes_stream();
60+
61+
// Process the SSE stream.
62+
// The server sends events in the format: "data: <event>\n\n" or
63+
// ": <event>\n\n" for keep-alive events.
64+
// The first event is usually a progress event, and the last one contains the upload_id
65+
// or an error message. If the stream is open for more than 60 seconds, it will be closed
66+
// by the server, and we will return an HTTPError.
67+
while let Some(chunk) = stream.next().await {
68+
match chunk {
69+
Ok(bytes) => {
70+
let event_str = String::from_utf8_lossy(&bytes);
71+
for event in event_str.split("\n\n") {
72+
if let Some(stripped) = event.strip_prefix("data:") {
73+
let data = &stripped.trim();
74+
if let Ok(upload_response) =
75+
serde_json::from_str::<UploadResponse>(data)
76+
{
77+
return Ok(upload_response.upload_id);
78+
} else if data.starts_with("{") {
79+
// Attempt to parse error from JSON
80+
return Err(Error::ApiResponseError {
81+
status: StatusCode::INTERNAL_SERVER_ERROR,
82+
error: data.to_string(),
83+
});
84+
} else {
85+
// Print the event data, replacing the previous message.
86+
print!("\r\x1b[2K => {}", data);
87+
stdout().flush().unwrap();
88+
}
89+
}
90+
// else if event.starts_with(":") {
91+
// These are keep-alive events. Uncomment if you need to debug them.
92+
// println!("Keep-alive event: {}", event);
93+
// }
94+
}
95+
}
96+
Err(e) => {
97+
return Err(Error::HttpError(e));
98+
}
99+
}
100+
}
101+
Err(Error::ServerError)
61102
} else {
62-
Err(Error::from_response(response).await)
103+
eprintln!("Error during upload initiation: {:?}", response);
104+
Err(Error::ServerError)
63105
}
64106
}
65107

@@ -110,12 +152,22 @@ mod test {
110152
async fn test_upload_success() {
111153
let (client, mock_server) = get_mock_client_server().await;
112154
let upload_id = Uuid::new_v4();
113-
let success_response = serde_json::json!({ "upload_id": upload_id });
155+
156+
// Simulate SSE response with a progress event and a final upload_id event
157+
let sse_body = format!(
158+
"data: uploading...\n\n\
159+
data: {{\"upload_id\":\"{}\"}}\n\n",
160+
upload_id
161+
);
114162

115163
Mock::given(method("POST"))
116164
.and(path("/upload_project"))
117165
.and(query_param("forc_version", "0.66.5"))
118-
.respond_with(ResponseTemplate::new(200).set_body_json(&success_response))
166+
.respond_with(
167+
ResponseTemplate::new(200)
168+
.insert_header("Content-Type", "text/event-stream")
169+
.set_body_string(sse_body),
170+
)
119171
.mount(&mock_server)
120172
.await;
121173

@@ -133,11 +185,15 @@ mod test {
133185
async fn test_upload_server_error() {
134186
let (client, mock_server) = get_mock_client_server().await;
135187

188+
// Simulate SSE error event
189+
let sse_body = "data: {\"error\":\"Internal Server Error\"}\n\n";
190+
136191
Mock::given(method("POST"))
137192
.and(path("/upload_project"))
138193
.respond_with(
139-
ResponseTemplate::new(500)
140-
.set_body_json(serde_json::json!({ "error": "Internal Server Error" })),
194+
ResponseTemplate::new(200)
195+
.insert_header("Content-Type", "text/event-stream")
196+
.set_body_string(sse_body),
141197
)
142198
.mount(&mock_server)
143199
.await;
@@ -151,7 +207,7 @@ mod test {
151207
match result {
152208
Err(Error::ApiResponseError { status, error }) => {
153209
assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
154-
assert_eq!(error, "Internal Server Error");
210+
assert_eq!(error, "{\"error\":\"Internal Server Error\"}");
155211
}
156212
_ => panic!("Expected ApiResponseError"),
157213
}

forc-plugins/forc-publish/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ async fn main() {
2929
init_tracing_subscriber(TracingSubscriberOptions::default());
3030

3131
if let Err(err) = run().await {
32+
println!();
3233
println_error(&format!("{err}"));
3334
std::process::exit(1);
3435
}
@@ -48,6 +49,7 @@ async fn run() -> Result<()> {
4849
let upload_id = client.upload(file_path, forc_version).await?;
4950
let published = client.publish(upload_id, &auth_token).await?;
5051

52+
println!();
5153
println_action_green(
5254
"Published",
5355
&format!("{} {}", published.name, published.version),

0 commit comments

Comments
 (0)