@@ -2410,6 +2410,8 @@ cdef class DeqMessage(Message):
24102410 if num_bytes > 0 :
24112411 ptr = buf._get_raw(num_bytes)
24122412 self .props_impl.enq_txn_id = ptr[:num_bytes]
2413+ else :
2414+ self .props_impl.enq_txn_id = None
24132415 buf.read_ub4(& num_extensions) # number of extensions
24142416 if num_extensions > 0 :
24152417 buf.skip_ub1()
@@ -2544,8 +2546,10 @@ cdef class DeqMessage(Message):
25442546 buf.write_ub4(0 ) # condition length
25452547 buf.write_uint8(0 ) # extensions
25462548 buf.write_ub4(0 ) # number of extensions
2547- buf.write_uint8(0 ) # JSON payload
2548- buf.write_ub4(- 1 ) # shard id
2549+ if buf._caps.ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1:
2550+ buf.write_uint8(0 ) # JSON payload
2551+ if buf._caps.ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1:
2552+ buf.write_ub4(- 1 ) # shard id
25492553
25502554 buf.write_bytes_with_length(queue_name_bytes)
25512555 if consumer_name_bytes is not None :
@@ -2610,9 +2614,13 @@ cdef class EnqMessage(Message):
26102614 exceptionq_bytes = self .props_impl.exceptionq.encode()
26112615 buf.write_ub4(len (exceptionq_bytes))
26122616 buf.write_bytes_with_length(exceptionq_bytes)
2613- buf.write_ub4(self .props_impl.state)
2617+ buf.write_ub4(self .props_impl.state) # message state
26142618 buf.write_ub4(0 ) # enqueue time length
2615- buf.write_ub4(0 ) # enqueue transaction id length
2619+ if self .props_impl.enq_txn_id is None :
2620+ buf.write_ub4(0 ) # enqueue txn id length
2621+ else :
2622+ buf.write_ub4(len (self .props_impl.enq_txn_id))
2623+ buf.write_bytes_with_length(self .props_impl.enq_txn_id)
26162624 buf.write_ub4(4 ) # number of extensions
26172625 buf.write_uint8(0x0e ) # unknown extra byte
26182626 buf.write_extension_values(None , None , TNS_AQ_EXT_KEYWORD_AGENT_NAME)
@@ -2625,7 +2633,8 @@ cdef class EnqMessage(Message):
26252633 buf.write_ub4(0 ) # cscn
26262634 buf.write_ub4(0 ) # dscn
26272635 buf.write_ub4(0 ) # flags
2628- buf.write_ub4(0xffffffffl ) # shard id
2636+ if buf._caps.ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1:
2637+ buf.write_ub4(0xffffffffl ) # shard id
26292638
26302639 if self .props_impl.recipients is None :
26312640 buf.write_uint8(0 ) # recipients (pointer)
@@ -2655,7 +2664,7 @@ cdef class EnqMessage(Message):
26552664 buf.write_uint8(1 ) # return message id (pointer)
26562665 buf.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH) # return message id length
26572666 enq_flags = 0
2658- if ( self .enq_options_impl.delivery_mode == TNS_AQ_MSG_BUFFERED) :
2667+ if self .enq_options_impl.delivery_mode == TNS_AQ_MSG_BUFFERED:
26592668 enq_flags |= TNS_KPD_AQ_BUFMSG
26602669 buf.write_ub4(enq_flags) # enqueue flags
26612670 buf.write_uint8(0 ) # extensions 1 (pointer)
@@ -2675,10 +2684,11 @@ cdef class EnqMessage(Message):
26752684 buf.write_ub4(0 ) # sender address length
26762685 buf.write_uint8(0 ) # sender charset id (pointer)
26772686 buf.write_uint8(0 ) # sender ncharset id (pointer)
2678- if self .queue_impl.is_json:
2679- buf.write_uint8(1 ) # JSON payload (pointer)
2680- else :
2681- buf.write_uint8(0 ) # JSON payload (pointer)
2687+ if buf._caps.ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1:
2688+ if self .queue_impl.is_json:
2689+ buf.write_uint8(1 ) # JSON payload (pointer)
2690+ else :
2691+ buf.write_uint8(0 ) # JSON payload (pointer)
26822692
26832693 buf.write_bytes_with_length(queue_name_bytes)
26842694 buf.write_bytes(self .props_impl.toid)
0 commit comments