Database Replication
SymmetricDS
SymmetricDS is open source software for database and file synchronization, with support for multi-master replication, filtered synchronization, and transformation. It uses web and database technologies to replicate change data as a scheduled or near real-time operation, and it includes an initial load feature for full data loads. The software was designed to scale for a large number of nodes, work across low-bandwidth connections, and withstand periods of network outage.
Biophics
For this project SymmetricDS is used for database replication. We have created tables in the canopy datawarehouse, which need to be replicated in the Biophics sql server database. The tables being replicated are:
raw_events
raw_tasks
raw_clients
raw_plans
raw_jurisdictions
raw_structures
raw_organizations
raw_providers
Setup
In order to setup symmetricDS for Biophics
Download the SymmetricDS standalone ZIP file from here
Unzip the file and place it in
/opt/symmetricDS
in the server (the nifi server for the case of Biophics)Copy the warehouse-000.properties and the target-001.properties files and place them in
/opt/symmetricDS/engines
in the serverCreate SymmetricDS tables with the command
/bin/symadmin --engine warehouse-000 create-sym-tables
Load the SymmetricDS configurations into the root node database from insert_configs.sql with
/bin/dimport --engine warehouse-000 insert_configs.sql
Install with
bin/sym_service install
bin/sym_service start
to start service (andbin/sym_service stop
to stop)To start initial load
bin/symadmin --engine warehouse-000 reload-node 001
Files
warehouse-000.properties
# Friendly name to refer to this node from command line
engine.name=warehouse-000
# The class name for the JDBC Driver
db.driver=org.postgresql.Driver
# The JDBC URL used to connect to the database
db.url=jdbc:postgresql://postgres.reveal-th.smartregister.org/{{ database_name }}
# The database user that SymmetricDS should use.
db.user={{ database_username }}
# The database password
db.password={{ database_password }}
# This node will contact the root node's sync.url to register itself.
# Leave blank to indicate this is the root node.
registration.url=
# Sync URL where other nodes can contact this node to push/pull data or register.
sync.url=http://localhost:31415/sync/warehouse-000
# Node group this node belongs to, which defines what it will sync with who.
# Must match the sym_node_group configuration in database.
group.id=warehouse
# External ID for this node, which is any unique identifier you want to use.
external.id=000
# If this is true, when symmetric starts up it will try to create the necessary tables.
auto.config.database=true
# Whether the pull job is enabled for this node.
start.pull.job=false
# Whether the push job is enabled for this node.
start.push.job=true
# Automatically register new nodes when they request it.
auto.registration=true
# If this is true, a reload is automatically sent to nodes when they register
# auto.reload=true
# How often to run purge job,
job.purge.period.time.ms=7200000
# How to run routing (in millis), which puts changes into batches.
job.routing.period.time.ms=5000
# How often to run push (in millis), which sends changes to other nodes.
job.push.period.time.ms=10000
# How often to run pull (in millis), which receives changes from other nodes.
job.pull.period.time.ms=10000
# When this node sends an initial load of data to another node, first send table create scripts.
initial.load.create.first=true
target-001.properties
# Friendly name to refer to this node from command line
engine.name=target-001
# The class name for the JDBC Driver
db.driver=net.sourceforge.jtds.jdbc.Driver
# The JDBC URL used to connect to the database
db.url=jdbc:jtds:sqlserver://mssql.reveal-th.smartregister.org:1433/{{ database_name }};useCursors=true;bufferMaxMemory=10240;lobBuffer=5242880
# The database user that SymmetricDS should use.
db.user={{ database_username }}
# The database password
db.password={{ database_password }}
# This node will contact the root node's sync.url to register itself.
registration.url=http://localhost:31415/sync/warehouse-000
# Node group this node belongs to, which defines what it will sync with who.
# Must match the sym_node_group configuration in database.
group.id=target
# External ID for this node, which is any unique identifier you want to use.
external.id=001
# If this is true, when symmetric starts up it will try to create the necessary tables.
auto.config.database=true
# Whether the pull job is enabled for this node.
start.pull.job=true
# Whether the push job is enabled for this node.
start.push.job=false
# Automatically register new nodes when they request it.
auto.registration=true
# How to run routing (in millis), which puts changes into batches.
job.routing.period.time.ms=5000
# How often to run push (in millis), which sends changes to other nodes.
job.push.period.time.ms=10000
# How often to run pull (in millis), which receives changes from other nodes.
job.pull.period.time.ms=10000
insert_configs.sql
------------------------------------------------------------------------------
-- Clear and load SymmetricDS Configuration
------------------------------------------------------------------------------
delete from sym_trigger_router;
delete from sym_trigger;
delete from sym_router;
delete from sym_channel;
delete from sym_node_group_link;
delete from sym_node_group;
delete from sym_node_host;
delete from sym_node_identity;
delete from sym_node_security;
delete from sym_node;
------------------------------------------------------------------------------
-- Channels
------------------------------------------------------------------------------
-- Channel "events"
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('events', 1, 100000, 1, 'Events data');
-- Channel "tasks"
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('tasks', 1, 100000, 1, 'Tasks data');
-- Channel "plans"
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('plans', 1, 100000, 1, 'Plans data');
-- Channel "structures"
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('structures', 1, 100000, 1, 'Structures data');
-- Channel "jurisdictions"
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('jurisdictions', 1, 100000, 1, 'jurisdictions data');
-- Channel "clients"
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('clients', 1, 100000, 1, 'Clients data');
-- Channel "organizations"
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('organizations', 1, 100000, 1, 'Organizations data');
-- Channel "providers"
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('providers', 1, 100000, 1, 'Providers data');
------------------------------------------------------------------------------
-- Node Groups
------------------------------------------------------------------------------
insert into sym_node_group
(node_group_id, description)
values
('warehouse', 'The Ona data warehouse: postgres db');
insert into sym_node_group
(node_group_id, description)
values
('target', 'The Biophics server: sqlserver db');
------------------------------------------------------------------------------
-- Node Group Links
------------------------------------------------------------------------------
-- Warehouse sends changes to target when target pulls from warehouse
insert into sym_node_group_link
(source_node_group_id, target_node_group_id, data_event_action)
values
('warehouse', 'target', 'W');
------------------------------------------------------------------------------
-- Triggers
------------------------------------------------------------------------------
-- Triggers for tables
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('events', 'raw_events', 'events', current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('tasks', 'raw_tasks', 'tasks', current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('plans', 'raw_plans', 'plans', current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('structures', 'raw_structures', 'structures', current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('jurisdictions', 'raw_jurisdictions', 'jurisdictions', current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('clients', 'raw_clients', 'clients', current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('organizations', 'raw_organizations', 'organizations', current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('providers', 'raw_providers', 'providers', current_timestamp, current_timestamp);
-- Triggers with capture disabled, so they are used for initial load only
insert into sym_trigger
(trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time)
values('events_warehouse', 'raw_events', 'events', 0, 0, 0, current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time)
values('tasks_warehouse', 'raw_tasks', 'tasks', 0, 0, 0, current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time)
values('plans_warehouse', 'raw_plans', 'plans', 0, 0, 0, current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time)
values('structures_warehouse', 'raw_structures', 'structures', 0, 0, 0, current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time)
values('jurisdictions_warehouse', 'raw_jurisdictions', 'jurisdictions', 0, 0, 0, current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time)
values('clients_warehouse', 'raw_clients', 'clients', 0, 0, 0, current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time)
values('organizations_warehouse', 'raw_organizations', 'organizations', 0, 0, 0, current_timestamp, current_timestamp);
insert into sym_trigger
(trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time)
values('providers_warehouse', 'raw_providers', 'providers', 0, 0, 0, current_timestamp, current_timestamp);
------------------------------------------------------------------------------
-- Routers
------------------------------------------------------------------------------
-- Default router sends all data from warehouse to target
insert into sym_router
(router_id,source_node_group_id,target_node_group_id,router_type,create_time,last_update_time)
values('warehouse_to_target', 'warehouse', 'target', 'default', current_timestamp, current_timestamp);
------------------------------------------------------------------------------
-- Trigger Routers
------------------------------------------------------------------------------
-- Send all items to all targets
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('events', 'warehouse_to_target', 100, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('tasks', 'warehouse_to_target', 100, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('plans', 'warehouse_to_target', 100, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('structures', 'warehouse_to_target', 100, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('jurisdictions', 'warehouse_to_target', 100, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('clients', 'warehouse_to_target', 100, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('organizations', 'warehouse_to_target', 100, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('providers', 'warehouse_to_target', 100, current_timestamp, current_timestamp);
-- Send all items to target during initial load
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('events_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('tasks_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('plans_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('structures_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('jurisdictions_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('clients_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('organizations_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp);
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order,last_update_time,create_time)
values('providers_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp);
-- Add conflict
INSERT INTO sym_conflict (conflict_id, source_node_group_id, target_node_group_id, detect_type, resolve_changes_only, create_time,last_update_time,resolve_type,ping_back)
VALUES('plans_conflict', 'warehouse', 'target', 'USE_PK_DATA',0,NOW(),NOW(), 'FALLBACK', 'OFF');
This site is no longer maintained. Please visit docs.opensrp.io for current documentation.