How to Build a WatermelonDB Sync Backend in Elixir
WatermelonDB
WatermelonDB is a reactive database for React frontend applications that supports data synchronization.
What I like about this database is that you can bring your own (HTTP-based) sync backend, as long as it complies with this spec:
Operation | Request Params / Body | Response Body
---|---|---
Pull | `lastPulledAt`: integer, Unix time in milliseconds (ms) | `changes`: JSON; `timestamp`: integer, Unix time in milliseconds (ms)
Push | `changes`: JSON; `lastPulledAt`: integer, Unix time in milliseconds (ms) | (none)
The following is a brief overview of how the pull and push operations work. Please refer to the Sync documentation for the details.
Pull Operation

Request:
- `lastPulledAt` is the timestamp retrieved in the last/previous pull operation.

Response:
- `changes` is a JSON object containing the data changes (created, updated, deleted) since `lastPulledAt` at the server.
- `timestamp` is a timestamp that will replace `lastPulledAt` for the next pull operation.
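For illustration, a single pull exchange might look like this (the endpoint path and values here are made up for the example; the full `changes` format is shown below):

GET /sync/pull?lastPulledAt=1588400731806

{
  "changes": { ... },
  "timestamp": 1588400755123
}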
Push Operation

Request:
- `changes` is a JSON object containing the local (client) data changes that the server will apply to the server DB.
- `lastPulledAt` is the timestamp retrieved in the last/previous pull operation. It is used for conflict detection: the server compares the modification time of each row in `changes` on the server DB with `lastPulledAt`. If the modification time is greater, there is a conflict.

Response:
- No response body specified.
`changes` Example
{
  "posts": {
    "created": [
      {
        "id": "d1633195-156f-4f9d-9ccf-7740203b080e",
        "_status": "created",
        "_changed": "",
        "title": "Phoenix",
        "content": "Phoenix is a web framework for Elixir",
        "likes": 200,
        "created_at": 1588400731806,
        "updated_at": 1588400731806
      }
    ],
    "updated": [
      {
        "id": "2d7c6a82-eb04-47b1-be52-6f8f6cf806ff",
        "_status": "updated",
        "_changed": "updated_at,title,content,likes",
        "title": "Elixir",
        "content": "Elixir is amazing",
        "likes": 100,
        "created_at": 1588389279195,
        "updated_at": 1588400691047
      }
    ],
    "deleted": [
      "2b130e52-079d-4b31-9f42-ce257cf546f0"
    ]
  },
  "comments": {
    "created": [],
    "updated": [
      {
        "id": "1e945c88-baf2-4db7-aa39-286b6865b3fb",
        "comment": "That's good!"
      }
    ],
    "deleted": []
  }
}
- `created` and `updated` are arrays of objects containing the created / updated records.
- `deleted` is an array of strings of the deleted IDs.
Sync Flow
Based on the documentation and the client-side sync code example (`synchronize()`), this is what is expected from the sync backend:
Proposed Alternative Sync Approach
While iterating on prototypes of the sync backend, I took a different approach to tracking changes, and made a workaround for an issue in WatermelonDB's client-side sync behaviour.
Using an Auto-incrementing Counter (Version) + Timestamp for Tracking Changes

WatermelonDB's sync documentation gives good tips for implementing a sync backend using timestamps. It also states:

This protects against weird edge cases related to server clock time changes (NTP time sync, leap seconds, etc.) (Alternatively, instead of using timestamps, you could use auto-incrementing counters, but you’d have to ensure they are consistent across the whole database, not just one table)
I followed its suggestion to use an auto-incrementing counter. In my approach, tracking changes requires this server DB setup: a global sequence (`version_seq`), and in each table the columns `version` (int), `version_created` (int), `created_at_server` (timestamp), `updated_at_server` (timestamp), and `deleted_at_server` (timestamp).

Please see Database Design below to see how this is implemented in practice.
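As a quick preview, here is a plain-SQL sketch of that setup (the tutorial creates the equivalent through Ecto migrations in the Sync Backend Implementation section; the domain columns are elided here):

CREATE SEQUENCE version_seq;

CREATE TABLE posts (
  id uuid PRIMARY KEY,
  -- ... domain columns ...
  version bigint DEFAULT nextval('version_seq'),
  version_created bigint DEFAULT nextval('version_seq'),
  created_at_server timestamptz,
  updated_at_server timestamptz,
  deleted_at_server timestamptz
);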
Workaround for Sync on the Client Side

There is a problem if we use the sync code example (`synchronize()`) from the documentation on the client side. Please look again at the Sync Flow diagram above.

In step 8, after WatermelonDB receives `changes` & `timestamp`, the `timestamp` value is internally set as `lastPulledAt` for the next pull operation. That is not a problem.

At first, I assumed the push operation would work the same way: we would get a new `timestamp` that would become the next `lastPulledAt`. But I was wrong. Look at step 15: there is no response at all for the push operation AND no way to explicitly set `lastPulledAt` in the push operation.

This means that on the next pull operation, we will get back the changes we just pushed in the previous push operation. This is actually mentioned in the documentation:
Current limitations

- During next sync pull, changes we’ve just pushed will be pulled again, which is unnecessary. It would be better if server, during push, also pulled local changes since `lastPulledAt` and responded with NEW timestamp to be treated as `lastPulledAt`.
I don't know why the library was designed to behave like this. I raised an issue asking about it.
Sync Flow Workaround
So this is what I did as a temporary solution/workaround:

- introduce the variables `latestVersionOfSession` & `changesOfSession` (1)
- call `synchronize()` twice (2 & 20)
- on the first `synchronize()`, the pull & push operations retrieve `latestVersion` & `changes` (8 & 19), then store them as the `latestVersionOfSession` & `changesOfSession` values
- on the second `synchronize()`, the pull operation only sets `lastPulledAt = latestVersionOfSession` & `changes = changesOfSession` to be applied to the local DB (22); the push operation does nothing

This is the workaround for the client side. The code is available in the next post.
Application Example: BlogApp
Let's say we want to build a blog app (web based) that supports data synchronization. Users can submit, edit, and delete post content. When a user clicks the Sync button, the data in the current browser is synced to the server. So if the user opens another browser (another client device), the data will automatically be synced and available there.
This tutorial only covers how to build the sync backend. The frontend (ReactJS) implementation is available in the next post: Building an Offline First React Web App Using WatermelonDB in Phoenix (Elixir).
Database Design
Determining Data Changes
For the push operation:
- when new data is created:
  - version = nextval('version_seq')
  - version_created = nextval('version_seq')
  - created_at_server = current time
  - updated_at_server = current time
- when data is updated (see the Ecto sketch after this list):
  - version = nextval('version_seq')
  - updated_at_server = current time
- when data is deleted:
  - version = nextval('version_seq')
  - deleted_at_server = current time
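To illustrate the update case, a hypothetical Ecto helper (`touch_post/1` is not part of the app code) that bumps `version` from the global sequence could look like this; the actual implementation below gets the same effect from the column defaults combined with UPSERTs:

# hypothetical sketch only
import Ecto.Query

def touch_post(id) do
  from(p in BlogApp.Blog.Post,
    where: p.id == ^id,
    update: [
      set: [
        # every change takes a fresh value from the global sequence
        version: fragment("nextval('version_seq')"),
        updated_at_server: ^DateTime.utc_now()
      ]
    ]
  )
  |> BlogApp.Repo.update_all([])
end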
For the pull operation, retrieve all data changed since `lastPulledVersion`:

SELECT * FROM posts WHERE version_created > <lastPulledVersion> OR version > <lastPulledVersion>

Then categorize which records were created, updated, or deleted (a SQL sketch follows the list):
- `created`: version_created > <lastPulledVersion> AND deleted_at_server IS NULL
- `updated`: created_at_server != updated_at_server AND deleted_at_server IS NULL
- `deleted`: deleted_at_server IS NOT NULL
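Conceptually, the same categorization can be expressed in a single SQL query (illustrative only; the tutorial implements it in Elixir in `Blog.list_posts_changes/2` below):

SELECT *,
  CASE
    WHEN deleted_at_server IS NOT NULL THEN 'deleted'
    WHEN version_created > <lastPulledVersion> THEN 'created'
    ELSE 'updated'
  END AS change_type
FROM posts
WHERE version_created > <lastPulledVersion> OR version > <lastPulledVersion>;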
Sync Backend Implementation
The sync backend consists of four main components: `SyncController`, the `Sync` context, the `Blog` context, and `Repo`. If you come from another framework, a context is kind of like a service.

We will build the sync backend using Elixir 1.10 and Phoenix 1.5.1.
Install Phoenix 1.5.1:
$ mix archive.uninstall phx_new
$ mix archive.install hex phx_new 1.5.1
Generate a new Phoenix web app:
$ mix phx.new blog_app
We will use PostgreSQL on Docker, with the password and database name specified in `config/dev.exs`:
$ docker run --name blog-db -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=blog_app_dev -d -p 5432:5432 postgres:12.2
Create the database:
$ cd blog_app
$ mix ecto.create
Create the `version_seq` sequence that will generate a version for each data change:
$ mix ecto.gen.migration create_version_seq
# priv/repo/migrations/xxx_create_version_seq.exs
defmodule BlogApp.Repo.Migrations.CreateVersionSeq do
  use Ecto.Migration

  def change do
    execute "CREATE SEQUENCE version_seq"
  end
end
$ mix ecto.migrate
Generate the `Post` schema:
$ mix phx.gen.schema Blog.Post posts title:string content:string likes:integer push_id:integer created_at:utc_datetime_usec updated_at:utc_datetime_usec created_at_server:utc_datetime_usec updated_at_server:utc_datetime_usec deleted_at_server:utc_datetime_usec version:integer version_created:integer --binary-id
Set the default value of the `version*` columns to an incrementing number generated by `version_seq`. As the counter may grow over time, change the `version*` column type to `bigint` to support bigger values. We don't need the `inserted_at` and `updated_at` columns generated by Phoenix, so we omit `timestamps()`.
Edit `xxx_create_posts.exs`:
# priv/repo/migrations/xxx_create_posts.exs
defmodule BlogApp.Repo.Migrations.CreatePosts do
  use Ecto.Migration

  def change do
    create table(:posts, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :title, :string
      add :content, :string
      add :likes, :integer
      add :created_at, :utc_datetime_usec
      add :updated_at, :utc_datetime_usec
      add :created_at_server, :utc_datetime_usec
      add :updated_at_server, :utc_datetime_usec
      add :deleted_at_server, :utc_datetime_usec
      add :push_id, :integer
-     add :version, :integer
+     add :version, :bigint, default: fragment("nextval('version_seq')")
-     add :version_created, :integer
+     add :version_created, :bigint, default: fragment("nextval('version_seq')")
-     timestamps()
    end
  end
end
To enable JSON encoding for the `Post` schema, annotate it with `@derive Jason.Encoder`, restricted to certain columns only:
# lib/blog_app/blog/post.ex
defmodule BlogApp.Blog.Post do
  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
+ @derive {Jason.Encoder, only: [:id, :title, :content, :likes]}
  schema "posts" do
    field :title, :string
    field :content, :string
    field :likes, :integer
    field :created_at, :utc_datetime_usec
    field :updated_at, :utc_datetime_usec
    field :created_at_server, :utc_datetime_usec
    field :updated_at_server, :utc_datetime_usec
    field :deleted_at_server, :utc_datetime_usec
    field :push_id, :integer
    field :version, :integer
    field :version_created, :integer
-   timestamps()
  end

  # ...
end
$ mix ecto.migrate
Sync Endpoints
The sync endpoints will be handled by:
- push: POST /api/sync/push?lastPulledVersion=<lastPulledVersion>
- pull: GET /api/sync/pull?lastPulledVersion=<lastPulledVersion>
Edit `lib/blog_app_web/router.ex`:
# lib/blog_app_web/router.ex
defmodule BlogAppWeb.Router do
  # ...
  scope "/api", BlogAppWeb do
    pipe_through :api

    post "/sync/push", SyncController, :push
    get "/sync/pull", SyncController, :pull
  end
  # ...
end
Controller
Create `lib/blog_app_web/controllers/sync_controller.ex`:
# lib/blog_app_web/controllers/sync_controller.ex
defmodule BlogAppWeb.SyncController do
  use BlogAppWeb, :controller

  alias BlogApp.Sync

  def push(
        %Plug.Conn{
          body_params: req_body,
          query_params: %{"lastPulledVersion" => last_pulled_version}
        } = conn,
        _params
      ) do
    resp = Sync.push(req_body, String.to_integer(last_pulled_version))
    json(conn, resp)
  end

  def pull(
        %Plug.Conn{
          query_params: %{"lastPulledVersion" => last_pulled_version}
        } = conn,
        _params
      ) do
    resp = Sync.pull(String.to_integer(last_pulled_version))
    json(conn, resp)
  end
end
Push
Changes have to be recorded in a DB transaction. If there is a failed data operation, every operation must be rolled back.
In a push operation, data that will be recorded are also annotated with a push_id
.
Later on, after push operation has been successfully applied, all changes since last_pulled_version
are retrieved to become push response except those have already been applied (filtered with push_id
).
`BlogApp.Sync.push/2`:
# lib/blog_app/sync.ex
defmodule BlogApp.Sync do
  alias BlogApp.{Repo, Blog}

  def push(changes, last_pulled_version) do
    # tag this push so its own changes can be excluded from the response below
    push_id = Enum.random(1..1_000_000_000)

    # if any step fails (e.g. a version conflict), the transaction is rolled
    # back and this match raises, so the push request fails
    {:ok, _} =
      Ecto.Multi.new()
      |> Blog.record_posts(changes["posts"], last_pulled_version, push_id)
      |> Repo.transaction()

    pull(last_pulled_version, push_id)
  end

  # ...
end
`BlogApp.Blog.record_posts/4`:
# lib/blog_app/blog.ex
defmodule BlogApp.Blog do
  import Ecto.Query

  alias Ecto.Multi
  alias BlogApp.Repo
  alias BlogApp.Blog.Post

  def record_posts(%Multi{} = multi, post_changes, last_pulled_version, push_id) do
    multi
    |> Multi.run(:check_conflict_posts, fn _, _changes ->
      case check_conflict_version_posts(post_changes, last_pulled_version) do
        :no_conflict -> {:ok, :no_conflict}
        :conflict -> {:error, :conflict}
      end
    end)
    |> record_created_posts(post_changes["created"] |> set_push_id(push_id))
    |> record_updated_posts(post_changes["updated"] |> set_push_id(push_id))
    |> record_deleted_posts(post_changes["deleted"], push_id)
  end

  # ...

  # pass nil through so the is_nil guard in upsert_posts/3 can skip the step
  defp set_push_id(nil, _push_id), do: nil

  defp set_push_id(posts, push_id) do
    Enum.map(posts, fn post -> Map.put(post, "push_id", push_id) end)
  end

  # ...
end
Conflict Detection

A conflict happens when another user/client has modified a row that we are pushing after our last pull, i.e. its `version` (or `version_created`) on the server is greater than `lastPulledVersion`.

`Blog.check_conflict_version_posts/2`:
# lib/blog_app/blog.ex
defmodule BlogApp.Blog do
  # ...

  def check_conflict_version_posts(post_changes, last_pulled_version) do
    # collect the IDs of every row the client wants to create, update, or delete
    ids =
      Enum.concat(post_changes["created"], post_changes["updated"])
      |> Enum.map(fn post -> post["id"] end)
      |> Enum.concat(post_changes["deleted"])

    # count rows among them that changed on the server after the client's last pull
    count =
      Post
      |> select([p], count(p.version))
      |> where([p], p.id in ^ids)
      |> where([p], p.version > ^last_pulled_version or p.version_created > ^last_pulled_version)
      |> Repo.one()

    case count do
      0 -> :no_conflict
      _ -> :conflict
    end
  end

  # ...
end
Storing Record Changes

Data changes are saved to the database in bulk using PostgreSQL's `INSERT ... ON CONFLICT`, also known as UPSERT (update or insert). `Blog.upsert_posts/3` handles both the create and the update case.

Note that `version` is deliberately excluded from the inserted fields: PostgreSQL fills it from the `version_seq` default, and on conflict the replaced columns take their values from the `EXCLUDED` row, which again carries a freshly generated `version`. This is how every create and update bumps a row's version.

`Blog.record_created_posts/2` & `Blog.record_updated_posts/2`:
# lib/blog_app/blog.ex
defmodule BlogApp.Blog do
  # ...

  def record_created_posts(%Multi{} = multi, created_changes),
    do: upsert_posts(multi, :create_posts, created_changes)

  def record_updated_posts(%Multi{} = multi, updated_changes),
    do: upsert_posts(multi, :update_posts, updated_changes)

  def upsert_posts(%Multi{} = multi, _name, changes) when is_nil(changes),
    do: multi

  def upsert_posts(%Multi{} = multi, name, changes) do
    now = DateTime.utc_now()

    posts =
      changes
      |> Enum.map(fn row ->
        row
        # client timestamps are Unix ms; convert to µs for DateTime
        |> Map.put("created_at", (row["created_at"] * 1000) |> DateTime.from_unix!(:microsecond))
        |> Map.put("updated_at", (row["updated_at"] * 1000) |> DateTime.from_unix!(:microsecond))
        |> Map.put("created_at_server", now)
        |> Map.put("updated_at_server", now)
        # `version` is intentionally not taken: the version_seq default assigns it
        |> Map.take(["id", "title", "content", "likes", "created_at", "updated_at", "created_at_server", "updated_at_server", "push_id"])
        |> key_to_atom()
      end)

    Multi.insert_all(multi, name, Post, posts,
      conflict_target: :id,
      on_conflict: {:replace_all_except, [:id, :version_created, :created_at_server, :deleted_at_server]},
      returning: true
    )
  end

  def key_to_atom(map) do
    Enum.reduce(map, %{}, fn
      {key, value}, acc when is_atom(key) -> Map.put(acc, key, value)
      # to_existing_atom avoids creating atoms from arbitrary client input
      {key, value}, acc when is_binary(key) -> Map.put(acc, String.to_existing_atom(key), value)
    end)
  end

  # ...
end
`Blog.record_deleted_posts/3`:
# lib/blog_app/blog.ex
defmodule BlogApp.Blog do
  # ...

  def record_deleted_posts(%Multi{} = multi, deleted_ids, _push_id) when is_nil(deleted_ids),
    do: multi

  def record_deleted_posts(%Multi{} = multi, deleted_ids, push_id) do
    now = DateTime.utc_now()

    # deletes are soft: upsert a tombstone row carrying `deleted_at_server`
    posts =
      deleted_ids
      |> Enum.map(fn id ->
        %{id: id, deleted_at_server: now, push_id: push_id}
      end)

    Multi.insert_all(multi, :delete_posts, Post, posts,
      conflict_target: :id,
      # on conflict, `version` again takes a fresh default from version_seq
      on_conflict: {:replace, [:deleted_at_server, :version, :push_id]},
      returning: true
    )
  end

  # ...
end
Pull

The pull endpoint calls `Sync.pull` without a `push_id`, so all data changes since `last_pulled_version` become the pull response. Note that the response carries `latestVersion` instead of the spec's `timestamp`; the client-side workaround described above uses it as the next `lastPulledVersion`.
# lib/blog_app/sync.ex
defmodule BlogApp.Sync do
  # ...

  def pull(last_pulled_version, push_id \\ nil) do
    %{latest_version: latest_version_posts, changes: posts_changes} =
      Blog.list_posts_changes(last_pulled_version, push_id)

    # never report a version older than the one the client already has
    latest_version = Enum.max([last_pulled_version, latest_version_posts])

    %{
      "latestVersion" => latest_version,
      "changes" => %{
        "posts" => posts_changes
      }
    }
  end
end
`Blog.list_posts_changes/2`:
# lib/blog_app/blog.ex
defmodule BlogApp.Blog do
  # ...

  def list_posts_changes(last_pulled_version, push_id) do
    # every row created or modified since the client's last pull
    posts_latest =
      Post
      |> where([p], p.version_created > ^last_pulled_version or p.version > ^last_pulled_version)
      |> Repo.all()

    posts_changes =
      posts_latest
      # drop rows that this very push just wrote (see push_id in Sync.push/2)
      |> Enum.reject(fn post -> is_just_pushed(post, push_id) end)
      |> Enum.group_by(fn post ->
        cond do
          post.version_created > last_pulled_version and is_nil(post.deleted_at_server) -> :created
          post.created_at_server != post.updated_at_server and is_nil(post.deleted_at_server) -> :updated
          not is_nil(post.deleted_at_server) -> :deleted
        end
      end)
      # make sure all three keys exist; deleted becomes a list of IDs only
      |> Map.update(:created, [], fn posts -> posts end)
      |> Map.update(:updated, [], fn posts -> posts end)
      |> Map.update(:deleted, [], fn posts -> posts |> Enum.map(fn post -> post.id end) end)

    latest_version = find_latest_version(posts_latest)

    %{latest_version: latest_version, changes: posts_changes}
  end

  defp find_latest_version(posts) do
    posts
    |> Enum.flat_map(fn post -> [post.version, post.version_created] end)
    |> Enum.max(fn -> 0 end)
  end

  # ...

  defp is_just_pushed(_post, push_id) when is_nil(push_id), do: false
  defp is_just_pushed(post, push_id), do: post.push_id == push_id
end
Run the Backend
$ mix phx.server
The sync endpoints will be available at:
- push: POST http://localhost:4000/api/sync/push?lastPulledVersion=<lastPulledVersion>
- pull: GET http://localhost:4000/api/sync/pull?lastPulledVersion=<lastPulledVersion>
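As a quick smoke test (assuming a fresh, empty database), a first pull with lastPulledVersion=0 should return something like this, with empty change sets:

$ curl "http://localhost:4000/api/sync/pull?lastPulledVersion=0"
{"changes":{"posts":{"created":[],"deleted":[],"updated":[]}},"latestVersion":0}

A push can then be tested with a body in the `changes` format shown earlier (reusing the example post from above); on success, the response is the pull since `lastPulledVersion`, with the just-pushed rows filtered out via `push_id` and a new `latestVersion`:

$ curl -X POST "http://localhost:4000/api/sync/push?lastPulledVersion=0" \
    -H "Content-Type: application/json" \
    -d '{"posts":{"created":[{"id":"d1633195-156f-4f9d-9ccf-7740203b080e","title":"Phoenix","content":"Phoenix is a web framework for Elixir","likes":200,"created_at":1588400731806,"updated_at":1588400731806}],"updated":[],"deleted":[]}}'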
What's Next?

We have built a sync backend for a WatermelonDB frontend application. In the next post, we will continue with the frontend application and its sync capability: Building an Offline First React Web App Using WatermelonDB in Phoenix (Elixir).