3434import io .vertx .sqlclient .RowSet ;
3535import io .vertx .sqlclient .SqlConnection ;
3636import io .vertx .sqlclient .SqlResult ;
37- import io .vertx .sqlclient .Transaction ;
3837import io .vertx .sqlclient .Tuple ;
3938import io .vertx .sqlclient .spi .DatabaseMetadata ;
4039
40+ import static org .hibernate .reactive .util .impl .CompletionStages .failedFuture ;
4141import static org .hibernate .reactive .util .impl .CompletionStages .rethrow ;
42+ import static org .hibernate .reactive .util .impl .CompletionStages .supplyStage ;
4243import static org .hibernate .reactive .util .impl .CompletionStages .voidFuture ;
4344
4445/**
@@ -58,7 +59,10 @@ public class SqlClientConnection implements ReactiveConnection {
5859
5960 private final Pool pool ;
6061 private final SqlConnection connection ;
61- private Transaction transaction ;
62+
63+ // The close operation could be called multiple times if an error occurs,
64+ // if we execute it every time, we will have several useless messages in the log
65+ private boolean closed = false ;
6266
6367 SqlClientConnection (SqlConnection connection , Pool pool , SqlStatementLogger sqlStatementLogger , SqlExceptionHelper sqlExceptionHelper ) {
6468 this .pool = pool ;
@@ -285,43 +289,68 @@ private SqlConnection client() {
285289
286290 @ Override
287291 public CompletionStage <Void > beginTransaction () {
288- if ( transaction != null ) {
289- throw new IllegalStateException ( "Can't begin a new transaction as an active transaction is already associated to this connection" );
292+ if ( connection . transaction () != null ) {
293+ return failedFuture ( LOG . liveTransactionDetectedOnBeginTransaction () );
290294 }
291295 return connection .begin ()
292296 .onSuccess ( tx -> LOG .tracef ( "Transaction started: %s" , tx ) )
293- .onFailure ( v -> LOG .errorf ( "Failed to start a transaction: %s" , transaction ) )
297+ .onFailure ( throwable -> LOG .errorf ( "Failed to start a transaction: %s" , throwable . getMessage () ) )
294298 .toCompletionStage ()
295- .thenAccept ( this :: setTransaction );
299+ .thenCompose ( CompletionStages :: voidFuture );
296300 }
297301
298302 @ Override
299303 public CompletionStage <Void > commitTransaction () {
300- return transaction . commit ()
301- .onSuccess ( v -> LOG . tracef ( "Transaction committed: %s" , transaction ) )
302- .onFailure ( v -> LOG .errorf ( "Failed to commit transaction : %s" , transaction ) )
303- .toCompletionStage ( )
304- .whenComplete ( this :: clearTransaction );
304+ return connection . transaction ()
305+ .commit ( )
306+ .onSuccess ( v -> LOG .tracef ( "Transaction committed : %s" , connection . transaction () ) )
307+ .onFailure ( throwable -> LOG . errorf ( "Failed to commit transaction: %s" , throwable . getMessage () ) )
308+ .toCompletionStage ( );
305309 }
306310
307311 @ Override
308312 public CompletionStage <Void > rollbackTransaction () {
309- return transaction .rollback ()
310- .onFailure ( v -> LOG .errorf ( "Failed to rollback transaction: %s" , transaction ) )
311- .onSuccess ( v -> LOG .tracef ( "Transaction rolled back: %s" , transaction ) )
312- .toCompletionStage ()
313- .whenComplete ( this ::clearTransaction );
313+ if ( connection .transaction () != null ) {
314+ return connection .transaction ()
315+ .rollback ()
316+ .onFailure ( throwable -> LOG .errorf ( "Failed to rollback transaction: %s" , throwable .getMessage () ) )
317+ .onSuccess ( v -> LOG .tracef ( "Transaction rolled back: %s" , connection .transaction () ) )
318+ .toCompletionStage ();
319+ }
320+ LOG .trace ( "No transaction found to roll back" );
321+ return voidFuture ();
314322 }
315323
316324 @ Override
317325 public CompletionStage <Void > close () {
318- if ( transaction != null ) {
319- throw new IllegalStateException ( "Connection being closed with a live transaction associated to it" );
320- }
321- return connection .close ()
322- .onSuccess ( event -> LOG .tracef ( "Connection closed: %s" , connection ) )
323- .onFailure ( v -> LOG .errorf ( "Failed to close a connection: %s" , connection ) )
324- .toCompletionStage ();
326+ // We can probably skip the validation if the connection is already closed...but, you never know
327+ return validateNoTransactionInProgressOnClose ()
328+ .handle ( CompletionStages ::handle )
329+ .thenCompose ( validationHandler -> supplyStage ( () -> closed
330+ ? voidFuture ().thenAccept ( v -> LOG .trace ( "Connection already closed" ) )
331+ : connection .close ().toCompletionStage () )
332+ .handle ( CompletionStages ::handle )
333+ .thenCompose ( closeConnectionHandler -> {
334+ if ( closeConnectionHandler .hasFailed () ) {
335+ if ( validationHandler .hasFailed () ) {
336+ // Error closing the connection, include the validation error
337+ closeConnectionHandler .getThrowable ()
338+ .addSuppressed ( validationHandler .getThrowable () );
339+ }
340+ // Return a failed CompletionStage
341+ return closeConnectionHandler .getResultAsCompletionStage ();
342+ }
343+ if ( !closed ) {
344+ closed = true ;
345+ LOG .tracef ( "Connection closed: %s" , connection );
346+ }
347+ else {
348+ LOG .tracef ( "Connection was already closed: %s" , connection );
349+ }
350+ // Connection closed, return the result of the validation
351+ return validationHandler .getResultAsCompletionStage ();
352+ } )
353+ );
325354 }
326355
327356 @ SuppressWarnings ("unchecked" )
@@ -361,13 +390,24 @@ private static ResultSet getLastInsertedIdAsResultSet(RowSet<Row> rows, Class<?>
361390 return null ;
362391 }
363392
364- private void setTransaction (Transaction tx ) {
365- transaction = tx ;
366- }
367-
368- private void clearTransaction (Void v , Throwable x ) {
369- LOG .tracef ( "Clearing current transaction instance from connection: %s" , transaction );
370- transaction = null ;
393+ /**
394+ * If there's a transaction open, roll back it and return a failed CompletionStage.
395+ * The validation error is related to closing the connection.
396+ */
397+ private CompletionStage <Void > validateNoTransactionInProgressOnClose () {
398+ if ( connection .transaction () != null ) {
399+ return supplyStage ( this ::rollbackTransaction )
400+ .handle ( CompletionStages ::handle )
401+ .thenCompose ( rollbackHandler -> {
402+ final Throwable validationError = LOG .liveTransactionDetectedOnClose ();
403+ if ( rollbackHandler .hasFailed () ) {
404+ // Include the error that happened during rollback
405+ validationError .addSuppressed ( rollbackHandler .getThrowable () );
406+ }
407+ return failedFuture ( validationError );
408+ } );
409+ }
410+ return voidFuture ();
371411 }
372412
373413 private static class RowSetResult implements Result {
@@ -420,5 +460,4 @@ private static void translateNulls(Object[] paramValues) {
420460 }
421461 }
422462 }
423-
424463}
0 commit comments