Streaming support in Udash REST #1338

Draft: wants to merge 26 commits into base: master

@sebaciv sebaciv commented Mar 27, 2025

Enable returning monix.Observable from methods in REST RPC traits, and stream the response when it is sent over the wire.

State of this pull request (missing items):

  • error / corner case handling
  • comprehensive unit tests
  • ScalaDocs
  • Examples for documentation
  • Sttp implementation integration (*optional?)
  • JSON list streaming on client side (*optional)

@sebaciv sebaciv self-assigned this Mar 27, 2025
@sebaciv sebaciv requested a review from ddworak March 27, 2025 16:07

ddworak commented Apr 2, 2025

Since @mrzysztof is taking over this, I think @sebaciv could review. I'll take a look but won't have the time to be the primary reviewer here.

@ddworak ddworak assigned mrzysztof and unassigned sebaciv Apr 2, 2025
@@ -129,6 +146,52 @@ trait RestApiTestScenarios extends RestApiTest {
}
}

class DirectRestApiTest extends RestApiTestScenarios {
  def clientHandle: HandleRequest = serverHandle
  // TODO streaming MORE tests: cancellation, timeouts, errors, errors after sending a few elements, custom format, slow source observable
Contributor Author

I can't find tests for mid-stream Observable cancellation (on either the server or the client side). Is this corner case unhandled?

@mrzysztof mrzysztof Apr 10, 2025

I tinkered with that, but in the end, I am not really sure what the expected outcome is here.
Suppose we cancel the subscription from the client. What do we want to test here?

@sebaciv sebaciv assigned ddworak and unassigned mrzysztof Apr 25, 2025
Member

@ddworak ddworak left a comment

I think we should do some microbenchmarking before shipping this, at least as a sanity check that we haven't done anything too computationally expensive for this implementation to make sense in prod.

  handleTimeout = handleTimeout,
  maxPayloadSize = maxPayloadSize,
  defaultStreamingBatchSize = DefaultStreamingBatchSize,
)
}

class RestServlet(
Member

How breaking are these changes for the biggest internal clients?

case binary: StreamedBody.RawBinary =>
  response.setContentType(binary.contentType)
  binary.content
    .consumeWith(Consumer.foreach { chunk =>
Member

Any reason to use the Consumer API instead of just .foreachL in these cases?

  }
} else
  batch.foreach { e =>
    response.getOutputStream.write(",".getBytes(jsonList.charset))
Member

any charset where this isn't ','.toByte?
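
As a quick check of this question, here is a minimal sketch comparing `",".getBytes(charset)` with `','.toByte` for a few standard JDK charsets. The object name `CommaEncoding` and the selection of charsets are illustrative assumptions, not part of the PR.

```scala
import java.nio.charset.{Charset, StandardCharsets}

object CommaEncoding {
  // True when the charset encodes "," as the single byte 0x2C, i.e. ','.toByte.
  def commaIsSingleAsciiByte(charset: Charset): Boolean =
    ",".getBytes(charset).sameElements(Array(','.toByte))

  def main(args: Array[String]): Unit = {
    val charsets = Seq(
      StandardCharsets.UTF_8,
      StandardCharsets.ISO_8859_1,
      StandardCharsets.UTF_16LE, // two bytes per code unit: 0x2C 0x00
      StandardCharsets.UTF_16BE  // two bytes per code unit: 0x00 0x2C
    )
    charsets.foreach { cs =>
      // Render the encoded bytes as hex for easy comparison.
      val hex = ",".getBytes(cs).map(b => f"${b & 0xff}%02X").mkString(" ")
      println(s"${cs.name}: $hex (single 0x2C byte: ${commaIsSingleAsciiByte(cs)})")
    }
  }
}
```

The shortcut holds for ASCII-compatible charsets such as UTF-8 and ISO-8859-1, but not for the UTF-16 variants, where the comma takes two bytes.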

import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import scala.concurrent.Future

class RestServletTest extends AnyFunSuite with ScalaFutures with Matchers with BeforeAndAfterEach with Eventually {
Member

Any reason to do all this mocking when most (all?) of this could be tested end to end, as in other REST tests in udash-rest?

@GET
def streamBinary(chunkSize: Int): Observable[Array[Byte]] = {
  val content = "HelloWorld".getBytes
  Observable.fromIterable(content.grouped(chunkSize).toSeq)
Member

isn't this content too small to test chunks?
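
For a sense of scale, a small sketch of how the 10-byte test payload splits under `grouped`, next to a larger payload. The object name `ChunkCount`, the 64 KiB payload, and the 8 KiB chunk size are hypothetical values chosen only for illustration.

```scala
object ChunkCount {
  // Sizes of the chunks produced by grouping a payload, mirroring the test endpoint.
  def chunkSizes(payload: Array[Byte], chunkSize: Int): Seq[Int] =
    payload.grouped(chunkSize).map(_.length).toSeq

  def main(args: Array[String]): Unit = {
    val small = "HelloWorld".getBytes("UTF-8") // 10 bytes, as in the test
    val large = Array.fill[Byte](64 * 1024)(1) // hypothetical larger payload

    println(chunkSizes(small, 3))               // chunk sizes 3, 3, 3, 1
    println(chunkSizes(large, 8 * 1024).length) // 8 chunks of 8 KiB each
  }
}
```

With the current payload, any chunk size of 10 or more produces a single chunk, so only very small chunk sizes exercise multi-chunk behaviour.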

    Utils.mergeArrays(binary.content).map(HttpBody.Binary(_, binary.contentType))
  case jsonList: StreamedBody.JsonList =>
    jsonList.elements
      .foldLeftL(new StringBuilder("[")) { case (sb, json) =>
Member

StringBuilders are not safe for use with multiple threads. I don't think we're guaranteed it's the same thread here, only that it's one at a time.

 import com.avsystem.commons.misc.{AbstractValueEnum, AbstractValueEnumCompanion, EnumCtx}
-import com.avsystem.commons.rpc._
+import com.avsystem.commons.rpc.*
Member

nit: in the future let's try to avoid reformatting unchanged files, the diff is huge anyway

    StreamedBody.RawBinary(content = rawContentSubject)
  case Opt(HttpBody.JsonType) =>
    val charset = contentTypeOpt.map(MimeTypes.getCharsetFromContentType).getOrElse(HttpBody.Utf8Charset)
    // suboptimal - maybe "online" parsing is possible using Jackson / other lib without waiting for full content ?
Member

I think we should at least understand what is possible and make a decision here, as well as understand how badly this impacts perf.

Comment on lines +142 to +147
publishSubject.subscription // wait for subscription
  .flatMapNow(_ => rawContentSubject.onNext(arr))
  .mapNow {
    case Ack.Continue => demander.run()
    case Ack.Stop => ()
  }
Member

That's a discarded Future, and it's not clear to me whether that's intended.
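
To illustrate the concern in isolation, a minimal sketch using only `scala.concurrent`. The names `push`, `fireAndForget` and `fireAndReport` are hypothetical stand-ins and do not correspond to anything in the PR. It contrasts dropping a Future with routing its failure to a handler, which is one way to make the intent visible.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object DiscardedFuture {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for an asynchronous push that can fail.
  def push(chunk: Int): Future[Unit] =
    if (chunk < 0) Future.failed(new IllegalArgumentException(s"bad chunk: $chunk"))
    else Future.unit

  // Discarded: the returned Future is dropped, so a failure goes unnoticed.
  def fireAndForget(chunk: Int): Unit = {
    push(chunk)
    ()
  }

  // Explicit: a failure is handed to a callback before the Future is let go.
  def fireAndReport(chunk: Int)(onError: Throwable => Unit): Unit =
    push(chunk).onComplete {
      case Failure(e) => onError(e)
      case Success(_) => ()
    }

  def main(args: Array[String]): Unit = {
    fireAndForget(-1) // the failure is silently lost

    val seen = Promise[Throwable]()
    fireAndReport(-1)(e => seen.trySuccess(e))
    println(Await.result(seen.future, 1.second).getMessage)
  }
}
```

If discarding is intended, a short comment or an explicit handler along these lines would make that clear to readers of the code.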

1. The server does not specify a `Content-Length` header in the response
2. The client detects the streaming nature of the response by the missing `Content-Length`
3. Data is transferred incrementally in chunks as it becomes available
4. The client processes each chunk as it arrives
Member

Is that even true given io/udash/rest/jetty/JettyRestClient.scala:95?
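
For reference, the detection rule as stated in the quoted docs can be sketched as below. This is only an illustration of the documented rule, assuming headers are available as a plain `Map`; the names `StreamingDetection` and `looksStreamed` are hypothetical, and this is not the code in JettyRestClient. Whether the client actually behaves this way is the open question.

```scala
object StreamingDetection {
  // HTTP header names are case-insensitive, so look them up accordingly.
  private def header(headers: Map[String, String], name: String): Option[String] =
    headers.collectFirst { case (k, v) if k.equalsIgnoreCase(name) => v }

  // Rule from the quoted docs: a missing Content-Length means "treat as a stream".
  def looksStreamed(headers: Map[String, String]): Boolean =
    header(headers, "Content-Length").isEmpty

  def main(args: Array[String]): Unit = {
    val chunked = Map("Content-Type" -> "application/json", "Transfer-Encoding" -> "chunked")
    val sized   = Map("content-length" -> "42")
    println(looksStreamed(chunked)) // no Content-Length header present
    println(looksStreamed(sized))   // Content-Length present, in lower case
  }
}
```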

3 participants