@@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z';
3939// replication message tags
4040pub const XLOG_DATA_TAG : u8 = b'w' ;
4141pub const PRIMARY_KEEPALIVE_TAG : u8 = b'k' ;
42+ pub const INTERPRETED_WAL_RECORD_TAG : u8 = b'0' ;
4243
4344// logical replication message tags
4445const BEGIN_TAG : u8 = b'B' ;
@@ -325,6 +326,7 @@ impl Message {
325326pub enum ReplicationMessage < D > {
326327 XLogData ( XLogDataBody < D > ) ,
327328 PrimaryKeepAlive ( PrimaryKeepAliveBody ) ,
329+ RawInterpretedWalRecords ( RawInterpretedWalRecordsBody < D > ) ,
328330}
329331
330332impl ReplicationMessage < Bytes > {
@@ -370,6 +372,21 @@ impl ReplicationMessage<Bytes> {
370372 reply,
371373 } )
372374 }
375+ INTERPRETED_WAL_RECORD_TAG => {
376+ let streaming_lsn = buf. read_u64 :: < BigEndian > ( ) ?;
377+ let commit_lsn = buf. read_u64 :: < BigEndian > ( ) ?;
378+ let next_record_lsn = match buf. read_u64 :: < BigEndian > ( ) ? {
379+ 0 => None ,
380+ lsn => Some ( lsn) ,
381+ } ;
382+
383+ ReplicationMessage :: RawInterpretedWalRecords ( RawInterpretedWalRecordsBody {
384+ streaming_lsn,
385+ commit_lsn,
386+ next_record_lsn,
387+ data : buf. read_all ( ) ,
388+ } )
389+ }
373390 tag => {
374391 return Err ( io:: Error :: new (
375392 io:: ErrorKind :: InvalidInput ,
@@ -955,6 +972,36 @@ impl<D> XLogDataBody<D> {
955972 }
956973}
957974
975+ #[ derive( Debug ) ]
976+ pub struct RawInterpretedWalRecordsBody < D > {
977+ streaming_lsn : u64 ,
978+ commit_lsn : u64 ,
979+ next_record_lsn : Option < u64 > ,
980+ data : D ,
981+ }
982+
983+ impl < D > RawInterpretedWalRecordsBody < D > {
984+ #[ inline]
985+ pub fn streaming_lsn ( & self ) -> u64 {
986+ self . streaming_lsn
987+ }
988+
989+ #[ inline]
990+ pub fn commit_lsn ( & self ) -> u64 {
991+ self . commit_lsn
992+ }
993+
994+ #[ inline]
995+ pub fn next_record_lsn ( & self ) -> Option < u64 > {
996+ self . next_record_lsn
997+ }
998+
999+ #[ inline]
1000+ pub fn data ( & self ) -> & D {
1001+ & self . data
1002+ }
1003+ }
1004+
9581005#[ derive( Debug ) ]
9591006pub struct PrimaryKeepAliveBody {
9601007 wal_end : u64 ,
0 commit comments