diff --git a/Services/RequestServer/ConnectionFromClient.cpp b/Services/RequestServer/ConnectionFromClient.cpp index f9c73f5a22..daf09cdc3b 100644 --- a/Services/RequestServer/ConnectionFromClient.cpp +++ b/Services/RequestServer/ConnectionFromClient.cpp @@ -104,7 +104,6 @@ struct ConnectionFromClient::ActiveRequest { CURL* easy { nullptr }; Vector curl_string_lists; i32 request_id { 0 }; - RefPtr notifier; WeakPtr client; int writer_fd { 0 }; HTTP::HeaderMap headers; @@ -114,6 +113,9 @@ struct ConnectionFromClient::ActiveRequest { String url; Optional reason_phrase; ByteBuffer body; + AllocatingMemoryStream send_buffer; + NonnullRefPtr write_notifier; + bool done_fetching { false }; ActiveRequest(ConnectionFromClient& client, CURLM* multi, CURL* easy, i32 request_id, int writer_fd) : multi(multi) @@ -121,11 +123,53 @@ struct ConnectionFromClient::ActiveRequest { , request_id(request_id) , client(client) , writer_fd(writer_fd) + , write_notifier(Core::Notifier::construct(writer_fd, Core::NotificationType::Write)) { + write_notifier->set_enabled(false); + write_notifier->on_activation = [this] { + write_queued_bytes_without_blocking(); + }; + } + + void schedule_self_destruction() const + { + Core::deferred_invoke([this] { + if (client) + client->m_active_requests.remove(request_id); + }); + } + + void write_queued_bytes_without_blocking() + { + Vector bytes_to_send; + bytes_to_send.resize(send_buffer.used_buffer_size()); + send_buffer.peek_some(bytes_to_send); + auto result = Core::System::write(this->writer_fd, bytes_to_send); + if (result.is_error()) { + if (result.error().code() != EAGAIN) { + VERIFY_NOT_REACHED(); + } + write_notifier->set_enabled(true); + return; + } + + MUST(send_buffer.discard(result.value())); + write_notifier->set_enabled(!send_buffer.is_eof()); + if (send_buffer.is_eof() && done_fetching) + schedule_self_destruction(); + } + + void notify_about_fetching_completion() + { + done_fetching = true; + if (send_buffer.is_eof()) + schedule_self_destruction(); } ~ActiveRequest() { + VERIFY(send_buffer.is_eof()); + if (writer_fd > 0) MUST(Core::System::close(writer_fd)); @@ -187,30 +231,10 @@ size_t ConnectionFromClient::on_data_received(void* buffer, size_t size, size_t request->flush_headers_if_needed(); size_t total_size = size * nmemb; - - size_t remaining_length = total_size; - u8 const* remaining_data = static_cast(buffer); - while (remaining_length > 0) { - auto result = Core::System::write(request->writer_fd, { remaining_data, remaining_length }); - if (result.is_error()) { - if (result.error().code() != EAGAIN) { - dbgln("on_data_received: write failed: {}", result.error()); - VERIFY_NOT_REACHED(); - } - sched_yield(); - continue; - } - auto nwritten = result.value(); - if (nwritten == 0) { - dbgln("on_data_received: write returned 0"); - VERIFY_NOT_REACHED(); - } - remaining_data += nwritten; - remaining_length -= nwritten; - } - + ReadonlyBytes bytes { static_cast(buffer), total_size }; + MUST(request->send_buffer.write_some(bytes)); + request->write_queued_bytes_without_blocking(); request->downloaded_so_far += total_size; - return total_size; } @@ -663,7 +687,7 @@ void ConnectionFromClient::check_active_requests() async_request_finished(request->request_id, request->downloaded_so_far, timing_info, network_error); } - m_active_requests.remove(request->request_id); + request->notify_about_fetching_completion(); } }