Writing a Workflow File

To use spockctrl to make an automated change to your cluster, you need to describe the changes in a workflow file. A workflow file describes the changes you'll be making to your cluster in a step-by-step, node-by-node manner, and needs to be customized for your cluster.

⚠️

The new node should not be accessible to users while adding a node with a workflow.

Do not modify your cluster's DDL during node addition.

All nodes in your cluster must be available for the duration of the addition.

If the workflow fails, don't invoke the workflow again until you ensure that all artifacts created by previous run of the workflow have been removed!

Sample workflows are available in the Spock extension's Github repository for common activities; the samples can help you:

  • add_node.json - Add a node to a cluster.
  • remove_node.json - Remove a node from a cluster.
  • cross-wire.json - Configure subscriptions and replication artifacts between empty nodes.
  • uncross-wire.json - Remove subscriptions and replication artifacts from empty nodes.

To execute a workflow, include the -w (or --workflow=<path_to_workflow_file>) command-line option when you invoke spockctrl, followed by the path to the workflow JSON file.

spockctrl --config=/path/to/my/spockctrl.json --workflow=/path/to/my/workflow.json

or

spockctrl -c path/to/my/spockctrl.json -w path/to/my/workflow.json

Example - Workflow to Add a Node to a Two-Node Cluster

In this example, we'll walk through the stanzas that make up a workflow that adds a new node to a two-node cluster. Within the workflow file, the COMMAND property identifies the action performed by the stanza in which it is used. Spock 5.0 supports the following COMMANDs:

COMMANDDescription
CREATE NODEAdd a node to a cluster.
DROP NODEDrop a node from a cluster.
CREATE SUBSCRIPTIONAdd a subscription to a cluster.
DROP SUBSCRIPTIONDrop a subscription from a cluster.
CREATE REPSETAdd a repset to a cluster.
DROP REPSETDrop a node to a cluster.
CREATE SLOTAdd a replication slot.
DROP SLOTDrop a replication slot.
ENABLE SUBSCRIPTIONStart replication on a node.
DISABLE SUBSCRIPTIONStop replication on a node.
SQLInvoke the specified Postgres SQL command.

In this walkthrough, we're using a two-node cluster; if your cluster is larger than two nodes, any actions performed on the replica node (in our example, n2) should be performed on every replica node in your cluster. A replica node is any existing node that is not used as a source node.

Our sample workflow adds a third node to a new node cluster. The first stanza provides connection information for the host of the new node. Spockctrl can add only one node per workflow file; provide this information for each new node you add to your cluster.

This stanza associates the name of the new node with the connection properties of the new node:

  • Provide the new node name in the node property and the --node_name property - the name must be identical.
  • --dsn specifies the connection properties of the new node.
{
  "workflow_name": "Add Node",
  "description": "Adding third node (n3) to two node (n1,n2) cluster.",
  "steps": [
    {
      "spock": {
        "node": "n3",
        "command": "CREATE NODE",
        "description": "Create a spock node n3",
        "args": [
          "--node_name=n3",
          "--dsn=host=127.0.0.1 port=5433 user=pgedge password=pgedge",
          "--location=Los Angeles",
          "--country=USA",
          "--info={\"key\": \"value\"}"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

The next stanza creates the subscription from the new node (n3) to the source node (n1). In this stanza:

  • the node property specifies the name of the node in our original cluster that is used as our source node. The content of this node will be copied to the new node that we are creating.
  • --sub_name specifies the name of the new subscription.
  • --provider_dsn specifies the connection properties of the provider (our new node).
  • --replication_sets specifies the names of the replication sets created for the subscription.
  • --enabled, --synchronize_data and --synchronize_structure must be true.
    {
      "spock": {
        "node": "n1",
        "command": "CREATE SUBSCRIPTION",
        "description": "Create a subscription (sub_n3_n1) on (n1) for n3->n1",
        "sleep": 0,
        "args": [
          "--sub_name=sub_n3_n1",
          "--provider_dsn=host=127.0.0.1 port=5433 user=pgedge password=spockpass",
          "--replication_sets=ARRAY['default', 'default_insert_only', 'ddl_sql']",
          "--synchronize_structure=true",
          "--synchronize_data=true",
          "--forward_origins='{}'::text[]",
          "--apply_delay='0'::interval",
          "--force_text_transfer=false",
          "--enabled=true"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

Our next stanza creates subscriptions between the new node and any existing replica nodes.

  • node specifies the name of the existing replica node.
  • --provider_dsn specifies the connection properties of the new node; this is the provider node for the new subscription.
  • --replication_sets specifies the names of the replication sets created for the subscription
  • --enabled, --synchronize_data and --synchronize_structure must be true.

You will need to include this stanza once for each replica node in your cluster; if your existing cluster has three nodes (one source node and two replica nodes), you will add two copies of this stanza. If your existing cluster has four nodes (one source node and three replica nodes), you will add three copies of this stanza.

    {
      "spock": {
        "node": "n2",
        "command": "CREATE SUBSCRIPTION",
        "description": "Create a subscription (sub_n3_n2) on (n2) for n3->n2",
        "sleep": 0,
        "args": [
          "--sub_name=sub_n3_n2",
          "--provider_dsn=host=127.0.0.1 port=5433 user=pgedge password=spockpass",
          "--replication_sets=ARRAY['default', 'default_insert_only', 'ddl_sql']",
          "--synchronize_structure=true",
          "--synchronize_data=true",
          "--forward_origins='{}'::text[]",
          "--apply_delay='0'::interval",
          "--force_text_transfer=false",
          "--enabled=true"
        ],
        "ignore_errors": false,
        "on_success": {},
        "on_failure": {}
      }
    },

In the next step, we wait for the apply worker to check state of the subscription. This step is optional, but should be considered a best practice. This step uses a SQL command to call the spock.wait_for_apply_worker function.

  • $n2.sub_create is a variable (the subscription ID) populated by the previous CREATE SUBSCRIPTION stanza.
  • If needed, use the sleep property to accomodate processing time.

Note that if you have more than one replica node, and multiple CREATE SUBSCRIPTION stanzas, each stanza should be followed by a copy of this stanza, with the variable reset for each subsequent execution ($n3.sub_create, $n4.sub_create, etc).

    {
      "sql": {
        "node": "n2",
        "command": "SQL",
        "description": "Wait for apply worker on n2 subscription",
        "sleep": 0,
        "args": [
          "--sql=SELECT spock.wait_for_apply_worker($n2.sub_create, 1000);"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

In our next stanza, we create a subscription between each replica node (n2) and our new node (n3).

  • --provider_dsn is the connection string of the replica node (in our example, the provider is the first node referenced in our subscription).
  • --enabled, --synchronize_data, --force_text_transfer and --synchronize_structure must be false.

If you have multiple replica nodes, you will need one iteration of this stanza for each replica node in your cluster.

    {
      "spock": {
        "node": "n3",
        "command": "CREATE SUBSCRIPTION",
        "description": "Create a subscription (sub_n2_n3) on (n3) for n2->n3",
        "sleep": 5,
        "args": [
          "--sub_name=sub_n2_n3",
          "--provider_dsn=host=127.0.0.1 port=5432 user=pgedge password=spockpass",
          "--replication_sets=ARRAY['default', 'default_insert_only', 'ddl_sql']",
          "--synchronize_structure=false",
          "--synchronize_data=false",
          "--forward_origins='{}'::text[]",
          "--apply_delay='0'::interval",
          "--force_text_transfer=false",
          "--enabled=false"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

In the next stanza, we use a CREATE SLOT command to create a replication slot for the new subscription between our existing replica (n2) and our new node (n3). Provide the slot name in the form spk_database-name_node-name_subscription-name where:

  • database-name is the name of your database.
  • node-name name of the existing replica node.
  • subscription-name is the subscription created in the previous step.

You must create one replication slot for each iteration of the CREATE SUBSCRIPTION stanza that allows a replica node to communicate with the new node; if you have three nodes in your cluster (one source node, and two replica nodes), you will need to provide one slot for each replica node (or two slots total).

    {
      "spock": {
        "node": "n2",
        "command": "CREATE SLOT",
        "description": "Create a logical replication slot spk_pgedge_n2_sub_n2_n3 on (n2)",
        "args": [
          "--slot=spk_pgedge_n2_sub_n2_n3",
          "--plugin=spock_output"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

Next, on each replica node, we invoke a SQL command that starts a sync event between the existing node(n2) and our source node (n1). This ensures that our replica nodes stay in sync with the source node for the duration of the ADD NODE process.

The function (spock.sync_event) returns the log sequence number (LSN) of the sync event:

    {
      "sql": {
        "node": "n2",
        "command": "SQL",
        "description": "Trigger a sync event on (n2)",
        "sleep": 10,
        "args": [
          "--sql=SELECT spock.sync_event();"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

The previous stanza returns the LSN of the sync event in the $n2.sync_event variable; in the next stanza, we watch for that variable so we know when the step is complete.

  • node specifies the cluster source node (in our example, n1).
  • $n2.sync_event is the LSN returned by the previous stanza.
  • If needed, you can use sleep to provide extra processing time for the step.

Include this stanza once for each iteration of the previous stanza within your workflow file.

    {
      "sql": {
        "node": "n1",
        "command": "SQL",
        "description": "Wait for a sync event on (n1) for n2-n1",
        "sleep": 0,
        "args": [
          "--sql=CALL spock.wait_for_sync_event(true, 'n2', '$n2.sync_event'::pg_lsn, 1200000);"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

In the next stanza, we create a subscription on our new target node (n3) from the source node (n1):

  • arg->--sub_name is sub_n1_n3.
  • arg->--provider_dsn is the connection string for the source node (our provider, n1).
  • --enabled, --synchronize_data, and --synchronize_structure must be true.
  • --force_text_transfer must be false.
    {
      "spock": {
        "node": "n3",
        "command": "CREATE SUBSCRIPTION",
        "description": "Create a subscription (sub_n1_n3) for n1 fpr n1->n3",
        "sleep": 0,
        "args": [
          "--sub_name=sub_n1_n3",
          "--provider_dsn=host=127.0.0.1 port=5431 user=pgedge password=spockpass",
          "--replication_sets=ARRAY['default', 'default_insert_only', 'ddl_sql']",
          "--synchronize_structure=true",
          "--synchronize_data=true",
          "--forward_origins='{}'::text[]",
          "--apply_delay='0'::interval",
          "--force_text_transfer=false",
          "--enabled=true"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

Then, we include a stanza that triggers a sync event on the source node (n1) between the source node and the new node (n3). Use the sleep property to allocate time for the data to sync to the new node if needed.

    {
      "sql": {
        "node": "n1",
        "command": "SQL",
        "description": "Trigger a sync event on (n1)",
        "sleep": 5,
        "args": [
          "--sql=SELECT spock.sync_event();"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

In the next stanza, we wait for the sync event started in the previous stanza to complete. Use the sleep property to allocate time for the data to sync to the new node if needed.

    {
      "sql": {
        "node": "n3",
        "command": "SQL",
        "description": "Wait for a sync event on (n1) for n1-n3",
        "sleep": 10,
        "args": [
          "--sql=CALL spock.wait_for_sync_event(true, 'n1', '$n1.sync_event'::pg_lsn, 1200000);"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

In the next stanza, we check for data lag between the new node (n3) and any replica nodes in our cluster (n2). The timestamp returned is passed to the next stanza for use in evaluating the comparative state of the replica nodes and our new node.

    {
      "sql": {
        "node": "n3",
        "command": "SQL",
        "description": "Check commit timestamp for n3 lag",
        "sleep": 1,
        "args": [
          "--sql=SELECT commit_timestamp FROM spock.lag_tracker WHERE origin_name = 'n2' AND receiver_name = 'n3'"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

In the next stanza, we use the timestamp from the previous stanza ($n3.commit_timestamp) to advance our replica slot to that location within our log files. This effectively advances transactions to the time specified (preventing duplicate entries from being written to the replica node).

    {
      "sql": {
        "node": "n2",
        "command": "SQL",
        "description": "Advance the replication slot for n2->n3 based on a specific commit timestamp",
        "sleep": 0,
        "args": [
          "--sql=WITH lsn_cte AS (SELECT spock.get_lsn_from_commit_ts('spk_pgedge_n2_sub_n2_n3', '$n3.commit_timestamp'::timestamp) AS lsn) SELECT pg_replication_slot_advance('spk_pgedge_n2_sub_n2_n3', lsn) FROM lsn_cte;"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

Then, we enable the subscription from each replica node to the new node. At this point replication starts between any existing node and the new node (n3).

    {
      "spock": {
        "node": "n3",
        "command": "ENABLE SUBSCRIPTION",
        "description": "Enable subscription (sub_n2_n3) on n3",
        "args": [
          "--sub_name=sub_n2_n3",
          "--immediate=true"
        ],
        "on_success": {},
        "on_failure": {}
      }
    },

After starting replication, we check the lag time between the new node and each node in the cluster. This step invokes a SQL command that loops through each node in the cluster and compares the lag on each node until each returned value is comparable. Use the sleep property to extend processing time if needed.

    {
      "sql": {
        "node": "n3",
        "command": "SQL",
        "description": "Advance the replication slot for n2->n3 based on a specific commit timestamp",
        "sleep": 0,
        "args": [
          "--sql=DO $$\nDECLARE\n    lag_n1_n3 interval;\n    lag_n2_n3 interval;\nBEGIN\n    LOOP\n        SELECT now() - commit_timestamp INTO lag_n1_n3\n        FROM spock.lag_tracker\n        WHERE origin_name = 'n1' AND receiver_name = 'n3';\n\n        SELECT now() - commit_timestamp INTO lag_n2_n3\n        FROM spock.lag_tracker\n        WHERE origin_name = 'n2' AND receiver_name = 'n3';\n\n        RAISE NOTICE 'n1 → n3 lag: %, n2 → n3 lag: %',\n                     COALESCE(lag_n1_n3::text, 'NULL'),\n                     COALESCE(lag_n2_n3::text, 'NULL');\n\n        EXIT WHEN lag_n1_n3 IS NOT NULL AND lag_n2_n3 IS NOT NULL\n                  AND extract(epoch FROM lag_n1_n3) < 59\n                  AND extract(epoch FROM lag_n2_n3) < 59;\n\n        PERFORM pg_sleep(1);\n    END LOOP;\nEND\n$$;\n"
        ],
        "on_success": {},
        "on_failure": {}
      }
    }
  ]
}

The SQL command from the last step (in a more readable format) is:

  DO $$
    DECLARE
      lag_n1_n3 interval;
      lag_n2_n3 interval;
      BEGIN
        LOOP
          SELECT now() - commit_timestamp INTO lag_n1_n3
            FROM spock.lag_tracker
            WHERE origin_name = 'n1' AND receiver_name = 'n3';
                                        
          SELECT now() - commit_timestamp INTO lag_n2_n3
            FROM spock.lag_tracker
            WHERE origin_name = 'n2' AND receiver_name = 'n3';
                                                        
          RAISE NOTICE 'n1 -> n3 lag: %, n2 -> n3 lag: %',
            COALESCE(lag_n1_n3::text, 'NULL'),
            COALESCE(lag_n2_n3::text, 'NULL');
                                                                                                  
          EXIT WHEN lag_n1_n3 IS NOT NULL AND lag_n2_n3 IS NOT NULL
            AND extract(epoch FROM lag_n1_n3) < 59
            AND extract(epoch FROM lag_n2_n3) < 59;
                                                                                                                                      
          PERFORM pg_sleep(1);
          END LOOP;
        END
    $$;