
postgresql-replicant

A streaming PostgreSQL logical replication client library for Haskell

Introduction

postgresql-replicant is a library for Haskell that understands PostgreSQL’s logical replication protocol and can stream changes that happen in your database to your Haskell programs.

Let’s say you work on an application built on top of PostgreSQL. You’ve made sure your schema has appropriate updated_at and created_at columns. Then an error report arrives that requires you to find out when a value in a row of one of your tables changed. The problem is that this event happened ages ago, and the row has definitely been updated since.

Maybe you have implemented history tables and have trigger functions to track these changes over time.

Perhaps you have a web application front-end that wants to receive real-time updates to the data models so that users can react to important information in your system.

Maybe you need to scale out your data processing, caching and compliance pipelines and want to minimize the impact on your application architecture.

All of these scenarios and more can be solved with postgresql-replicant.

Streaming Replication

PostgreSQL has long had a feature for copying data from a publishing server to a subscribing server by streaming changes to the subscriber as they are applied on the publisher. This feature is called streaming replication, and it is what postgresql-replicant builds on.

PostgreSQL offers two kinds of streaming replication: logical and physical. For now, postgresql-replicant supports only logical replication. In this form of replication, data is copied by streaming changes at the level of database objects such as tables and rows. In essence, it streams the row-level inserts, updates, and deletes that make up each transaction.

Physical replication, on the other hand, streams the write-ahead log, which records changes to the raw bytes of the data files on the server’s primary storage.

Changes

A program using this library will be mostly concerned with the Change type:

```haskell
data Change
  = Change
  { changeNextLSN :: LSN
  , changeDeltas  :: [WalLogData]
  }
  deriving (Eq, Generic, Show)
```

A value of this type represents a transaction that occurred on the publishing server. The list of WalLogData values represents the changes applied on the publishing server within that transaction, in the same order in which they were applied.
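To make the ordering concrete, here is a self-contained sketch that replays one transaction's deltas into an in-memory table. Delta and Txn are hypothetical, simplified stand-ins, not the library's Change and WalLogData definitions, and the Map-based table is purely illustrative.

```haskell
import Data.List (foldl')
import qualified Data.Map.Strict as Map

-- Hypothetical, simplified stand-ins for Change and WalLogData: every
-- delta touches one row of a single table, keyed by an Int primary key.
data Delta
  = Insert Int String
  | Update Int String
  | Delete Int
  deriving (Eq, Show)

data Txn = Txn
  { txnNextLSN :: Integer  -- stand-in for the LSN to acknowledge
  , txnDeltas  :: [Delta]  -- in the order the server applied them
  }
  deriving (Eq, Show)

type Table = Map.Map Int String

-- Apply one row-level change to the in-memory copy of the table.
applyDelta :: Table -> Delta -> Table
applyDelta t (Insert k v) = Map.insert k v t
applyDelta t (Update k v) = Map.adjust (const v) k t
applyDelta t (Delete k)   = Map.delete k t

-- Replay a transaction's deltas left to right, preserving server order.
applyTxn :: Table -> Txn -> Table
applyTxn t txn = foldl' applyDelta t (txnDeltas txn)
```

Replaying the same deltas in a different order can produce a different table, which is why the order of the list matters.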

You can think of the LSN (log sequence number) type as a pointer into the publishing stream. If your program is disconnected, the server keeps retaining changes for its replication slot. When your program reconnects, postgresql-replicant queries the replication slot and asks the server to resume the stream where it left off, so your program should not miss a change.
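Concretely, PostgreSQL represents an LSN as a 64-bit byte position in the write-ahead log and prints it as two hexadecimal numbers separated by a slash, such as 16/B374D848, holding the high and low 32 bits. The sketch below is a standalone parser and printer for that text format, not the library's LSN type. It shows why two LSNs can be compared directly to tell which position lies further along the stream.

```haskell
import Data.Bits (shiftL, shiftR, (.&.), (.|.))
import Data.Char (toUpper)
import Data.Word (Word64)
import Numeric (readHex, showHex)

-- Parse PostgreSQL's textual LSN format, e.g. "16/B374D848", where the
-- two hex numbers are the high and low 32 bits of a 64-bit WAL position.
parseLSN :: String -> Maybe Word64
parseLSN s = case break (== '/') s of
  (hi, '/' : lo) -> do
    h <- hex32 hi
    l <- hex32 lo
    pure ((h `shiftL` 32) .|. l)
  _ -> Nothing
  where
    -- Accept 1 to 8 hex digits, so the value always fits in 32 bits.
    hex32 str
      | null str || length str > 8 = Nothing
      | otherwise = case readHex str of
          [(n, "")] -> Just n
          _         -> Nothing

-- Render a position back into the "HIGH/LOW" form PostgreSQL prints.
showLSN :: Word64 -> String
showLSN w = map toUpper (showHex (w `shiftR` 32) ('/' : showHex (w .&. 0xFFFFFFFF) ""))
```

Because the position is a plain Word64, ordinary comparison tells you whether one LSN is ahead of another; for example, parseLSN "1/0" compares greater than parseLSN "0/FFFFFFFF".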

Once you have consumed and applied a change, return the LSN value from that Change (its changeNextLSN field) to postgresql-replicant. This updates the server with your program’s position in the stream.
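The acknowledge-after-applying pattern can be modelled without a database. In this hypothetical simulation, where none of the names come from postgresql-replicant, an IORef plays the role of the replication slot. A handler applies each change and then returns the LSN to acknowledge, in the spirit of returning changeNextLSN as described above.

```haskell
import Data.IORef

-- Hypothetical stand-in for a change: the payload plus the LSN to
-- acknowledge once it has been applied.
data SimChange = SimChange
  { simNextLSN :: Integer
  , simPayload :: String
  }
  deriving (Eq, Show)

-- Simulated server side: the slot remembers the last acknowledged LSN,
-- and on reconnect every change past that point is delivered again.
pendingAfter :: Integer -> [SimChange] -> [SimChange]
pendingAfter acked = filter ((> acked) . simNextLSN)

-- Simulated client side: run the handler on each change and record the
-- LSN it returns only after the handler has finished applying the change.
consume :: IORef Integer -> (SimChange -> IO Integer) -> [SimChange] -> IO ()
consume slot handler = mapM_ $ \c -> do
  lsn <- handler c
  writeIORef slot lsn
```

Because the acknowledgment is written only after the handler returns, a change that was still being processed when the connection dropped lies past the acknowledged LSN and is delivered again on reconnect.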

Replication Slots

To consume a replication stream, a client creates a replication slot on the PostgreSQL publishing server. The server uses this slot to track the client’s position in the stream and to retain changes for the client should it lose its connection.

postgresql-replicant can create, query, and connect to a replication slot. You supply a name for the slot in the connection settings, and postgresql-replicant takes care of the rest. Slot names must be unique on the server and may contain only lower-case letters, numbers, and the underscore character.
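Because the slot name comes from your own configuration, it can help to check it before connecting. The hypothetical validSlotName below is not part of postgresql-replicant. It encodes PostgreSQL's rule for slot names, assuming a server built with the default NAMEDATALEN of 64.

```haskell
import Data.Char (isDigit)

-- PostgreSQL's rule for replication slot names: non-empty, at most 63
-- bytes (NAMEDATALEN - 1 in a default build), and only lower-case ASCII
-- letters, digits, and underscores. Every allowed character is ASCII, so
-- for any name that passes, the character count equals the byte count.
validSlotName :: String -> Bool
validSlotName name =
  not (null name)
    && length name <= 63
    && all allowed name
  where
    allowed c = (c >= 'a' && c <= 'z') || isDigit c || c == '_'
```

For instance, "my_slot" passes, while "my-slot" and "MySlot" are rejected.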

You can inspect the state of a replication slot created by postgresql-replicant with this query:

```sql
select * from pg_replication_slots where slot_name = 'my_slot';
```

Connections

postgresql-replicant defines a ReplicantConnection type, a newtype wrapper around the Connection type from postgresql-libpq. The wrapper exists because the connections this library creates are set up in replication mode, which lets them send replication commands that are not part of standard SQL.

Non-trivial programs will likely want to manage the underlying connection themselves, for example to add resource management, connection pooling, or retry logic for failed connections. Use the connect function to create the initial replication connection, and manage it separately from regular Connection values. Do not wrap a non-replication connection in a ReplicantConnection: it will likely fail and throw an exception as soon as a replication command is sent over the socket.
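Retry logic is one piece a program might wrap around connection setup. Here is a minimal sketch of exponential backoff, assuming connection failures surface as IOException. If your connection code throws something else, adjust the exception type. retryWithBackoff is a hypothetical helper, not part of postgresql-replicant, and the action you pass to it (for example, your own call to connect) is up to you.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (IOException, throwIO, try)

-- Run an action, retrying on IOException with exponential backoff.
-- The delay starts at initialDelay microseconds and doubles after each
-- failure; after maxAttempts failures the last exception is rethrown.
retryWithBackoff :: Int -> Int -> IO a -> IO a
retryWithBackoff maxAttempts initialDelay action = go 1 initialDelay
  where
    go attempt delay = do
      result <- try action
      case result of
        Right a -> pure a
        Left err
          | attempt >= maxAttempts -> throwIO (err :: IOException)
          | otherwise -> do
              threadDelay delay
              go (attempt + 1) (delay * 2)
```

With a placeholder action named connectOnce, retryWithBackoff 5 100000 connectOnce would make up to five attempts, waiting 0.1 s, 0.2 s, 0.4 s, and 0.8 s between them.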

How to Read This Library

The primary entry point of postgresql-replicant is src/Database/PostgreSQL/Replicant.hs, the natural place to begin understanding how the library is structured. Ideally, a user of this library should only need to import this module to get all of the tools needed to use the library as intended.

We use explicit export lists as sparingly as possible so that users can access internal types, functions, etc. when integrating postgresql-replicant into their programs. Where we do use them, it is to enforce a module-level invariant on a type constructor; src/Database/PostgreSQL/Replicant/Connection.hs is one example. There, a user cannot create a ReplicantConnection from an arbitrary Connection. Only properly configured Connection values can be used, because improperly constructed ones would cause run-time exceptions. Means of accessing the internals of ReplicantConnection are still provided. The idea is to use this technique sparingly.
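The technique used in Connection.hs is the standard smart-constructor pattern. Below is a self-contained sketch of the idea with hypothetical types (RawConnection, ReplConnection, mkReplConnection), not the library's actual ReplicantConnection definition. The Bool field stands in for whatever makes a real connection suitable for replication commands.

```haskell
-- Hypothetical raw connection. The Bool field stands in for whatever
-- makes a real connection suitable for replication commands.
data RawConnection = RawConnection
  { rawDescription     :: String
  , rawReplicationMode :: Bool
  }
  deriving (Show)

-- In a real module, the export list would name the ReplConnection type
-- without its constructor, e.g.
--   module Conn (ReplConnection, mkReplConnection, unReplConnection) where
-- so mkReplConnection is the only way to build a value.
newtype ReplConnection = ReplConnection RawConnection

-- Smart constructor: refuses connections that break the invariant.
mkReplConnection :: RawConnection -> Maybe ReplConnection
mkReplConnection raw
  | rawReplicationMode raw = Just (ReplConnection raw)
  | otherwise              = Nothing

-- Read-only access to the wrapped value. A plain function rather than a
-- record field, so importers cannot use record-update syntax to swap in
-- a different RawConnection.
unReplConnection :: ReplConnection -> RawConnection
unReplConnection (ReplConnection raw) = raw
```

Exporting an accessor while hiding the constructor keeps the internals reachable without letting callers build an invalid value.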

Each of the remaining modules encapsulates a single responsibility or concept:

The Replicant.hs module re-exports internals from these modules to form the library’s API. The most important of these are Protocol.hs and Message.hs, which work in tandem. Protocol.hs implements the replication protocol. When invoked with startReplicationStream, it races two threads:

If either thread throws an exception in IO, the underlying connection is shut down and the exception is re-thrown to the user.
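The race-and-shut-down behaviour described in the previous paragraph can be sketched with base alone. Here raceBoth is a simplified, hypothetical version of what the async package's race provides. An IORef Bool stands in for the replication connection, and the two actions passed to runStream are placeholders, not the library's actual threads.

```haskell
import Control.Concurrent
import Control.Exception (SomeException, finally, onException, throwIO, try)
import Data.IORef

-- Catch every exception, so a thread's failure is reported to the caller
-- instead of being lost inside the forked thread.
tryAny :: IO a -> IO (Either SomeException a)
tryAny = try

-- Run two actions concurrently and return the result of whichever finishes
-- first; if that one threw, rethrow its exception. Both threads are killed
-- before returning.
raceBoth :: IO a -> IO b -> IO (Either a b)
raceBoth left right = do
  done <- newEmptyMVar
  t1 <- forkIO (tryAny (fmap Left left) >>= putMVar done)
  t2 <- forkIO (tryAny (fmap Right right) >>= putMVar done)
  result <- takeMVar done `finally` (killThread t1 >> killThread t2)
  either throwIO pure result

-- Hypothetical: the IORef stands in for the replication connection. On an
-- exception, writing False models shutting it down, and onException then
-- re-throws the original exception to the caller.
runStream :: IORef Bool -> IO a -> IO b -> IO (Either a b)
runStream connOpen threadA threadB =
  raceBoth threadA threadB `onException` writeIORef connOpen False
```

If you can depend on the async package, its race function handles the same job more robustly, including edge cases around asynchronous exceptions that this sketch does not cover.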

Cookbook

Coming soon