
Building a Type 2 Slowly Changing Dimension in Snowflake Using Streams and Tasks: Part 1


Introduction

This is Part 1 of a two-part post that explains how to build a Type 2 Slowly Changing Dimension (SCD) using Snowflake’s Stream functionality. The second part will explain how to automate the process using Snowflake’s Task functionality.

SCDs are a common database modeling technique used to capture data in a table and show how it changes over time. Although SCDs are most commonly associated with dimensions, you can apply the SCD methodology described in this blog to any table in a database.

Conceptually, building an SCD is straightforward, but implementing one in a traditional database can be difficult. For example, triggers can be hard to configure and resource intensive, and some databases other than Snowflake force you to mine proprietary database logs with expensive proprietary tools.

If using triggers or mining logs is not possible, running full table comparisons is another option. However, this can be prohibitively expensive because it requires full table scans of multiple tables, so it isn’t always a practical solution.

Building an SCD in Snowflake is extremely easy using the Streams and Tasks functionalities that Snowflake recently announced at Snowflake Summit.

Streams and Tasks

A stream is a new Snowflake object type that provides change data capture (CDC) capabilities to track the delta of changes in a table, including inserts, updates, and deletes (that is, data manipulation language, or DML, changes), so action can be taken using the changed data. A table stream allows you to query a table and consume a set of changes to that table, at the row level, between two transactional points in time.

A task is a new Snowflake object type that defines a recurring schedule for executing SQL statements, including statements that call stored procedures. You can chain tasks together for successive execution to support more-complex, periodic processing.

In a continuous data pipeline, tasks may optionally use streams to provide a convenient way to continuously process new or changed data. A task can verify whether a stream contains changed data for a table and either consume the changed data or skip the current run if no changed data exists.
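As a rough sketch of that pattern, a task can be gated on a stream with the SYSTEM$STREAM_HAS_DATA function so that a run is skipped when there is nothing to process. All object names below (my_wh, my_source_changes, my_target, process_my_source_changes) and the five-minute schedule are hypothetical placeholders, not objects from this post:

```sql
-- Hypothetical objects: my_wh (warehouse), my_source_changes (stream on some
-- source table with columns id and val), my_target (table being loaded).
create or replace task process_my_source_changes
  warehouse = my_wh
  schedule = '5 minute'
when
  -- The run is skipped when the stream reports no changed data
  system$stream_has_data('MY_SOURCE_CHANGES')
as
  insert into my_target (id, val)
  select id, val
  from my_source_changes
  where metadata$action = 'INSERT';

-- A newly created task is suspended; resume it to start the schedule
alter task process_my_source_changes resume;
```

Because the INSERT reads from the stream inside a DML statement, each successful run consumes the changes it processed.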

Setting Up a Stream

In the following example, I show all the code required to create a Type 2 SCD in Snowflake, and I provide an explanation of what each step does.

You must use a role that has the ability to create databases, streams, and tasks. I used the SYSADMIN role.

use role sysadmin;

Now, set up a database and schema to work in:

create database streams_and_tasks;
use database streams_and_tasks;
create schema scd;
use schema scd;

For this post, you’ll create a table called NATION. This table could be changed as part of an ETL process. Changes could include data being inserted, updated, or deleted. The NATION table always has the current view of the data and the update_timestamp field is updated for every row change.

create or replace table nation (
   n_nationkey number,
   n_name varchar(25),
   n_regionkey number,
   n_comment varchar(152),
   country_code varchar(2),
   update_timestamp timestamp_ntz);

The NATION_HISTORY table will keep a history of changes made to the NATION table. Each record has a start_time field and an end_time field indicating when the record was valid. In addition, each record has a current_flag field indicating if the record is the current record.

The NATION_HISTORY table will get loaded based on changes to the NATION table.

create or replace table nation_history (
   n_nationkey number,
   n_name varchar(25),
   n_regionkey number,
   n_comment varchar(152),
   country_code varchar(2),
   start_time timestamp_ntz,
   end_time timestamp_ntz,
   current_flag int);

Next, create a stream called NATION_TABLE_CHANGES on the NATION table. Creating a stream essentially turns on CDC for the table. Data changes in the NATION table will then be available for further processing using this stream.

create or replace stream nation_table_changes on table nation;

You can view streams by running the following command:

show streams;

After running the command, the GUI shows details for each existing stream, such as the stream name, the database name, the schema name, the owner, and the table name. The following screenshot shows details for the NATION_TABLE_CHANGES stream.

You can query the data in the stream. At this point, the stream should be empty because you haven’t changed any data in the NATION table. Notice in the screenshot above that the columns in the stream are the same as the columns in the source NATION table the stream was created on.  

Additionally, there are three new columns you can use to find out what type of DML operations changed data in a source table: METADATA$ACTION, METADATA$ISUPDATE, and METADATA$ROW_ID.

You can find the definitions of these columns in the Snowflake documentation on streams.

The below statement queries the NATION_TABLE_CHANGES stream.

select * from nation_table_changes;

There have been no data changes in the NATION table so the stream is empty.  Notice the three METADATA columns at the end of the table.

Prior to building out the NATION_HISTORY table, let’s first look at what those three stream columns contain when you insert a record, update that record, and delete that record in the NATION table. The differences shown in the three columns enable you to build the logic required to load the NATION_HISTORY table.

An insert operation generates a single row in the stream. Based on the METADATA$ACTION (INSERT) and METADATA$ISUPDATE (FALSE) column values shown below, you can see a record was inserted and no update took place.

An update operation generates two rows in the stream: the METADATA$ACTION column shows two entries (INSERT and DELETE), and the two entries in the METADATA$ISUPDATE column are both TRUE. Notice the entry in the METADATA$ROW_ID column is the same for both rows.

You need to concern yourself with only one of those records, so let’s use the one where the METADATA$ACTION entry is INSERT and the METADATA$ISUPDATE entry is TRUE.

A delete operation generates a single row in the stream. The entry in the METADATA$ACTION column is DELETE and, as with the insert operation above, the entry in the METADATA$ISUPDATE column is FALSE. So you know the record was deleted, not updated.
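If you want to observe these three cases yourself without touching the NATION table, the following sketch uses a throwaway table and stream. The names stream_demo, stream_demo_changes, and stream_demo_sink are hypothetical and exist only for this experiment. The stream is consumed between steps because a stream returns the net change since its offset, so an insert followed immediately by an update of the same row would otherwise appear as a single insert:

```sql
-- Scratch objects (hypothetical names, not part of the SCD build)
create or replace table stream_demo (id number, val varchar);
create or replace stream stream_demo_changes on table stream_demo;
create or replace table stream_demo_sink (id number, val varchar, action varchar, isupdate boolean);

-- 1. Insert: one row in the stream (action INSERT, isupdate FALSE)
insert into stream_demo values (1, 'a');
select id, val, metadata$action, metadata$isupdate, metadata$row_id
from stream_demo_changes;

-- Consume the stream in a DML statement so its offset advances
insert into stream_demo_sink
select id, val, metadata$action, metadata$isupdate from stream_demo_changes;

-- 2. Update: two rows in the stream (DELETE and INSERT, both with isupdate TRUE)
update stream_demo set val = 'b' where id = 1;
select id, val, metadata$action, metadata$isupdate, metadata$row_id
from stream_demo_changes;

insert into stream_demo_sink
select id, val, metadata$action, metadata$isupdate from stream_demo_changes;

-- 3. Delete: one row in the stream (action DELETE, isupdate FALSE)
delete from stream_demo where id = 1;
select id, val, metadata$action, metadata$isupdate, metadata$row_id
from stream_demo_changes;

-- Clean up the scratch objects
drop stream stream_demo_changes;
drop table stream_demo;
drop table stream_demo_sink;
```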

Now that you know what happens for each type of DML operation, you can use this information to write a SQL statement to load the NATION_HISTORY table with data that reflects changes made to the NATION table.

You’ll also need to use a MERGE statement, because you need to perform an insert or update operation, or both, in the NATION_HISTORY table every time rows in the NATION table change.

The MERGE statement syntax looks like the following figure. (I’ll show the code a bit later but I want to show the structure first.)  

Notice for each DML type, a different action takes place.

In the using section, the NATION_CHANGE_DATA view handles the logic to figure out what needs to be loaded into the NATION_HISTORY table. It relies on the data in the NATION_TABLE_CHANGES stream.

Run the CREATE statement shown below. It looks like a lot of SQL code, but it simply generates data to load into the NATION_HISTORY table.

create or replace view nation_change_data as
-- This subquery figures out what to do when data is inserted into the NATION table
-- An insert to the NATION table results in an INSERT to the NATION_HISTORY table
select n_nationkey, n_name, n_regionkey, n_comment,
country_code, start_time, end_time, current_flag, 'I' as dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
            update_timestamp as start_time,
            lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
            case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
            case when end_time_raw is null then 1 else 0 end as current_flag
     from (select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp
           from nation_table_changes
           where metadata$action = 'INSERT'
           and metadata$isupdate = 'FALSE'))
union
-- This subquery figures out what to do when data is updated in the NATION table
-- An update to the NATION table results in an update AND an insert to the NATION_HISTORY table
-- The subquery below generates two records, each with a different dml_type
select n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag, dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
            update_timestamp as start_time,
            lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
            case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
            case when end_time_raw is null then 1 else 0 end as current_flag,
            dml_type
     from (-- Identify data to insert into nation_history table
           select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp, 'I' as dml_type
           from nation_table_changes
           where metadata$action = 'INSERT'
           and metadata$isupdate = 'TRUE'
           union
           -- Identify data in NATION_HISTORY table that needs to be updated
           select n_nationkey, null, null, null, null, start_time, 'U' as dml_type
           from nation_history
           where n_nationkey in (select distinct n_nationkey
                                 from nation_table_changes
                                 where metadata$action = 'INSERT'
                                 and metadata$isupdate = 'TRUE')
           and current_flag = 1))
union
-- This subquery figures out what to do when data is deleted from the NATION table
-- A deletion from the NATION table results in an update to the NATION_HISTORY table
select nms.n_nationkey, null, null, null, null, nh.start_time, current_timestamp()::timestamp_ntz, null, 'D'
from nation_history nh
inner join nation_table_changes nms
  on nh.n_nationkey = nms.n_nationkey
where nms.metadata$action = 'DELETE'
and   nms.metadata$isupdate = 'FALSE'
and   nh.current_flag = 1;

Now run the following SQL statement, which should execute without errors but return no data because the stream hasn’t captured anything yet.

select * from nation_change_data;

Now that you have created the NATION_CHANGE_DATA view, you can include it in the MERGE statement that was shown previously:

merge into nation_history nh -- Target table to merge changes from NATION into
using nation_change_data m -- nation_change_data is a view that holds the logic that determines what to insert/update into the NATION_HISTORY table.
on  nh.n_nationkey = m.n_nationkey -- n_nationkey and start_time determine whether there is a unique record in the NATION_HISTORY table
and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update -- Indicates the record has been updated and is no longer current and the end_time needs to be stamped
set nh.end_time = m.end_time,
nh.current_flag = 0
when matched and m.dml_type = 'D' then update -- Deletes are essentially logical deletes. The record is stamped and no newer version is inserted
set nh.end_time = m.end_time,
nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert -- Inserting a new n_nationkey and updating an existing one both result in an insert
(n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, m.country_code, m.start_time, m.end_time, m.current_flag);

Now execute the MERGE statement. It should run but not load any data into the NATION_HISTORY table because there is no data in the stream.

Everything you need to build a Type 2 SCD is now in place. So let’s start inserting, updating, and deleting data to see how that works.

Inserting Data into a Table

To start, let’s insert 25 rows of data into the NATION table. The following example sets a variable ($update_timestamp) equal to the current timestamp and references that variable in the INSERT statements. However, you can choose how to set the timestamp. Perhaps it comes from a source system or is set by an ETL tool loading the table.

I also insert the data in a transaction, using the begin; … commit; syntax. All told, you should see 28 SQL statements execute when you run the following code (one SET, one BEGIN, 25 INSERTs, and one COMMIT):

set update_timestamp = current_timestamp()::timestamp_ntz;
begin;
insert into nation values(0,'ALGERIA',0,' haggle. carefully final deposits detect slyly agai','DZ',$update_timestamp);
insert into nation values(1,'ARGENTINA',1,'al foxes promise slyly according to the regular accounts. bold requests alon','AR',$update_timestamp);
insert into nation values(2,'BRAZIL',1,'y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special ','BR',$update_timestamp);
insert into nation values(3,'CANADA',1,'eas hang ironic silent packages. slyly regular packages are furiously over the tithes. fluffily bold','CA',$update_timestamp);
insert into nation values(4,'EGYPT',4,'y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d','EG',$update_timestamp);
insert into nation values(5,'ETHIOPIA',0,'ven packages wake quickly. regu','ET',$update_timestamp);
insert into nation values(6,'FRANCE',3,'refully final requests. regular ironi','FR',$update_timestamp);
insert into nation values(7,'GERMANY',3,'l platelets. regular accounts x-ray: unusual regular acco','DE',$update_timestamp);
insert into nation values(8,'INDIA',2,'ss excuses cajole slyly across the packages. deposits print aroun','IN',$update_timestamp);
insert into nation values(9,'INDONESIA',2,' slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull','ID',$update_timestamp);
insert into nation values(10,'IRAN',4,'efully alongside of the slyly final dependencies. ','IR',$update_timestamp);
insert into nation values(11,'IRAQ',4,'nic deposits boost atop the quickly final requests? quickly regula','IQ',$update_timestamp);
insert into nation values(12,'JAPAN',2,'ously. final express gifts cajole a','JP',$update_timestamp);
insert into nation values(13,'JORDAN',4,'ic deposits are blithely about the carefully regular pa','JO',$update_timestamp);
insert into nation values(14,'KENYA',0,' pending excuses haggle furiously deposits. pending express pinto beans wake fluffily past t','KE',$update_timestamp);
insert into nation values(15,'MOROCCO',0,'rns. blithely bold courts among the closely regular packages use furiously bold platelets?','MA',$update_timestamp);
insert into nation values(16,'MOZAMBIQUE',0,'s. ironic unusual asymptotes wake blithely r','MZ',$update_timestamp);
insert into nation values(17,'PERU',1,'platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun','PE',$update_timestamp);
insert into nation values(18,'CHINA',2,'c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos','CN',$update_timestamp);
insert into nation values(19,'ROMANIA',3,'ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account','RO',$update_timestamp);
insert into nation values(20,'SAUDI ARABIA',4,'ts. silent requests haggle. closely express packages sleep across the blithely','SA',$update_timestamp);
insert into nation values(21,'VIETNAM',2,'hely enticingly express accounts. even final ','VN',$update_timestamp);
insert into nation values(22,'RUSSIA',3,' requests against the platelets use never according to the quickly regular pint','RU',$update_timestamp);
insert into nation values(23,'UNITED KINGDOM',3,'eans boost carefully special requests. accounts are. carefull','GB',$update_timestamp);
insert into nation values(24,'UNITED STATES',1,'y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be','US',$update_timestamp);
commit;

Now you can use commands to see data in both the NATION table and the NATION_TABLE_CHANGES stream you created.

First, run this command to see data in the NATION table:

select * from nation;

The following figure shows data in the NATION table:

Then run this command to see data in the NATION_TABLE_CHANGES stream:

select * from nation_table_changes;

The following figure shows data in the NATION_TABLE_CHANGES stream:

Notice the values in the METADATA columns, which indicate the changes were insert operations.
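Rather than scanning the rows by eye, you could summarize the stream by its metadata columns. This is only a convenience query, not part of the original walkthrough. Selecting from a stream outside a DML statement does not consume it, so the data is still there for the MERGE that follows:

```sql
-- Count the pending change rows by operation type.
-- For the load above, this should show a single group:
-- INSERT / FALSE with 25 rows.
select metadata$action,
       metadata$isupdate,
       count(*) as row_count
from nation_table_changes
group by metadata$action, metadata$isupdate;
```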

Now that the stream has data, execute the following MERGE statement to load data into the NATION_HISTORY table:


-- MERGE statement that uses the NATION_CHANGE_DATA view to load data into the NATION_HISTORY table
merge into nation_history nh -- Target table to merge changes from NATION into
using nation_change_data m -- NATION_CHANGE_DATA is a view that holds the logic that determines what to insert/update into the NATION_HISTORY table.
on nh.n_nationkey = m.n_nationkey -- n_nationkey and start_time determine whether there is a unique record in the NATION_HISTORY table
and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update -- Indicates the record has been updated and is no longer current and the end_time needs to be stamped
set nh.end_time = m.end_time,
nh.current_flag = 0
when matched and m.dml_type = 'D' then update -- Deletes are essentially logical deletes. The record is stamped and no newer version is inserted
set nh.end_time = m.end_time,
nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert -- Inserting a new n_nationkey and updating an existing one both result in an insert
(n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, m.country_code, m.start_time, m.end_time, m.current_flag);

In the following figure, you can see that 25 rows were inserted into the NATION_HISTORY table:

Now run the following command:

select * from nation_history;

Notice in the following figure that the START_TIME, END_TIME, and CURRENT_FLAG fields are now filled with data:
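Two quick checks can help confirm the history table behaves like a Type 2 SCD. The sketch below is an illustration rather than part of the original build, and the session variable name as_of is an assumption chosen for this example:

```sql
-- Check 1: no key should have more than one current record.
-- This query should return no rows.
select n_nationkey, count(*) as current_rows
from nation_history
where current_flag = 1
group by n_nationkey
having count(*) > 1;

-- Check 2: point-in-time lookup. Returns the version of each nation
-- that was valid at the chosen moment (here, right now).
set as_of = current_timestamp()::timestamp_ntz;

select n_nationkey, n_name, country_code, start_time, end_time
from nation_history
where start_time <= $as_of
and   end_time > $as_of
order by n_nationkey;
```

The second query relies on the convention used in this post that the current record carries an end_time of 9999-12-31, so it always satisfies the end_time condition.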

Because the MERGE statement consumed the stream on the NATION table in a DML statement, the stream’s offset has advanced and the stream has effectively been purged, which ensures you don’t process the same changed data twice. To verify this, run the following command:

select * from nation_table_changes;

The following figure shows that the stream has been purged:
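Another way to check, shown here as an optional sketch, is the SYSTEM$STREAM_HAS_DATA function, which reports whether a stream currently holds change records. It assumes your session is still using the STREAMS_AND_TASKS database and SCD schema so the stream name resolves:

```sql
-- Reports whether the stream currently contains change records
select system$stream_has_data('NATION_TABLE_CHANGES');

-- A plain row count gives the same information for a small stream
select count(*) as pending_changes from nation_table_changes;
```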

What’s Next?

In Part 2 of this post, you’ll see how to update data, delete data, and automate stream processing by using a task.
