Introduction
postgresql-replicant is a library for Haskell that understands
PostgreSQL’s logical replication protocol and can stream changes that
happen in your database to your Haskell programs.
Let’s say you work on an application built on top of PostgreSQL.
You’ve made sure your schema have appropriate updated_at and
created_at columns. And then there comes an error report that
requires you to figure out when a value in a row in one of your tables
changed. The problem is that this event happened ages ago. The row
has definitely been updated since.
Maybe you have implemented history tables and have trigger functions to track these changes over time.
Perhaps you have a web application front-end that wants to receive real-time updates to the data models so that users can react to important information in your system.
Maybe you need to scale out your data processing, caching and compliance pipelines and want to minimize the impact on your application architecture.
All of these scenarios and more can be solved with
postgresql-replicant.
Streaming Replication
PostgeSQL long ago implemented a feature that enables it to copy data
from one publishing server to a subscribing server. This feature
streams these changes as they are applied on the publisher to the
subscriber. It is called streaming replication and is the feature
that postgresql-replicant exploits.
PostgreSQL offers two kinds of streaming replication: logical and
physical. For now postgresql-replicant is only concerned with
logical replication. In this form of replication data is copied by
streaming the changes that happen at the level of database objects and
the statements that modify them. It essentially streams the insert,
update and delete commands that make up each transaction.
Physical replication on the other hand streams the raw chunks of bytes that make up the data on the servers’ primary storage.
Changes
A program using this library will be mostly concerned with the
Change type:
data Change
= Change
{ changeNextLSN :: LSN
, changeDeltas :: [WalLogData]
}
deriving (Eq, Generic, Show)
A value of this type represents a transaction that occurred on the
publishing server. The list of WalLogData values represent the
statements that were applied to the publishing server in the
transaction. Each change appears in the list in the same order as
they were applied on the server.
You can think of the LSN type as a pointer into the publishing
stream. If your program is disconnected the server will begin storing
the changes. Once your program reconnects postgresql-replicant will
query the replication slot and ask to start the stream where it left
off. This way your program should not miss a change.
Once you have consumed the change and applied it you return the LSN
value given in the Change to postgresql-replicant. This will
update the server on your program’s location in the stream.
Replication Slots
In order to consume a replication stream a client creates a replication slot on the PostgreSQL publishing server. The server uses this slot to track the state of the client and to queue up changes for the client should they lose their connection.
postgresql-replicant can handle creating, querying, and connecting
to a replication slot. You supply a name for the slot in the
connection settings and postgresql-replicant can take care of the
rest. Slot names must be unique and can contain only the same
characters as table names.
You can query the state of the replication slot created by
postgresql-replicant using this query:
select * from pg_replication_slots where slot_name = 'my-slot';
Connections
postgresql-replicant creates a ReplicantConnection type which is a
new-type wrapper around postgresql-libpq Connection. The reason
for this is that the connection created by this library enables it to
send the replication commands that are not part of standard SQL.
Non-trivial programs will likely want to manage the underlying
connection to provide resource management, connection pools, and logic
to retry failed connections, etc. It is best to use the connect
function to create the initial connection and manage it separately
from regular Connection values. It is important to not wrap a
non-replication connection in a ReplicantConnection as this will
likely fail and throw an exception as soon as a replication command is
sent over the socket.
How To Read this Library
The primary entry-point postgresql-replicant is
src/Database/PostgreSQL/Replicant.hs. This is the natural place to
begin understanding how this library is structured. A user of this
library should ideally only have to import this module and have all of
the tools they need to use the library as intended.
We try to use explicit export lists as infrequently as possible so
that users can have access to internal types, functions, etc for
whatever reason when integrating postgresql-replicant into their
program. We have used them in modules in order to enforce a
module-level invariant on a type constructor, for example:
src/Database/PostgreSQL/Replicant/Connection.hs. A user could not
create a value of ReplicantConnection with an arbitrary
Connection. Only properly configured Connection values can be
used. Improperly constructed Connection objects would result in
run-time exceptions. Means to access the internals of
ReplicantConnection are still provided. The idea is to use the
feature sparingly.
The general layout for the rest of the modules is to encapsulate a single responsibility or concept each:
Connection.hs: create connectionsException.hs: library exceptionsMessage.hs: the protocol message types, parsers, and serializers.PostgresUtils.hs: PostgreSQL-specific helper functionsProtocol.hs: handle the streaming protocolReplicationSlot.hs: create, query, connect to replication slotsSerialize.hs: serialization/parsing helpersState.hs: manage client stream statetypes/Lsn.hs: the LSN PostgreSQL typeUtil.hs: Miscellaneous functions, combinators, etc.
The Replicant.hs module re-exports internals taken from these
modules to give the library its API. The most important of these is
Protocol.hs and Message.hs which work in tandem. Protocol.hs
handles the replication protocol which, when invoked with
startReplicationStream will race two threads:
- Keep Alive Handler: will periodically send status updates to the publishing server and respond to requests for updates from the server
- Copy Out Data Handler: will handle the stream data from the server
and call the users’ callback function to send them the
Changevalues.
If either thread should throw an exception in IO the underlying
connection will be shutdown and the exception re-thrown to the user.
Cookbook
Coming soon