Redis Streams: xadd + xread

Esta ocasión mostraré un ejemplo de Redis Streams, con los metodos xadd y xread utilizando la librería Lettuce

Generar Mensajes

El Stream tendrá el nombre mensajes y cada mensaje enviado tendrá un UUID y un número indicando la cantidad de mensajes enviados:
  1. final var cliente = RedisClient.create("redis://localhost:6379");
  2. try (final var con = cliente.connect()) {
  3. final var cmd = con.sync();
  4. int i = 0;
  5. while (i < 500_000) {
  6. final var res =
  7. cmd.xadd("mensajes", Map.of(UUID.randomUUID().toString(),
  8. String.valueOf(i)));
  9. i++;
  10. LOGGER.info("enviados: {} - id: {}", i, res);
  11. Thread.sleep(1_000);
  12. }
  13. }
  14. cliente.shutdown(); 

Recibir Mensajes
El cliente lee un mensaje a la vez y recuerda el último ID leido para preguntar por los nuevos mensajes y se bloquea 5 segundos esperando mensajes:
  1. final var cliente = RedisClient.create("redis://localhost:6379");
  2. String ultimo = "0";
  3. boolean leyendo = true;
  4. var cuentaMensajes = 0;
  5. try (final var con = cliente.connect()) {
  6. final var cmd = con.sync();
  7. while (leyendo) {
  8. final var mensajes = cmd.xread(
  9. XReadArgs.Builder.block(Duration.ofSeconds(5)).count(1),
  10. StreamOffset.from("mensajes", ultimo));
  11. cuentaMensajes += mensajes.size();
  12. LOGGER.info("leidos: {}", cuentaMensajes);
  13. for (final var m : mensajes) {
  14. LOGGER.info("{}", m.getId());
  15. ultimo = m.getId();
  16. }
  17. }
  18. }
  19. cliente.shutdown(); 
El código es bastante simple (quitando los extras como la pausa y los comentarios quedarian menos lineas) y cumple el objetivo de mostrar lo sencillo que es utilizar xadd y xread con Lettuce para Streams del poderoso Redis 💪.

El código queda disponible en GitHub 🙃.



No hay comentarios.:

Publicar un comentario