@@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z';
3939// replication message tags
4040pub const XLOG_DATA_TAG : u8 = b'w' ;
4141pub const PRIMARY_KEEPALIVE_TAG : u8 = b'k' ;
42+ pub const INTERPRETED_WAL_RECORD_TAG : u8 = b'0' ;
4243
4344// logical replication message tags
4445const BEGIN_TAG : u8 = b'B' ;
@@ -325,6 +326,7 @@ impl Message {
325326pub enum ReplicationMessage < D > {
326327 XLogData ( XLogDataBody < D > ) ,
327328 PrimaryKeepAlive ( PrimaryKeepAliveBody ) ,
329+ RawInterpretedWalRecords ( RawInterpretedWalRecordsBody < D > ) ,
328330}
329331
330332impl ReplicationMessage < Bytes > {
@@ -370,6 +372,21 @@ impl ReplicationMessage<Bytes> {
370372 reply,
371373 } )
372374 }
375+ INTERPRETED_WAL_RECORD_TAG => {
376+ let streaming_lsn = buf. read_u64 :: < BigEndian > ( ) ?;
377+ let commit_lsn = buf. read_u64 :: < BigEndian > ( ) ?;
378+ let next_record_lsn = match buf. read_u64 :: < BigEndian > ( ) ? {
379+ 0 => None ,
380+ lsn => Some ( lsn) ,
381+ } ;
382+
383+ ReplicationMessage :: RawInterpretedWalRecords ( RawInterpretedWalRecordsBody {
384+ streaming_lsn,
385+ commit_lsn,
386+ next_record_lsn,
387+ data : buf. read_all ( ) ,
388+ } )
389+ }
373390 tag => {
374391 return Err ( io:: Error :: new (
375392 io:: ErrorKind :: InvalidInput ,
@@ -950,6 +967,36 @@ impl<D> XLogDataBody<D> {
950967 }
951968}
952969
970+ #[ derive( Debug ) ]
971+ pub struct RawInterpretedWalRecordsBody < D > {
972+ streaming_lsn : u64 ,
973+ commit_lsn : u64 ,
974+ next_record_lsn : Option < u64 > ,
975+ data : D ,
976+ }
977+
978+ impl < D > RawInterpretedWalRecordsBody < D > {
979+ #[ inline]
980+ pub fn streaming_lsn ( & self ) -> u64 {
981+ self . streaming_lsn
982+ }
983+
984+ #[ inline]
985+ pub fn commit_lsn ( & self ) -> u64 {
986+ self . commit_lsn
987+ }
988+
989+ #[ inline]
990+ pub fn next_record_lsn ( & self ) -> Option < u64 > {
991+ self . next_record_lsn
992+ }
993+
994+ #[ inline]
995+ pub fn data ( & self ) -> & D {
996+ & self . data
997+ }
998+ }
999+
9531000#[ derive( Debug ) ]
9541001pub struct PrimaryKeepAliveBody {
9551002 wal_end : u64 ,
0 commit comments