Database Replication

SymmetricDS

SymmetricDS is open source software for database and file synchronization, with support for multi-master replication, filtered synchronization, and transformation. It uses web and database technologies to replicate change data as a scheduled or near real-time operation, and it includes an initial load feature for full data loads. The software was designed to scale for a large number of nodes, work across low-bandwidth connections, and withstand periods of network outage.

  • Quick start tutorial can be found here

  • User guide can be found here

Biophics

For this project SymmetricDS is used for database replication. We have created tables in the canopy datawarehouse, which need to be replicated in the Biophics sql server database. The tables being replicated are:

  • raw_events

  • raw_tasks

  • raw_clients

  • raw_plans

  • raw_jurisdictions

  • raw_structures

  • raw_organizations

  • raw_providers

Setup

In order to setup symmetricDS for Biophics

  1. Download the SymmetricDS standalone ZIP file from here

  2. Unzip the file and place it in /opt/symmetricDS in the server (the nifi server for the case of Biophics)

  3. Copy the warehouse-000.properties and the target-001.properties files and place them in /opt/symmetricDS/engines in the server

  4. Create SymmetricDS tables with the command /bin/symadmin --engine warehouse-000 create-sym-tables

  5. Load the SymmetricDS configurations into the root node database from insert_configs.sql with /bin/dimport --engine warehouse-000 insert_configs.sql

  6. Install with bin/sym_service install

  7. bin/sym_service start to start service (and bin/sym_service stop to stop)

  8. To start initial load bin/symadmin --engine warehouse-000 reload-node 001

Files

warehouse-000.properties

# Friendly name to refer to this node from command line engine.name=warehouse-000 # The class name for the JDBC Driver db.driver=org.postgresql.Driver # The JDBC URL used to connect to the database db.url=jdbc:postgresql://postgres.reveal-th.smartregister.org/{{ database_name }} # The database user that SymmetricDS should use. db.user={{ database_username }} # The database password db.password={{ database_password }} # This node will contact the root node's sync.url to register itself. # Leave blank to indicate this is the root node. registration.url= # Sync URL where other nodes can contact this node to push/pull data or register. sync.url=http://localhost:31415/sync/warehouse-000 # Node group this node belongs to, which defines what it will sync with who. # Must match the sym_node_group configuration in database. group.id=warehouse # External ID for this node, which is any unique identifier you want to use. external.id=000 # If this is true, when symmetric starts up it will try to create the necessary tables. auto.config.database=true # Whether the pull job is enabled for this node. start.pull.job=false # Whether the push job is enabled for this node. start.push.job=true # Automatically register new nodes when they request it. auto.registration=true # If this is true, a reload is automatically sent to nodes when they register # auto.reload=true # How often to run purge job, job.purge.period.time.ms=7200000 # How to run routing (in millis), which puts changes into batches. job.routing.period.time.ms=5000 # How often to run push (in millis), which sends changes to other nodes. job.push.period.time.ms=10000 # How often to run pull (in millis), which receives changes from other nodes. job.pull.period.time.ms=10000 # When this node sends an initial load of data to another node, first send table create scripts. initial.load.create.first=true

 

target-001.properties

# Friendly name to refer to this node from command line engine.name=target-001 # The class name for the JDBC Driver db.driver=net.sourceforge.jtds.jdbc.Driver # The JDBC URL used to connect to the database db.url=jdbc:jtds:sqlserver://mssql.reveal-th.smartregister.org:1433/{{ database_name }};useCursors=true;bufferMaxMemory=10240;lobBuffer=5242880 # The database user that SymmetricDS should use. db.user={{ database_username }} # The database password db.password={{ database_password }} # This node will contact the root node's sync.url to register itself. registration.url=http://localhost:31415/sync/warehouse-000 # Node group this node belongs to, which defines what it will sync with who. # Must match the sym_node_group configuration in database. group.id=target # External ID for this node, which is any unique identifier you want to use. external.id=001 # If this is true, when symmetric starts up it will try to create the necessary tables. auto.config.database=true # Whether the pull job is enabled for this node. start.pull.job=true # Whether the push job is enabled for this node. start.push.job=false # Automatically register new nodes when they request it. auto.registration=true # How to run routing (in millis), which puts changes into batches. job.routing.period.time.ms=5000 # How often to run push (in millis), which sends changes to other nodes. job.push.period.time.ms=10000 # How often to run pull (in millis), which receives changes from other nodes. job.pull.period.time.ms=10000

 

insert_configs.sql

------------------------------------------------------------------------------ -- Clear and load SymmetricDS Configuration ------------------------------------------------------------------------------ delete from sym_trigger_router; delete from sym_trigger; delete from sym_router; delete from sym_channel; delete from sym_node_group_link; delete from sym_node_group; delete from sym_node_host; delete from sym_node_identity; delete from sym_node_security; delete from sym_node; ------------------------------------------------------------------------------ -- Channels ------------------------------------------------------------------------------ -- Channel "events" insert into sym_channel (channel_id, processing_order, max_batch_size, enabled, description) values('events', 1, 100000, 1, 'Events data'); -- Channel "tasks" insert into sym_channel (channel_id, processing_order, max_batch_size, enabled, description) values('tasks', 1, 100000, 1, 'Tasks data'); -- Channel "plans" insert into sym_channel (channel_id, processing_order, max_batch_size, enabled, description) values('plans', 1, 100000, 1, 'Plans data'); -- Channel "structures" insert into sym_channel (channel_id, processing_order, max_batch_size, enabled, description) values('structures', 1, 100000, 1, 'Structures data'); -- Channel "jurisdictions" insert into sym_channel (channel_id, processing_order, max_batch_size, enabled, description) values('jurisdictions', 1, 100000, 1, 'jurisdictions data'); -- Channel "clients" insert into sym_channel (channel_id, processing_order, max_batch_size, enabled, description) values('clients', 1, 100000, 1, 'Clients data'); -- Channel "organizations" insert into sym_channel (channel_id, processing_order, max_batch_size, enabled, description) values('organizations', 1, 100000, 1, 'Organizations data'); -- Channel "providers" insert into sym_channel (channel_id, processing_order, max_batch_size, enabled, description) values('providers', 1, 100000, 1, 'Providers data'); ------------------------------------------------------------------------------ -- Node Groups ------------------------------------------------------------------------------ insert into sym_node_group (node_group_id, description) values ('warehouse', 'The Ona data warehouse: postgres db'); insert into sym_node_group (node_group_id, description) values ('target', 'The Biophics server: sqlserver db'); ------------------------------------------------------------------------------ -- Node Group Links ------------------------------------------------------------------------------ -- Warehouse sends changes to target when target pulls from warehouse insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action) values ('warehouse', 'target', 'W'); ------------------------------------------------------------------------------ -- Triggers ------------------------------------------------------------------------------ -- Triggers for tables insert into sym_trigger (trigger_id,source_table_name,channel_id,last_update_time,create_time) values('events', 'raw_events', 'events', current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id,last_update_time,create_time) values('tasks', 'raw_tasks', 'tasks', current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id,last_update_time,create_time) values('plans', 'raw_plans', 'plans', current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id,last_update_time,create_time) values('structures', 'raw_structures', 'structures', current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id,last_update_time,create_time) values('jurisdictions', 'raw_jurisdictions', 'jurisdictions', current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id,last_update_time,create_time) values('clients', 'raw_clients', 'clients', current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id,last_update_time,create_time) values('organizations', 'raw_organizations', 'organizations', current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id,last_update_time,create_time) values('providers', 'raw_providers', 'providers', current_timestamp, current_timestamp); -- Triggers with capture disabled, so they are used for initial load only insert into sym_trigger (trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time) values('events_warehouse', 'raw_events', 'events', 0, 0, 0, current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time) values('tasks_warehouse', 'raw_tasks', 'tasks', 0, 0, 0, current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time) values('plans_warehouse', 'raw_plans', 'plans', 0, 0, 0, current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time) values('structures_warehouse', 'raw_structures', 'structures', 0, 0, 0, current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time) values('jurisdictions_warehouse', 'raw_jurisdictions', 'jurisdictions', 0, 0, 0, current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time) values('clients_warehouse', 'raw_clients', 'clients', 0, 0, 0, current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time) values('organizations_warehouse', 'raw_organizations', 'organizations', 0, 0, 0, current_timestamp, current_timestamp); insert into sym_trigger (trigger_id,source_table_name,channel_id, sync_on_insert, sync_on_update, sync_on_delete,last_update_time,create_time) values('providers_warehouse', 'raw_providers', 'providers', 0, 0, 0, current_timestamp, current_timestamp); ------------------------------------------------------------------------------ -- Routers ------------------------------------------------------------------------------ -- Default router sends all data from warehouse to target insert into sym_router (router_id,source_node_group_id,target_node_group_id,router_type,create_time,last_update_time) values('warehouse_to_target', 'warehouse', 'target', 'default', current_timestamp, current_timestamp); ------------------------------------------------------------------------------ -- Trigger Routers ------------------------------------------------------------------------------ -- Send all items to all targets insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('events', 'warehouse_to_target', 100, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('tasks', 'warehouse_to_target', 100, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('plans', 'warehouse_to_target', 100, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('structures', 'warehouse_to_target', 100, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('jurisdictions', 'warehouse_to_target', 100, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('clients', 'warehouse_to_target', 100, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('organizations', 'warehouse_to_target', 100, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('providers', 'warehouse_to_target', 100, current_timestamp, current_timestamp); -- Send all items to target during initial load insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('events_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('tasks_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('plans_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('structures_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('jurisdictions_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('clients_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('organizations_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp); insert into sym_trigger_router (trigger_id,router_id,initial_load_order,last_update_time,create_time) values('providers_warehouse', 'warehouse_to_target', 200, current_timestamp, current_timestamp); -- Add conflict INSERT INTO sym_conflict (conflict_id, source_node_group_id, target_node_group_id, detect_type, resolve_changes_only, create_time,last_update_time,resolve_type,ping_back) VALUES('plans_conflict', 'warehouse', 'target', 'USE_PK_DATA',0,NOW(),NOW(), 'FALLBACK', 'OFF');