Spring WebFlux 3: Comunicación, Datos y Errores Reactivos
- Mauricio ECR
- Arquitectura
- 24 May, 2025
¡Continuemos nuestro viaje por el fascinante mundo de Spring WebFlux!
En la Parte 1, sentamos las bases de la programación reactiva y exploramos Project Reactor, el corazón de WebFlux. En la Parte 2, nos adentramos en la arquitectura de WebFlux y aprendimos a construir endpoints utilizando tanto anotaciones como el enfoque funcional.
Ahora, en esta Parte 3, nos enfocaremos en cómo las aplicaciones WebFlux interactúan con el mundo exterior: cómo consumen otros servicios de manera reactiva, cómo persisten y recuperan datos en bases de datos reactivas, y, crucialmente, cómo gestionamos los errores que inevitablemente surgen en estos flujos asíncronos.
Comunicación con Servicios Externos (WebClient)
En el ecosistema de microservicios actual, es muy común que nuestras aplicaciones necesiten consumir APIs externas. Spring WebFlux nos proporciona una herramienta poderosa y reactiva para esto: WebClient. Es la contraparte no bloqueante de RestTemplate y la forma recomendada de hacer llamadas HTTP en un contexto reactivo.
WebClient
WebClient es un cliente HTTP no bloqueante que forma parte del módulo spring-webflux. Está diseñado para aprovechar la pila reactiva de principio a fin, lo que significa que no bloqueará hilos mientras espera respuestas de servicios externos, maximizando la eficiencia de tu aplicación WebFlux.
Su API es fluida y declarativa, similar a la forma en que construyes flujos con Mono y Flux.
Configuración Básica:
Puedes configurar WebClient de diversas maneras. La forma más común es inyectarlo como un bean en tu clase, o construir una instancia en línea. Puedes especificar una URL base, encabezados comunes, timeouts, filtros y más.
// Configuración como Bean (ejemplo en una clase @Configuration)
@Configuration
public class WebClientConfig {
@Bean
public WebClient externalApiClient(WebClient.Builder webClientBuilder) {
return webClientBuilder
.baseUrl("https://api.example.com") // URL base para todas las peticiones
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) // Encabezado por defecto
.clientConnector(new ReactorClientHttpConnector(
HttpClient.create().responseTimeout(Duration.ofSeconds(5)) // Timeout de 5 segundos
))
.build();
}
}
Consumo de Respuestas Reactivas:
Después de definir la petición (GET, POST, PUT, DELETE, etc.), usas métodos como:
.retrieve(): Inicia la recuperación de la respuesta..bodyToMono(Class<T> type): Convierte el cuerpo de la respuesta en unMonode un objeto de tipoT. Útil cuando esperas una única respuesta (ej., un objeto JSON)..bodyToFlux(Class<T> type): Convierte el cuerpo de la respuesta en unFluxde objetos de tipoT. Útil para listas o streams de datos (ej., una lista de objetos JSON)..bodyToMono(ParameterizedTypeReference<T> typeRef)/.bodyToFlux(ParameterizedTypeReference<T> typeRef): Útil para tipos genéricos (ej.,List<MyObject>)..toEntity(Class<T> type)/.toEntityList(Class<T> type)/.toEntityFlux(Class<T> type): Devuelve unMono<ResponseEntity<T>>oMono<ResponseEntity<List<T>>>para acceder a la respuesta completa (estado HTTP, cabeceras, cuerpo).
Casos Típicos/Práctica
-
Llamada GET a un servicio externo y procesar la respuesta reactivamente:
Asumiendo que
externalApiClientes unWebClientbean inyectado.public Mono<MyObject> getObjectById(String id) { return externalApiClient.get() // Inicia una petición GET .uri("/objects/{id}", id) // Define la URI con PathVariable .retrieve() // Recupera la respuesta .bodyToMono(MyObject.class); // Convierte el cuerpo a Mono<MyObject> } -
Llamada POST enviando un
Mono<?>como body:public Mono<MyObject> createObject(Mono<MyObject> newObjectMono) { return externalApiClient.post() // Inicia una petición POST .uri("/objects") .body(newObjectMono, MyObject.class) // Envía el Mono<MyObject> como cuerpo .retrieve() .bodyToMono(MyObject.class); // Espera la respuesta como Mono<MyObject> } -
Manejar múltiples llamadas a servicios externos en paralelo (
Mono.zip,Flux.merge,flatMap):-
Mono.zip: Combina los resultados de múltiplesMonos (oFluxs que emiten un solo elemento) en un soloMonoque contiene una tupla de sus resultados. Las operaciones se ejecutan en paralelo. Ideal para combinar resultados de diferentes tipos que son necesarios simultáneamente.public Mono<CombinedData> getCombinedData(String id) { Mono<User> userMono = externalApiClient.get().uri("/users/{id}", id).retrieve().bodyToMono(User.class); Mono<Order> orderMono = externalApiClient.get().uri("/orders/{id}", id).retrieve().bodyToMono(Order.class); return Mono.zip(userMono, orderMono, (user, order) -> { // Aquí se combinan los resultados cuando ambos Monos han completado return new CombinedData(user, order); }); } -
Flux.merge: Combina múltiplesPublishers (Mono o Flux) en un únicoFlux, entrelazando sus elementos tan pronto como son emitidos. Las operaciones se ejecutan en paralelo, y el orden de los elementos resultantes no está garantizado.public Flux<Item> getItemsFromMultipleSources() { Flux<Item> source1 = externalApiClient.get().uri("/items/source1").retrieve().bodyToFlux(Item.class); Flux<Item> source2 = externalApiClient.get().uri("/items/source2").retrieve().bodyToFlux(Item.class); return Flux.merge(source1, source2); // Los ítems de source1 y source2 se entrelazan } -
flatMap: (Ya cubierto en Parte 1, pero clave aquí) Úsalo cuando la transformación de un elemento inicial te lleva a realizar otra operación asíncrona que devuelve unMonooFlux. Permite encadenar operaciones secuenciales asíncronas.public Mono<OrderDetail> getOrderDetails(String orderId) { return externalApiClient.get().uri("/orders/{id}", orderId).retrieve().bodyToMono(Order.class) // 1. Obtener la orden .flatMap(order -> externalApiClient .get() .uri("/products/{id}", order.getProductId()).retrieve().bodyToMono(Product.class) // 2. Obtener el producto de la orden .map(product -> new OrderDetail(order, product))); // 3. Combinar y devolver OrderDetail }
-
-
Manejar errores de un servicio externo llamado con WebClient:
WebClientlanzaWebClientResponseException(o subclases comoWebClientResponseException.NotFound) si la respuesta HTTP es un error (4xx, 5xx). Puedes usar operadores de manejo de errores de Reactor comoonErrorResumeoonErrorReturn.public Mono<MyObject> getObjectByIdHandlingError(String id) { return externalApiClient.get() .uri("/objects/{id}", id) .retrieve() .onStatus(HttpStatus.NOT_FOUND::equals, // Si el estado es 404 response -> Mono.error(new MyCustomNotFoundException("Object not found: " + id))) // Mapea a una excepción personalizada .onStatus(HttpStatus::is5xxServerError, // Si es un error 5xx response -> Mono.error(new RuntimeException("External service error"))) // Mapea a otra excepción .bodyToMono(MyObject.class) .onErrorResume(MyCustomNotFoundException.class, e -> { // Si es MyCustomNotFoundException, devuelve un Mono.empty() o un default System.err.println("Handling not found: " + e.getMessage()); return Mono.empty(); // O Mono.just(new MyObject("Default object")); }) .onErrorReturn(RuntimeException.class, new MyObject("Error occurred, returning default")); // Si es RuntimeException, devuelve un objeto por defecto }
Manejo de Datos Reactivos
Una aplicación reactiva es más eficiente si toda su pila es no bloqueante, y esto incluye la capa de persistencia de datos. Acceder a bases de datos de forma reactiva es crucial para evitar cuellos de botella por I/O bloqueante.
Integración de WebFlux con Bases de Datos Reactivas
Para bases de datos relacionales, la API estándar para acceso reactivo es R2DBC (Reactive Relational Database Connectivity). Es el equivalente reactivo de JDBC, pero diseñado desde cero para ser no bloqueante y asíncrono. Spring Data ha adoptado R2DBC, proporcionando integraciones para bases de datos como PostgreSQL, H2, MySQL (con driver de terceros) y SQL Server.
Para bases de datos NoSQL, muchos de los drivers ya están diseñados para ser reactivos. Por ejemplo, Spring Data tiene módulos reactivos para:
- MongoDB:
spring-data-mongodb-reactive - Cassandra:
spring-data-cassandra-reactive - Redis:
spring-data-redis-reactive
Repositorios Reactivos:
Spring Data extiende sus interfaces de repositorio para el contexto reactivo. En lugar de extender CrudRepository, extiendes interfaces como ReactiveCrudRepository, ReactiveMongoRepository, ReactiveCassandraRepository, etc. Los métodos de estas interfaces devuelven Mono<?> o Flux<?>.
Casos Típicos/Práctica
Asumiendo una entidad User y un repositorio UserRepository que extiende ReactiveCrudRepository<User, Long> (para R2DBC) o ReactiveMongoRepository<User, String> (para MongoDB).
-
Guardar (
save):// En un servicio @Autowired private UserRepository userRepository; public Mono<User> saveUser(User user) { return userRepository.save(user); // Devuelve Mono<User> } -
Encontrar por ID (
findById):public Mono<User> findUserById(Long id) { return userRepository.findById(id); // Devuelve Mono<User> } -
Encontrar todos (
findAll):public Flux<User> findAllUsers() { return userRepository.findAll(); // Devuelve Flux<User> } -
Manejo de Transacciones en un Contexto Reactivo: Este es un tema un poco más avanzado y complejo. En un contexto bloqueante, las transacciones se manejan con
@Transactional, que delega a unThreadLocal. Sin embargo, losThreadLocalno funcionan en un contexto reactivo porque los elementos pueden pasar por diferentes hilos en diferentes momentos.Para transacciones reactivas, Spring Data proporciona la anotación
@Transactionalen combinación con la infraestructura de transacciones reactivas de Spring (por ejemplo,ReactiveTransactionManagerpara R2DBC). Cuando usas@Transactionalen un método reactivo, Spring se asegura de que todas las operaciones reactivas dentro de ese método (que interactúan con la misma base de datos) se ejecuten dentro de la misma transacción.Es importante entender que una transacción se “adjunta” al
MonooFluxque se crea, no al hilo. Es decir, las operaciones dentro del flujo reactivo, si son parte de la misma transacción, se aseguran de comprometerse o revertirse juntas.@Service public class UserServiceImpl implements UserService { @Autowired private UserRepository userRepository; @Transactional // Esta anotación ahora trabaja con ReactiveTransactionManager public Mono<User> createUserAndAudit(User user) { return userRepository.save(user) // Guarda el usuario .flatMap(savedUser -> { // Simula una operación de auditoría que debe ser parte de la misma transacción // Si AuditRepository fuera reactivo y manejara transacciones. // return auditRepository.save(new AuditLog(savedUser.getId(), "User created")); System.out.println("User saved, attempting audit for: " + savedUser.getUsername()); return Mono.just(savedUser); // Devolver el usuario guardado }) .doOnError(e -> System.err.println("Transaction rolled back due to: " + e.getMessage())); // Manejo de error de transacción } }El desafío es que todas las operaciones dentro de la transacción deben ser reactivas y deben usar la misma conexión transaccional. Es un área donde la depuración puede ser más compleja que con las transacciones síncronas.
Manejo de Errores en Streams Reactivos
El manejo de errores es crucial en cualquier aplicación, y en los flujos reactivos tiene sus propias particularidades. Como ya mencionamos, cuando un error es emitido (onError), la secuencia se termina. Para evitar que toda la aplicación se caiga o para proporcionar una recuperación elegante, Reactor ofrece operadores específicos.
Operadores de Manejo de Errores
-
onErrorReturn(T fallbackValue): Cuando elPublisheremite un error, este operador intercepta el error, emite un valor de respaldo (fallbackValue), y luego completa la secuencia normalmente (onComplete). El error original es consumido.// Si ocurre un error, devuelve el valor por defecto "Default Message" Mono.error(new RuntimeException("Simulated error")) .onErrorReturn("Default Message") .subscribe(System.out::println, System.err::println); // Imprime "Default Message" -
onErrorResume(Function<Throwable, Mono<T>> fallbackMonoProvider): Si ocurre un error, este operador intercepta el error y cambia a unPublisheralternativo (fallbackMonoProvider). Es útil cuando necesitas ejecutar una lógica asíncrona para recuperarte del error.// Si ocurre un error, cambia a un Mono que simula una recuperación Mono.error(new RuntimeException("Simulated error")) .onErrorResume(e -> { System.err.println("Error caught, resuming with alternative: " + e.getMessage()); return Mono.just("Recovered from error!"); }) .subscribe(System.out::println, System.err::println); // Imprime "Recovered from error!" -
onErrorMap(Function<Throwable, Throwable> errorMapper): Transforma un tipo de excepción en otro. Esto es útil para encapsular excepciones internas en excepciones más significativas para tu dominio de negocio.// Transforma RuntimeException en CustomBusinessException Mono.error(new RuntimeException("Database error")) .onErrorMap(RuntimeException.class, e -> new MyCustomBusinessException("Failed to process data: " + e.getMessage())) .subscribe(System.out::println, System.err::println); // Lanza MyCustomBusinessException -
doOnError(Consumer<Throwable> errorConsumer): Ejecuta una acción de efecto secundario cuando un error ocurre, pero no consume el error. El error continúa propagándose por el stream. Útil para logging o métricas sin alterar el flujo de error.// Logea el error, pero el error sigue propagándose Mono.error(new RuntimeException("Another simulated error")) .doOnError(e -> System.err.println("Logging error before propagation: " + e.getMessage())) .subscribe(System.out::println, System.err::println); // Imprime el log y luego lanza RuntimeException -
retry(long numRetries)/retryWhen(Function<Flux<Throwable>, Publisher<?>> retrySignal): Intenta re-suscribirse alPublisheroriginal un número de veces o bajo ciertas condiciones.
Manejo Global de Errores en WebFlux (ErrorWebExceptionHandler)
Para centralizar el manejo de errores y proporcionar respuestas HTTP consistentes (ej. JSON con un formato de error estándar), WebFlux proporciona la interfaz ErrorWebExceptionHandler. Puedes implementar esta interfaz y registrarla como un bean para manejar todas las excepciones no capturadas por los operadores en tus flujos.
@Component
@Order(-1) // Asegura que este handler sea el primero en la cadena
public class GlobalErrorWebExceptionHandler implements ErrorWebExceptionHandler {
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
HttpStatus status;
String errorMessage;
if (ex instanceof MyCustomNotFoundException) {
status = HttpStatus.NOT_FOUND;
errorMessage = ex.getMessage();
} else if (ex instanceof IllegalArgumentException) {
status = HttpStatus.BAD_REQUEST;
errorMessage = "Invalid input: " + ex.getMessage();
} else {
status = HttpStatus.INTERNAL_SERVER_ERROR;
errorMessage = "An unexpected error occurred: " + ex.getMessage();
// Considerar logear la excepción aquí
}
// Construir la respuesta de error JSON
ErrorResponse errorResponse = new ErrorResponse(status.value(), errorMessage);
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
DataBuffer buffer = bufferFactory.wrap(toJson(errorResponse).getBytes()); // Convierte el objeto a JSON
exchange.getResponse().setStatusCode(status);
exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
return exchange.getResponse().writeWith(Mono.just(buffer));
}
private String toJson(Object obj) {
// Implementa la lógica para convertir el objeto a JSON (ej. con ObjectMapper de Jackson)
try {
return new ObjectMapper().writeValueAsString(obj);
} catch (JsonProcessingException e) {
return "{\"status\":500, \"message\":\"Error converting error response to JSON\"}";
}
}
// Clase auxiliar para la respuesta de error
private static class ErrorResponse {
public int status;
public String message;
public ErrorResponse(int status, String message) { this.status = status; this.message = message; }
}
}
Casos Típicos/Práctica
-
Manejo de un error específico dentro de una cadena de operadores: Supongamos un servicio que busca un usuario, pero puede lanzar
UserNotFoundExceptionsi no lo encuentra.public Mono<User> getUserProfile(String userId) { return userRepository.findById(userId) // Simula buscar en DB .switchIfEmpty(Mono.error(new UserNotFoundException("User not found with ID: " + userId))) // Si Mono.empty(), lanza excepción .onErrorResume(UserNotFoundException.class, e -> { System.err.println("Handled specific UserNotFoundException: " + e.getMessage()); return Mono.just(new User("defaultUser", "Default User")); // Devuelve un usuario por defecto }); } -
Centralizar el manejo de errores para devolver respuestas HTTP consistentes: Como se mostró en el ejemplo de
GlobalErrorWebExceptionHandlerarriba.- 404 Not Found: Mapear
MyCustomNotFoundExceptionaHttpStatus.NOT_FOUND. - 500 Internal Server Error: Para excepciones inesperadas, mapear a
HttpStatus.INTERNAL_SERVER_ERROR. - 400 Bad Request: Para errores de validación o entrada incorrecta, mapear a
HttpStatus.BAD_REQUEST.
El
GlobalErrorWebExceptionHandleres el lugar ideal para definir el formato JSON estándar de tus mensajes de error y sus códigos de estado HTTP asociados, asegurando que todos los errores que atraviesan tu aplicación sean presentados de manera uniforme al cliente. - 404 Not Found: Mapear
Conclusión
En esta tercera entrega, hemos cubierto pilares fundamentales para construir aplicaciones WebFlux robustas: la comunicación reactiva con servicios externos utilizando WebClient, la persistencia de datos con bases de datos reactivas a través de Spring Data R2DBC o drivers NoSQL, y el vital manejo de errores en los flujos reactivos, tanto a nivel de operador como de forma global con ErrorWebExceptionHandler.
Estos conocimientos son esenciales para construir aplicaciones que no solo sean rápidas y escalables, sino también resilientes y fáciles de mantener. En la Parte 4 y final de nuestra serie, abordaremos temas más avanzados como Server-Sent Events, el concepto de Backpressure y el Contexto Reactivo, y, por supuesto, cómo probar eficazmente nuestras aplicaciones WebFlux.
¡Nos vemos en la última parte para solidificar aún más tu conocimiento en WebFlux!