Skip to content

Commit 368c848

Browse files
author
Petr Matousek
committed
incorporate recent development changes (3dcb6a78)
1 parent 4d791f0 commit 368c848

File tree

5 files changed

+55
-72
lines changed

5 files changed

+55
-72
lines changed

src/api/qpid-proton/reactor/handler/ConnectorHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ void ConnectorHandler::on_transport_close(transport &t) {
205205

206206
void ConnectorHandler::on_session_error(session &s) {
207207
logger(error) << "The remote peer at " << broker_url.getHost() << ":" << broker_url.getPort() <<
208-
" closed the session with an error condition";
208+
" closed the session with an error condition: " + s.error().what();
209209
closeObjects();
210210
}
211211

src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -133,51 +133,47 @@ int TxReceiverHandler::getBatchSize() const
133133
// reactor methods
134134

135135
void TxReceiverHandler::on_session_open(session &s) {
136-
logger(trace) << "[on_session_open] declare_txn started...";
137-
s.transaction_declare(*this);
138-
logger(trace) << "[on_session_open] declare_txn ended...";
139-
logger(debug) << "[on_session_open] transaction batch size: " << batch_size;
136+
if(!s.transaction_is_declared()) {
137+
logger(trace) << "[on_session_open] New session is open";
138+
s.transaction_declare(*this);
139+
} else {
140+
logger(trace) << "[on_session_open] Transaction is declared: " << s.transaction_id();
141+
recv.add_credit(batch_size);
142+
logger(debug) << "[on_session_open] Receiver credit: " << recv.credit();
143+
if (count != 0 && processed + batch_size > count) {
144+
batch_size = count % batch_size;
145+
}
146+
}
140147
}
141148

142-
void TxReceiverHandler::on_transaction_declare_failed(session) {}
143-
144-
void TxReceiverHandler::on_transaction_commit_failed(session s) {
145-
logger(debug) << "[on_transaction_commit_failed] Transaction Commit Failed";
149+
void TxReceiverHandler::on_session_transaction_commit_failed(session &s) {
150+
logger(debug) << "[on_session_transaction_commit_failed] Transaction Commit Failed";
146151
s.connection().close();
147152
exit(-1);
148153
}
149154

150-
void TxReceiverHandler::on_transaction_declared(session s) {
151-
// TODO python some weird magic around count 0, doesn't make much sense to me yet
152-
// when fixes take care about all count checks ofr zero
153-
if (count != 0 && processed + batch_size > count) {
154-
batch_size = count % batch_size;
155-
} else if (count != 0) {
156-
batch_size = count;
157-
}
158-
logger(trace) << "[on_transaction_declared] txn called " << s.transaction_id();
159-
}
160-
161-
void TxReceiverHandler::on_transaction_aborted(session s) {
155+
void TxReceiverHandler::on_session_transaction_aborted(session &s) {
162156
processed += current_batch;
163157
current_batch = 0;
164-
logger(debug) << "[on_transaction_aborted] messages aborted, processed: " << processed;
158+
logger(debug) << "[on_session_transaction_aborted] messages aborted, processed: " << processed;
165159
if (count == 0 || processed < count) {
160+
logger(info) << "[on_session_transaction_aborted] re-declaring transaction";
166161
s.transaction_declare(*this);
167162
} else {
168-
logger(info) << "[on_transaction_aborted] All messages processed";
163+
logger(info) << "[on_session_transaction_aborted] All messages processed";
169164
s.connection().close();
170165
}
171166
}
172167

173-
void TxReceiverHandler::on_transaction_committed(session s) {
168+
void TxReceiverHandler::on_session_transaction_committed(session &s) {
174169
processed += current_batch;
175170
current_batch = 0;
176-
logger(debug) << "[on_transaction_committed] messages committed, processed: " << processed;
171+
logger(debug) << "[on_session_transaction_committed] messages committed, processed: " << processed;
177172
if (count == 0 || processed < count) {
173+
logger(info) << "[on_session_transaction_committed] re-declaring transaction";
178174
s.transaction_declare(*this);
179175
} else {
180-
logger(info) << "[on_transaction_committed] All messages processed";
176+
logger(info) << "[on_session_transaction_committed] All messages processed";
181177
s.connection().close();
182178
}
183179
}
@@ -367,14 +363,10 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
367363
{
368364
logger(debug) << "[on_message] Processing received message";
369365

370-
// TODO legit?
371366
session s = d.session();
372-
373367
d.accept();
374368
current_batch += 1;
375369

376-
logger(debug) << "[on_message] current batch: " << current_batch;
377-
378370
if (log_msgs == "dict") {
379371
logger(trace) << "[on_message] Decoding message";
380372
ReactorDecoder decoder = ReactorDecoder(m);
@@ -436,8 +428,10 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
436428
#endif
437429
}
438430

431+
logger(debug) << "[on_message] Receiver credit: " << recv.credit();
432+
logger(debug) << "[on_message] Current batch: " << current_batch;
439433
if(current_batch == batch_size) {
440-
logger(debug) << "[send] Transaction attempt: " << tx_action;
434+
logger(debug) << "[on_message] Transaction attempt: " << tx_action;
441435
if (tx_action == "commit") {
442436
s.transaction_commit();
443437
} else if (tx_action == "rollback") {
@@ -459,7 +453,7 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
459453
}
460454

461455
} else if (count != 0 && processed + current_batch == count) {
462-
logger(debug) << "[send] Transaction attempt (endloop): " << tx_endloop_action;
456+
logger(debug) << "[on_message] Transaction attempt (endloop): " << tx_endloop_action;
463457
if (tx_endloop_action == "commit") {
464458
s.transaction_commit();
465459
} else if (tx_endloop_action == "rollback") {

src/api/qpid-proton/reactor/handler/TxReceiverHandler.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,9 @@ class TxReceiverHandler : public ReceiverHandler {
142142
// reactor method
143143
void on_session_open(session &s);
144144
void on_transaction_declare_failed(session);
145-
void on_transaction_commit_failed(session s);
146-
void on_transaction_declared(session s);
147-
void on_transaction_committed(session s);
148-
void on_transaction_aborted(session s);
145+
void on_session_transaction_commit_failed(session &s);
146+
void on_session_transaction_committed(session &s);
147+
void on_session_transaction_aborted(session &s);
149148

150149
// overrides
151150
void on_container_start(container &c);

src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,9 @@ void TxSenderHandler::checkIfCanSend() {
124124
}
125125
}
126126

127-
void TxSenderHandler::send(session s)
127+
void TxSenderHandler::send()
128128
{
129+
session s = sndr.session();
129130
logger(debug) << "[send] Preparing to send message";
130131
int credit = sndr.credit();
131132

@@ -223,49 +224,39 @@ void TxSenderHandler::on_sendable(sender &s)
223224
{
224225
logger(trace) << "[on_sendable] transaction: " << &s;
225226
if (ready) {
226-
send(s.session());
227+
send();
227228
}
228229
}
229230

230-
void TxSenderHandler::on_tracker_accept(tracker &t)
231-
{
232-
logger(trace) << "[on_tracker_accept] Message accepted, confirmed message delivery: " << processed;
233-
}
234-
235231
void TxSenderHandler::on_connection_close(connection &c)
236232
{
237233
current_batch = 0;
238234
logger(debug) << "[on_connection_close] Closing connection";
239235
}
240236

241-
void TxSenderHandler::on_transaction_declared(session s) {
242-
logger(trace) << "[on_transaction_declared] txn called " << s.transaction_id();
243-
send(s);
244-
}
245-
246-
void TxSenderHandler::on_transaction_committed(session s) {
247-
logger(trace) << "[on_transaction_committed] Messages committed";
237+
void TxSenderHandler::on_session_transaction_committed(session &s) {
238+
logger(trace) << "[on_session_transaction_committed] Messages committed";
248239
processed += current_batch;
249-
logger(debug) << "[on_transaction_committed] Messages processed" << processed;
240+
logger(debug) << "[on_session_transaction_committed] Messages processed: " << processed;
250241
if (processed == count) {
251-
logger(trace) << "[on_transaction_committed] All messages processed";
242+
logger(trace) << "[on_session_transaction_committed] All messages processed";
252243
s.connection().close();
253244
} else {
254-
logger(trace) << "[on_transaction_committed] Declaring new transaction";
245+
logger(trace) << "[on_session_transaction_committed] Declaring new transaction";
255246
current_batch = 0;
256247
s.transaction_declare(*this);
257248
}
258249
}
259250

260-
void TxSenderHandler::on_transaction_aborted(session s) {
261-
logger(trace) << "[on_transaction_aborted] Messages aborted";
251+
void TxSenderHandler::on_session_transaction_aborted(session &s) {
252+
logger(trace) << "[on_session_transaction_aborted] Messages aborted";
262253
processed += current_batch;
263-
logger(debug) << "[on_transaction_aborted] Messages processed" << processed;
254+
logger(debug) << "[on_session_transaction_aborted] Messages processed: " << processed;
264255
if (processed == count) {
265-
logger(trace) << "[on_transaction_aborted] All messages processed";
256+
logger(trace) << "[on_session_transaction_aborted] All messages processed";
266257
s.connection().close();
267258
} else {
268-
logger(trace) << "[on_transaction_aborted] Declaring new transaction";
259+
logger(trace) << "[on_session_transaction_aborted] Declaring new transaction";
269260
current_batch = 0;
270261
s.transaction_declare(*this);
271262
}
@@ -276,9 +267,13 @@ void TxSenderHandler::on_sender_close(sender &s) {
276267
}
277268

278269
void TxSenderHandler::on_session_open(session &s) {
279-
logger(trace) << "[on_session_open] declare_txn started...";
280-
s.transaction_declare(*this);
281-
logger(trace) << "[on_session_open] declare_txn ended...";
270+
if(!s.transaction_is_declared()) {
271+
logger(trace) << "[on_session_open] New session is open";
272+
s.transaction_declare(*this);
273+
} else {
274+
logger(trace) << "[on_session_open] Transaction is declared: " << s.transaction_id();
275+
send();
276+
}
282277
}
283278

284279
void TxSenderHandler::on_container_start(container &c)
@@ -377,10 +372,8 @@ void TxSenderHandler::on_container_start(container &c)
377372
// #endif
378373
}
379374

380-
void TxSenderHandler::on_transaction_declare_failed(session) {}
381-
382-
void TxSenderHandler::on_transaction_commit_failed(session s) {
383-
logger(error) << "[on_transaction_commit_failed] Transaction Commit Failed";
375+
void TxSenderHandler::on_session_transaction_commit_failed(session &s) {
376+
logger(error) << "[on_session_transaction_commit_failed] Transaction Commit Failed";
384377
s.connection().close();
385378
exit(1);
386379
}

src/api/qpid-proton/reactor/handler/TxSenderHandler.h

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,21 +111,18 @@ class TxSenderHandler : public SenderHandler {
111111

112112
// overrides
113113
void checkIfCanSend();
114-
void send(session s);
114+
void send();
115115

116116
// reactor methods
117117
void on_sender_close(sender &s);
118-
void on_transaction_declared(session s);
119-
void on_transaction_committed(session s);
120-
void on_transaction_aborted(session s);
121-
void on_transaction_declare_failed(session s);
122-
void on_transaction_commit_failed(session s);
118+
void on_session_transaction_commit_failed(session &s);
119+
void on_session_transaction_committed(session &s);
120+
void on_session_transaction_aborted(session &s);
123121

124122
// overrides
125123
void on_container_start(container &c);
126124
void on_session_open(session &s);
127125
void on_sendable(sender &s);
128-
void on_tracker_accept(tracker &t);
129126
void on_connection_close(connection &c);
130127

131128
private:

0 commit comments

Comments
 (0)