Streaming support in Udash REST #1338
base: master
Conversation
Since @mrzysztof is taking over this, I think @sebaciv could review. I'll take a look but won't have the time to be the primary reviewer here.
rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala
@@ -129,6 +146,52 @@ trait RestApiTestScenarios extends RestApiTest {
  }
}

class DirectRestApiTest extends RestApiTestScenarios {
  def clientHandle: HandleRequest = serverHandle
  // TODO streaming MORE tests: cancellation, timeouts, errors, errors after sending a few elements, custom format, slow source observable
I can't find tests for mid-stream Observable cancellation (both server and client side), is this corner case unhandled?
I tinkered with that, but in the end, I am not really sure what the expected outcome is here.
Suppose we cancel the subscription from the client. What do we want to test here?
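One candidate expectation: after a mid-stream cancel, the producer should stop emitting shortly afterwards. A rough Monix-only sketch of that assertion (not wired into RestApiTest; names and timings are illustrative only):

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

// Hypothetical sketch: cancel mid-stream via take(3) and assert the upstream
// stops producing shortly afterwards.
object MidStreamCancellationSketch extends App {
  @volatile var produced = 0
  val source = Observable
    .intervalAtFixedRate(10.millis)
    .doOnNext(_ => Task { produced += 1 })

  // take(3) cancels the upstream subscription after the third element
  val consumed = source.take(3).toListL.runSyncUnsafe(5.seconds)
  Thread.sleep(100) // give the producer a chance to (wrongly) keep emitting

  assert(consumed.size == 3)
  assert(produced <= 4, s"producer kept emitting after cancellation: produced=$produced")
}
```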
I think we should do some microbenchmarking before shipping this, at least as a sanity check that we haven't done anything so computationally expensive that this implementation wouldn't make sense in prod.
      handleTimeout = handleTimeout,
      maxPayloadSize = maxPayloadSize,
      defaultStreamingBatchSize = DefaultStreamingBatchSize,
    )
  }

class RestServlet(
How breaking are these changes for the biggest internal clients?
case binary: StreamedBody.RawBinary =>
  response.setContentType(binary.contentType)
  binary.content
    .consumeWith(Consumer.foreach { chunk =>
Any reason to use the Consumer API instead of just .foreachL in those cases?
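For comparison, a self-contained sketch of the two variants; both yield a Task[Unit] (names and data here are made up for illustration):

```scala
import java.io.ByteArrayOutputStream
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}
import scala.concurrent.duration._

object ForeachLVsConsumer extends App {
  val content: Observable[Array[Byte]] =
    Observable.fromIterable("HelloWorld".getBytes.grouped(3).toSeq)
  val out = new ByteArrayOutputStream()

  // Consumer-based variant, as in the reviewed code
  val viaConsumer: Task[Unit] =
    content.consumeWith(Consumer.foreach[Array[Byte]](chunk => out.write(chunk)))

  // foreachL is the shorthand suggested above
  val viaForeachL: Task[Unit] =
    content.foreachL(chunk => out.write(chunk))

  viaConsumer.runSyncUnsafe(1.second)
  viaForeachL.runSyncUnsafe(1.second)
  assert(new String(out.toByteArray) == "HelloWorldHelloWorld")
}
```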
  }
} else
  batch.foreach { e =>
    response.getOutputStream.write(",".getBytes(jsonList.charset))
Any charset where this isn't ','.toByte?
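For reference, a quick self-contained check; UTF-16 is the usual counterexample, since Java's encoder prepends a BOM and uses two bytes per character:

```scala
import java.nio.charset.{Charset, StandardCharsets}

// For ASCII-compatible charsets the result is the single byte 0x2C
// (i.e. ','.toByte), but UTF-16 yields a BOM plus two bytes per character.
object CommaBytesCheck extends App {
  def show(cs: Charset): Unit = {
    val bytes = ",".getBytes(cs)
    println(s"$cs -> ${bytes.map(b => "0x%02X".format(b)).mkString(" ")}")
  }

  show(StandardCharsets.UTF_8)      // 0x2C
  show(StandardCharsets.ISO_8859_1) // 0x2C
  show(StandardCharsets.UTF_16)     // 0xFE 0xFF 0x00 0x2C
  assert(",".getBytes(StandardCharsets.UTF_8).head == ','.toByte)
}
```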
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import scala.concurrent.Future

class RestServletTest extends AnyFunSuite with ScalaFutures with Matchers with BeforeAndAfterEach with Eventually {
Any reason to do all this mocking when most (all?) of this could be tested end to end, as in other REST tests in udash-rest?
@GET
def streamBinary(chunkSize: Int): Observable[Array[Byte]] = {
  val content = "HelloWorld".getBytes
  Observable.fromIterable(content.grouped(chunkSize).toSeq)
isn't this content too small to test chunks?
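A hypothetical variant with a larger, random payload (sizes and names are made up) would exercise more than a couple of chunks:

```scala
import monix.reactive.Observable
import scala.util.Random

// Illustrative only: a payload large enough that chunking/batching is actually
// exercised, rather than a single tiny chunk.
object StreamBinaryLargeSketch {
  def streamBinaryLarge(chunkSize: Int): Observable[Array[Byte]] = {
    val content = Random.alphanumeric.take(64 * 1024).mkString.getBytes
    Observable.fromIterable(content.grouped(chunkSize).toSeq)
  }
}
```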
    Utils.mergeArrays(binary.content).map(HttpBody.Binary(_, binary.contentType))
  case jsonList: StreamedBody.JsonList =>
    jsonList.elements
      .foldLeftL(new StringBuilder("[")) { case (sb, json) =>
StringBuilders are not safe for use with multiple threads. I don't think we're guaranteed it's the same thread here, only that it's one at a time.
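One way to sidestep the shared mutable builder entirely would be to collect the elements first and join them in a single step, at the cost of materializing the list (which folding into a full string effectively does anyway). A rough sketch:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

// Hypothetical alternative: no shared mutable StringBuilder, just collect and join.
object JoinJsonElements extends App {
  val elements: Observable[String] = Observable("1", "\"two\"", "{\"three\":3}")

  val jsonArray: Task[String] =
    elements.toListL.map(_.mkString("[", ",", "]"))

  println(jsonArray.runSyncUnsafe(1.second)) // [1,"two",{"three":3}]
}
```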
import com.avsystem.commons.misc.{AbstractValueEnum, AbstractValueEnumCompanion, EnumCtx}
import com.avsystem.commons.rpc._
import com.avsystem.commons.rpc.*
nit: in the future, let's try to avoid reformatting unchanged files; the diff is huge anyway
    StreamedBody.RawBinary(content = rawContentSubject)
  case Opt(HttpBody.JsonType) =>
    val charset = contentTypeOpt.map(MimeTypes.getCharsetFromContentType).getOrElse(HttpBody.Utf8Charset)
    // suboptimal - maybe "online" parsing is possible using Jackson / other lib without waiting for full content?
I think we should at least understand what is possible and make a decision here, as well as understand how badly this impacts performance.
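For what it's worth, Jackson's streaming API can parse a top-level JSON array element by element without buffering the whole body. A rough, self-contained sketch (assumes jackson-databind on the classpath; the ByteArrayInputStream stands in for the servlet input stream):

```scala
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import java.io.ByteArrayInputStream

// Elements of a top-level JSON array become available one by one as tokens
// arrive, instead of after the full content has been read.
object IncrementalJsonList extends App {
  val mapper = new ObjectMapper()
  val input = new ByteArrayInputStream("""[1, "two", {"three": 3}]""".getBytes("UTF-8"))
  val parser = mapper.getFactory.createParser(input)

  require(parser.nextToken() == JsonToken.START_ARRAY)
  while (parser.nextToken() != JsonToken.END_ARRAY) {
    val element: JsonNode = mapper.readValue(parser, classOf[JsonNode])
    println(element) // each element is handled as soon as it is parsed
  }
  parser.close()
}
```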
publishSubject.subscription // wait for subscription
  .flatMapNow(_ => rawContentSubject.onNext(arr))
  .mapNow {
    case Ack.Continue => demander.run()
    case Ack.Stop => ()
  }
That's a discarded Future - if it's intentional, that isn't obvious from the code.
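If it is meant as fire-and-forget, one option is to make that explicit and at least surface failures. A hedged sketch (the `ack` parameter is a hypothetical stand-in for the chained future above):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Make the fire-and-forget intent explicit instead of silently dropping the Future.
object DiscardedFutureSketch {
  def fireAndForget(ack: Future[Unit])(implicit ec: ExecutionContext): Unit = {
    // surface failures somewhere instead of losing them, then deliberately discard
    ack.failed.foreach(_.printStackTrace())
  }
}
```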
1. The server does not specify a `Content-Length` header in the response
2. The client detects the streaming nature of the response by the missing `Content-Length`
3. Data is transferred incrementally in chunks as it becomes available
4. The client processes each chunk as it arrives
Is that even true given io/udash/rest/jetty/JettyRestClient.scala:95?
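Regardless of how the Jetty client ends up handling it, the detection rule in point 2 could be expressed roughly like this (hypothetical helper, not the actual JettyRestClient code):

```scala
// Treat a response without a Content-Length header as streamed.
object StreamingDetection {
  def isStreamedResponse(headers: Map[String, String]): Boolean =
    !headers.keysIterator.exists(_.equalsIgnoreCase("Content-Length"))
}
```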
Enable returning monix.Observable in REST RPC traits and stream the response when sending it over the wire.
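For illustration, usage might look roughly like this (trait, data type, and method names are made up; it assumes the streaming-enabled companion on this branch accepts Observable results):

```scala
import io.udash.rest._
import monix.reactive.Observable

// Hypothetical REST API whose method result is streamed to the client
// instead of being materialized into a single response body.
case class Item(id: Int, name: String)
object Item extends RestDataCompanion[Item]

trait StreamingApi {
  @GET def items(limit: Int): Observable[Item]
}
object StreamingApi extends DefaultRestApiCompanion[StreamingApi]
```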
State of this pull request (missing items):