Description
When an invocation takes a long time and blocks, other calls to outbox.flush (e.g. from other instances of the application) are blocked as well, which effectively blocks the whole outbox.
Details
In my example, I have set up the outbox to send messages to Kafka (using ordered) after writing to the DB. I set up an experiment in which I delay sending indefinitely (i.e. Thread.sleep(...)).
Then I trigger another call to flush the outbox on a different thread, but it waits and doesn't return. The problematic query is also logged as a slow query:
Query:["UPDATE TXNO_OUTBOX SET lastAttemptTime = ?, nextAttemptTime = ?, attempts = ?, blocked = ?, processed = ?, version = ? WHERE id = ? and version = ?"]
Only after the original invocation finishes are the other threads unblocked, with "Beaten to optimistic lock on", as expected.
For testing, I am using postgresql:17.
Edit:
I tried an alternative query, and it seemed to work better:
UPDATE TXNO_OUTBOX SET lastAttemptTime = ?, nextAttemptTime = ?, attempts = ?, blocked = ?, processed = ?, version = ? WHERE id = (SELECT id from TXNO_OUTBOX where id = ? and version = ? FOR UPDATE SKIP LOCKED)
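To illustrate why the two statements behave differently, here is a rough in-process analogy in Java. It is not the library's code and not how PostgreSQL implements row locks. It assumes that the worker thread holds a lock on the row for the whole duration of the slow invocation, which is what the log suggests but which I have not verified in the library. The class and method names (SkipLockedAnalogy, waitingUpdate, skipLockedUpdate) are made up for this sketch. The plain UPDATE corresponds to waiting for the lock; FOR UPDATE SKIP LOCKED corresponds to a non-blocking tryLock that simply matches no row while the lock is taken.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class SkipLockedAnalogy {
    // Stands in for the row lock on a single TXNO_OUTBOX row (assumption of this sketch).
    static final ReentrantLock ROW_LOCK = new ReentrantLock();

    // Analogy for the plain UPDATE: waits for the lock holder.
    // The wait is bounded here only so that the demo terminates.
    static boolean waitingUpdate(long maxWaitMillis) {
        try {
            if (!ROW_LOCK.tryLock(maxWaitMillis, TimeUnit.MILLISECONDS)) {
                return false; // still waiting when the demo timeout expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        ROW_LOCK.unlock();
        return true;
    }

    // Analogy for "... FOR UPDATE SKIP LOCKED": gives up immediately if the row is locked,
    // like an UPDATE that matches zero rows.
    static boolean skipLockedUpdate() {
        if (!ROW_LOCK.tryLock()) {
            return false;
        }
        ROW_LOCK.unlock();
        return true;
    }

    // Returns {skip-locked while held, waiting update while held, skip-locked after release}.
    static boolean[] demo() {
        CountDownLatch lockHeld = new CountDownLatch(1);
        CountDownLatch invocationDone = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            ROW_LOCK.lock();
            try {
                lockHeld.countDown();
                invocationDone.await(); // stands in for the slow Kafka send
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                ROW_LOCK.unlock();
            }
        });
        worker.start();
        try {
            lockHeld.await();
            boolean skipped = skipLockedUpdate();  // returns at once
            boolean waited = waitingUpdate(200);   // blocks for the full 200 ms
            invocationDone.countDown();
            worker.join();
            boolean afterRelease = skipLockedUpdate();
            return new boolean[] {skipped, waited, afterRelease};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("skip-locked while row is held: " + r[0]); // false
        System.out.println("waiting update while row is held: " + r[1]); // false
        System.out.println("skip-locked after release: " + r[2]); // true
    }
}
```

In both cases the second flush does not get the row while the invocation is running. The difference is only that the skip-locked variant finds that out immediately instead of waiting for the first invocation to finish.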
I guess this query needs to be made configurable somehow, e.g. via dialects.
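A minimal sketch of what "configurable via dialects" could look like, in Java. This is not the library's actual Dialect API: the enum HypotheticalDialect and the method updateSql are names invented for this illustration. The only things taken from above are the two SQL statements. Both variants keep the same eight placeholders in the same order, so the parameter binding would not have to change.

```java
public class UpdateSqlSketch {
    // Hypothetical stand-in for a dialect setting; not the library's real type.
    enum HypotheticalDialect { GENERIC, POSTGRESQL }

    // Shared SET clause, copied from the logged query.
    static final String SET_CLAUSE =
        "UPDATE TXNO_OUTBOX SET lastAttemptTime = ?, nextAttemptTime = ?, attempts = ?, "
            + "blocked = ?, processed = ?, version = ?";

    // Picks the WHERE clause per dialect.
    static String updateSql(HypotheticalDialect dialect) {
        switch (dialect) {
            case POSTGRESQL:
                // Alternative query from the edit above: skip the row if it is locked.
                return SET_CLAUSE
                    + " WHERE id = (SELECT id FROM TXNO_OUTBOX WHERE id = ? AND version = ?"
                    + " FOR UPDATE SKIP LOCKED)";
            default:
                // Current query: waits if another transaction holds the row.
                return SET_CLAUSE + " WHERE id = ? AND version = ?";
        }
    }

    public static void main(String[] args) {
        for (HypotheticalDialect d : HypotheticalDialect.values()) {
            System.out.println(d + ": " + updateSql(d));
        }
    }
}
```

Whether SKIP LOCKED is available, and with which syntax, differs between databases, so a per-dialect override like this would let the default query stay untouched for databases where it has not been tested.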
Logs
I attached the whole log, in which you can follow the sequence of events.
You can see that thread nio-8080-exec-7 is blocked (this is the one that executes outbox.flush a second time):
2025-01-24T08:39:17.143+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.t.TransactionOutboxImpl : Flushing stale tasks
2025-01-24T08:39:17.150+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.transactionoutbox.DefaultPersistor : Found 0 results
2025-01-24T08:39:17.151+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.t.TransactionOutboxImpl : Got batch of 0
2025-01-24T08:39:17.152+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.t.TransactionOutboxImpl : Submitted batch
2025-01-24T08:39:17.156+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.t.TransactionOutboxImpl : No records found to delete as of 2025-01-24T07:39:17.143928Z
2025-01-24T08:39:17.156+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.t.TransactionOutboxImpl : Flushing topics
2025-01-24T08:39:17.160+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.transactionoutbox.DefaultPersistor : Found TransactionOutboxEntry(id=dac14d73-2f47-48f0-be38-bf476f0c2422, uniqueRequestId=null, topic=animal, sequence=2, invocation=Invocation(className=kafkaFacade, methodName=send, parameterTypes=[class java.lang.String, class java.lang.Long, class java.lang.String], args=[animal, 2, kafka-slow], mdc=null), lastAttemptTime=null, nextAttemptTime=2025-01-24T07:39:06.921771Z, attempts=0, blocked=false, processed=false, version=0)
2025-01-24T08:39:17.160+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.transactionoutbox.DefaultPersistor : Found 1 results
2025-01-24T08:39:17.160+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.t.TransactionOutboxImpl : Triggering kafkaFacade.send("animal", 2, "kafka-slow") [dac14d73-2f47-48f0-be38-bf476f0c2422] seq=[animal/2]
2025-01-24T08:39:17.164+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.transactionoutbox.DefaultPersistor : Updated kafkaFacade.send("animal", 2, "kafka-slow") [dac14d73-2f47-48f0-be38-bf476f0c2422] seq=[animal/2]
2025-01-24T08:39:17.166+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.t.TransactionOutboxImpl : Got batch of 1
2025-01-24T08:39:17.167+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.transactionoutbox.ExecutorSubmitter : Submitted kafkaFacade.send("animal", 2, "kafka-slow") [dac14d73-2f47-48f0-be38-bf476f0c2422] seq=[animal/2] for immediate processing
2025-01-24T08:39:17.167+01:00 DEBUG 34618 --- [nio-8080-exec-6] c.g.t.TransactionOutboxImpl : Submitted batch
2025-01-24T08:39:17.171+01:00 INFO 34618 --- [pool-2-thread-1] c.g.t.TransactionOutboxImpl : Processing kafkaFacade.send("animal", 2, "kafka-slow") [dac14d73-2f47-48f0-be38-bf476f0c2422] seq=[animal/2]
2025-01-24T08:39:17.171+01:00 DEBUG 34618 --- [pool-2-thread-1] c.g.t.TransactionOutboxImpl : Created instance de.kleinanzeigen.outbox.sample.KafkaFacade@2625eb84
2025-01-24T08:39:17.171+01:00 DEBUG 34618 --- [pool-2-thread-1] c.gruelbox.transactionoutbox.Invocation : Invoking method public void de.kleinanzeigen.outbox.sample.KafkaFacade.send(java.lang.String,java.lang.Long,java.lang.String) with args [animal, 2, kafka-slow]
######## pool-2-thread-1 Slowing down kafka producer...
2025-01-24T08:39:20.067+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.t.TransactionOutboxImpl : Flushing stale tasks
2025-01-24T08:39:20.076+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.transactionoutbox.DefaultPersistor : Found 0 results
2025-01-24T08:39:20.078+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.t.TransactionOutboxImpl : Got batch of 0
2025-01-24T08:39:20.078+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.t.TransactionOutboxImpl : Submitted batch
2025-01-24T08:39:20.085+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.t.TransactionOutboxImpl : No records found to delete as of 2025-01-24T07:39:20.067281Z
2025-01-24T08:39:20.085+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.t.TransactionOutboxImpl : Flushing topics
2025-01-24T08:39:20.090+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.transactionoutbox.DefaultPersistor : Found TransactionOutboxEntry(id=dac14d73-2f47-48f0-be38-bf476f0c2422, uniqueRequestId=null, topic=animal, sequence=2, invocation=Invocation(className=kafkaFacade, methodName=send, parameterTypes=[class java.lang.String, class java.lang.Long, class java.lang.String], args=[animal, 2, kafka-slow], mdc=null), lastAttemptTime=2025-01-24T07:39:17.161045Z, nextAttemptTime=2025-01-24T07:39:18.161Z, attempts=0, blocked=false, processed=false, version=1)
2025-01-24T08:39:20.091+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.transactionoutbox.DefaultPersistor : Found 1 results
2025-01-24T08:39:20.091+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.t.TransactionOutboxImpl : Triggering kafkaFacade.send("animal", 2, "kafka-slow") [dac14d73-2f47-48f0-be38-bf476f0c2422] seq=[animal/2]
2025-01-24T08:44:20.089+01:00 WARN 34618 --- [pool-1-thread-1] n.t.d.l.logging.SLF4JSlowQueryListener :
Name:dataSource, Connection:31, Time:299993, Success:False
Type:Prepared, Batch:False, QuerySize:1, BatchSize:0
Query:["UPDATE TXNO_OUTBOX SET lastAttemptTime = ?, nextAttemptTime = ?, attempts = ?, blocked = ?, processed = ?, version = ? WHERE id = ? and version = ?"]
Params:[(2025-01-24 08:39:20.091643,2025-01-24 08:39:21.091,0,false,false,2,dac14d73-2f47-48f0-be38-bf476f0c2422,1)]
######## pool-2-thread-1 Sending message to kafka
2025-01-24T08:45:17.175+01:00 DEBUG 34618 --- [pool-2-thread-1] c.g.transactionoutbox.DefaultPersistor : Deleted kafkaFacade.send("animal", 2, "kafka-slow") [dac14d73-2f47-48f0-be38-bf476f0c2422] seq=[animal/2]
2025-01-24T08:45:17.178+01:00 INFO 34618 --- [pool-2-thread-1] c.g.t.TransactionOutboxImpl : Processed kafkaFacade.send("animal", 2, "kafka-slow") [dac14d73-2f47-48f0-be38-bf476f0c2422] seq=[animal/2]
2025-01-24T08:45:17.178+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.t.TransactionOutboxImpl : Beaten to optimistic lock on kafkaFacade.send("animal", 2, "kafka-slow") [dac14d73-2f47-48f0-be38-bf476f0c2422] seq=[animal/2]
Success: TransactionOutboxEntry(id=dac14d73-2f47-48f0-be38-bf476f0c2422, uniqueRequestId=null, topic=animal, sequence=2, invocation=Invocation(className=kafkaFacade, methodName=send, parameterTypes=[class java.lang.String, class java.lang.Long, class java.lang.String], args=[animal, 2, kafka-slow], mdc=null), lastAttemptTime=2025-01-24T07:39:17.161045Z, nextAttemptTime=2025-01-24T07:39:18.161Z, attempts=0, blocked=false, processed=false, version=1)
2025-01-24T08:45:17.179+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.t.TransactionOutboxImpl : Got batch of 0
2025-01-24T08:45:17.179+01:00 DEBUG 34618 --- [nio-8080-exec-7] c.g.t.TransactionOutboxImpl : Submitted batch
2025-01-24T08:48:17.185+01:00 INFO 34618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node -1 disconnected.