Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.jboss.resteasy.spi.ResteasyConfiguration;
import org.jboss.resteasy.spi.ResteasyDeployment;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.quarkus.resteasy.runtime.NonJaxRsClassMappings;
import io.quarkus.resteasy.runtime.ResteasyVertxConfig;
import io.quarkus.runtime.LaunchMode;
Expand Down Expand Up @@ -61,8 +59,6 @@ public class ResteasyStandaloneRecorder {

static final String RESTEASY_URI_INFO = ResteasyUriInfo.class.getName();

private static boolean useDirect = true;

private static ResteasyDeployment deployment;
private static String contextPath;

Expand Down Expand Up @@ -98,15 +94,13 @@ public void run() {
}
}
});
useDirect = !isVirtual;
}

public Handler<RoutingContext> vertxRequestHandler(Supplier<Vertx> vertx, Executor executor,
Map<String, NonJaxRsClassMappings> nonJaxRsClassNameToMethodPaths) {
if (deployment != null) {
Handler<RoutingContext> handler = new VertxRequestHandler(vertx.get(), deployment, contextPath,
new ResteasyVertxAllocator(
runtimeConfig.responseBufferSize()),
runtimeConfig.responseBufferSize(),
executor,
httpRuntimeConfig.getValue().readTimeout().toMillis());

Expand Down Expand Up @@ -135,7 +129,7 @@ public Handler<RoutingContext> vertxFailureHandler(Supplier<Vertx> vertx, Execut
// allow customization of auth failures with exception mappers; this failure handler is only
// used when auth failed before RESTEasy Classic began processing the request
return new VertxRequestHandler(vertx.get(), deployment, contextPath,
new ResteasyVertxAllocator(runtimeConfig.responseBufferSize()), executor,
runtimeConfig.responseBufferSize(), executor,
httpRuntimeConfig.getValue().readTimeout().toMillis()) {

@Override
Expand Down Expand Up @@ -339,41 +333,4 @@ private Map<String, List<ResourceInvoker>> getBounded(Registry registry) {
return bounded;
}

private static class ResteasyVertxAllocator implements BufferAllocator {

private final int bufferSize;

private ResteasyVertxAllocator(int bufferSize) {
this.bufferSize = bufferSize;
}

@Override
public ByteBuf allocateBuffer() {
return allocateBuffer(useDirect);
}

@Override
public ByteBuf allocateBuffer(boolean direct) {
return allocateBuffer(direct, bufferSize);
}

@Override
public ByteBuf allocateBuffer(int bufferSize) {
return allocateBuffer(useDirect, bufferSize);
}

@Override
public ByteBuf allocateBuffer(boolean direct, int bufferSize) {
if (direct) {
return PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize);
} else {
return PooledByteBufAllocator.DEFAULT.heapBuffer(bufferSize);
}
}

@Override
public int getBufferSize() {
return bufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ public class VertxHttpResponse implements HttpResponse {
private final RoutingContext routingContext;

public VertxHttpResponse(HttpServerRequest request, ResteasyProviderFactory providerFactory,
final HttpMethod method, BufferAllocator allocator, VertxOutput output, RoutingContext routingContext) {
final HttpMethod method, int bufferSize, VertxOutput output, RoutingContext routingContext) {
this.routingContext = routingContext;
outputHeaders = new MultivaluedHashMap<String, Object>();
this.method = method;
os = (method == null || !method.equals(HttpMethod.HEAD)) ? new VertxOutputStream(this, allocator)
os = (method == null || !method.equals(HttpMethod.HEAD))
? new VertxOutputStream(this, bufferSize)
: null;
this.request = request;
this.response = request.response();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,26 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Function;

import jakarta.ws.rs.core.HttpHeaders;

import org.jboss.resteasy.spi.AsyncOutputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.quarkus.vertx.utils.AppendBuffer;

public class VertxOutputStream extends AsyncOutputStream {

private final VertxHttpResponse response;
private final BufferAllocator allocator;
private ByteBuf pooledBuffer;
private final AppendBuffer appendBuffer;
private long written;
private final long contentLength;

private boolean closed;

/**
* Construct a new instance. No write timeout is configured.
*
*/
public VertxOutputStream(VertxHttpResponse response, BufferAllocator allocator) {
this.allocator = allocator;
public VertxOutputStream(VertxHttpResponse response, int bufferSize) {
this.appendBuffer = AppendBuffer.eager(bufferSize);
this.response = response;
Object length = response.getOutputHeaders().getFirst(HttpHeaders.CONTENT_LENGTH);
this.contentLength = length == null ? -1 : Long.parseLong(length.toString());
Expand Down Expand Up @@ -57,31 +52,19 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
if (closed) {
throw new IOException("Stream is closed");
}

int rem = len;
int idx = off;
ByteBuf buffer = pooledBuffer;
try {
if (buffer == null) {
pooledBuffer = buffer = allocator.allocateBuffer();
}
while (rem > 0) {
int toWrite = Math.min(rem, buffer.writableBytes());
buffer.writeBytes(b, idx, toWrite);
rem -= toWrite;
idx += toWrite;
if (!buffer.isWritable()) {
ByteBuf tmpBuf = buffer;
this.pooledBuffer = buffer = allocator.allocateBuffer();
response.writeBlocking(tmpBuf, false);
appendBuffer.appendAndDrain(b, off, len, new AppendBuffer.DrainHandler() {
@Override
public void drain(ByteBuf buffer, boolean finished) throws IOException {
response.writeBlocking(buffer, finished);
}
}
});
} catch (Exception e) {
if (buffer != null && buffer.refCnt() > 0) {
buffer.release();
pooledBuffer = null;
closed = true;
ByteBuf leftover = appendBuffer.clear();
if (leftover != null) {
leftover.release();
}
closed = true;
throw new IOException(e);
}
updateWritten(len);
Expand All @@ -103,15 +86,11 @@ public void flush() throws IOException {
throw new IOException("Stream is closed");
}
try {
if (pooledBuffer != null) {
response.writeBlocking(pooledBuffer, false);
pooledBuffer = null;
ByteBuf buf = appendBuffer.clear();
if (buf != null) {
response.writeBlocking(buf, false);
}
} catch (Exception e) {
if (pooledBuffer != null) {
pooledBuffer.release();
pooledBuffer = null;
}
throw new IOException(e);
}
}
Expand All @@ -123,12 +102,11 @@ public void close() throws IOException {
if (closed)
return;
try {
response.writeBlocking(pooledBuffer, true);
response.writeBlocking(appendBuffer.clear(), true);
} catch (Exception e) {
throw new IOException(e);
} finally {
closed = true;
pooledBuffer = null;
}
}

Expand All @@ -143,10 +121,9 @@ private CompletionStage<Void> asyncFlush(boolean isLast) {
ret.completeExceptionally(new IOException("Stream is closed"));
return ret;
}
if (pooledBuffer != null) {
ByteBuf sentBuffer = pooledBuffer;
pooledBuffer = null;
return response.writeNonBlocking(sentBuffer, isLast);
ByteBuf buf = appendBuffer.clear();
if (buf != null) {
return response.writeNonBlocking(buf, isLast);
}
return CompletableFuture.completedFuture(null);
}
Expand All @@ -161,42 +138,30 @@ public CompletionStage<Void> asyncWrite(final byte[] b, final int off, final int
ret.completeExceptionally(new IOException("Stream is closed"));
return ret;
}

CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);

ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(b, off, len);
if (pooledBuffer == null) {
pooledBuffer = allocator.allocateBuffer();
}
pooledBuffer.writeBytes(wrappedBuffer, Math.min(pooledBuffer.writableBytes(), wrappedBuffer.readableBytes()));
if (pooledBuffer.writableBytes() == 0) {
CompletableFuture<Void> cf = new CompletableFuture<>();
ret = cf;
ByteBuf filled = pooledBuffer;
pooledBuffer = null;
response.writeNonBlocking(filled, false).whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void unused, Throwable throwable) {
if (throwable != null) {
cf.completeExceptionally(throwable);
return;
return asyncDrain(b, off, len)
.thenCompose(new Function<Void, CompletionStage<Void>>() {
@Override
public CompletionStage<Void> apply(Void v) {
return VertxOutputStream.this.asyncUpdateWritten(len);
}
pooledBuffer = allocator.allocateBuffer();
pooledBuffer.writeBytes(wrappedBuffer,
Math.min(pooledBuffer.writableBytes(), wrappedBuffer.readableBytes()));

if (pooledBuffer.writableBytes() == 0) {
ByteBuf filled = pooledBuffer;
pooledBuffer = null;
response.writeNonBlocking(filled, false).whenComplete(this);
} else {
cf.complete(null);
}
}
});
}
});
}

return ret.thenCompose(v -> asyncUpdateWritten(len));
private CompletionStage<Void> asyncDrain(byte[] b, int off, int len) {
int written = appendBuffer.append(b, off, len);
if (written < len) {
ByteBuf buf = appendBuffer.clear();
int newOff = off + written;
int newLen = len - written;
return response.writeNonBlocking(buf, false)
.thenCompose(new Function<Void, CompletionStage<Void>>() {
@Override
public CompletionStage<Void> apply(Void v) {
return VertxOutputStream.this.asyncDrain(b, newOff, newLen);
}
});
}
return CompletableFuture.completedFuture(null);
}

CompletionStage<Void> asyncUpdateWritten(final long len) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
protected final Vertx vertx;
protected final RequestDispatcher dispatcher;
protected final String rootPath;
protected final BufferAllocator allocator;
protected final int bufferSize;
protected final CurrentIdentityAssociation association;
protected final CurrentVertxRequest currentVertxRequest;
protected final Executor executor;
Expand All @@ -58,12 +58,12 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
public VertxRequestHandler(Vertx vertx,
ResteasyDeployment deployment,
String rootPath,
BufferAllocator allocator, Executor executor, long readTimeout) {
int bufferSize, Executor executor, long readTimeout) {
this.vertx = vertx;
this.dispatcher = new RequestDispatcher((SynchronousDispatcher) deployment.getDispatcher(),
deployment.getProviderFactory(), null, Thread.currentThread().getContextClassLoader());
this.rootPath = rootPath;
this.allocator = allocator;
this.bufferSize = bufferSize;
this.executor = executor;
this.readTimeout = readTimeout;
this.customNotFoundExist = deployment.getProviderFactory()
Expand Down Expand Up @@ -134,7 +134,7 @@ private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput
ResteasyHttpHeaders headers = VertxUtil.extractHttpHeaders(request);
HttpServerResponse response = request.response();
VertxHttpResponse vertxResponse = new VertxHttpResponse(request, dispatcher.getProviderFactory(),
request.method(), allocator, output, routingContext);
request.method(), bufferSize, output, routingContext);

// using a supplier to make the remote Address resolution lazy: often it's not needed and it's not very cheap to create.
LazyHostSupplier hostSupplier = new LazyHostSupplier(request);
Expand Down
4 changes: 4 additions & 0 deletions extensions/vertx-http/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus.vertx.utils</groupId>
<artifactId>quarkus-vertx-utils</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web</artifactId>
Expand Down
Loading
Loading