PySpark: Maintaining a history dataset in the SCD2 (Slowly Changing Dimension Type 2) model

Maintaining a history table is a common requirement in almost every data engineering or data warehousing project, and there are many ways to model a history dataset. Below is an SCD2 implementation of a history dataset in PySpark.

There is a source dataset, say SRC, that holds daily delta records (the day's changes), and a history dataset, say HIST, that holds both active and expired records.


  •  Every record has START_DT and END_DT (an ACTIVE_FLAG can be added if needed, but I usually skip it since END_DT is sufficient from my perspective)
  •  END_DT='9999-12-31' (or null, or any other far-future default date) marks an active record, and END_DT=<past date> marks an expired record
  • START_DT gives the starting date of that record
  • HIST is partitioned on END_DT (every day we are interested only in the END_DT='9999-12-31' records, so the other partitions are never touched)

SRC contains three kinds of records:

             a. New records 

             b. Update to existing active records

             c. Deleted records (soft-deleted with a flag, say DLT_FLG='Y')

                           Note: This scenario varies by requirement. Some requirements only need the existing active record to be expired for a deleted record, without adding the deleted record as a new active record. Others need the deleted record added as a new record after the existing active record is expired. I am choosing the former option here.

1.      New records:

o   Records that are present in SRC but not in HIST filtered to END_DT='9999-12-31'

o   SRC left join HIST on (SRC.key=HIST.key and HIST.END_DT='9999-12-31') where HIST.key is null ---> left_anti join

o   Select the required SRC columns and set START_DT=<today> and END_DT='9999-12-31' (plus any other functional columns your requirements need)

o   Performance check:

o   HIST is filtered on END_DT='9999-12-31', so Spark prunes the other partitions and never reads them

o   Only the key columns of HIST are used, and HIST is the right side of this left (anti) join, so it can be broadcast: Spark plans a broadcast hash join when that key set is estimated below spark.sql.autoBroadcastJoinThreshold (10 MB by default), or when you add a broadcast() hint.

2.      Updates to existing active records:

·        Records that are present in SRC (with DLT_FLG<>'Y') and in HIST (filtered to END_DT='9999-12-31') but whose attributes have changed (basically the delta records). Two things are needed here:

a.      Add the updated records from the source as new active records.

·        SRC inner join HIST on(SRC.key=HIST.key and SRC.DLT_FLG<>'Y' and HIST.END_DT='9999-12-31')

·        Select the required SRC columns and set START_DT=<today> and END_DT='9999-12-31' (plus any other functional columns your requirements need)

·        Performance check:

o   HIST is filtered on END_DT='9999-12-31', so Spark prunes the other partitions and never reads them

o   Only the key columns of HIST are used and it is an inner join, so the HIST key set is a broadcast candidate under the same threshold or hint conditions as in step 1.

b.      Identify the existing active records corresponding to these records and expire them.

·        SRC inner join HIST on(SRC.key=HIST.key and SRC.DLT_FLG<>'Y' and HIST.END_DT='9999-12-31')

·        Select the required HIST columns and set END_DT=<today - 1>

·        Performance check:

o   HIST is filtered on END_DT='9999-12-31', so Spark prunes the other partitions and never reads them

o   Only the key columns of SRC are used and it is an inner join, so the SRC key set is a broadcast candidate (SRC is a daily delta, so it is usually the small side).

3.      Deleted records:

·        Records that are present in both SRC and HIST with SRC.DLT_FLG='Y'. As discussed above, I assume we only need to expire the existing records, and no new entry is added for them.

a.      Identify the existing active records corresponding to these records and expire them:

·        SRC inner join HIST on(SRC.key=HIST.key and SRC.DLT_FLG='Y' and HIST.END_DT='9999-12-31')

·        Select the required HIST columns and set END_DT=<today - 1>

·        Performance check:

o   <same as 2b>


Note: 2b and 3a can be merged into one step, since both do the same thing: expire every active HIST record whose key appears in SRC (just drop the DLT_FLG condition from the join). I keep them as two steps for explanation purposes.


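4.      Retain unchanged records:

·        Active records in HIST that have had no changes:

a.      If a record is in SRC, it is either new, updated, or soft-deleted. So a record that is not in SRC but is active in HIST (END_DT='9999-12-31') is an unchanged record.

·        HIST (filtered with where HIST.END_DT='9999-12-31') left join SRC on (SRC.key=HIST.key) where SRC.key is null ---> left_anti join. The END_DT filter has to be applied to HIST before the join rather than inside the ON clause: a left join keeps every left-side row, so an ON-clause condition on HIST would let the expired rows through as well.

·        Select all fields from HIST and keep them as they are

·        Performance check:

o   Same partition pruning as in step 1. Here, though, HIST is the left side and SRC the right side, so it is SRC's key set that can be broadcast (a left anti join can only broadcast its right side).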


5.      At this point we have five dataframes:

·        Dataframes:

§  newRecDf --> output of step 1

§  existingRecUpdtDf --> output of step 2a

§  existingRecExpiredDf --> output of step 2b

§  deletedRecDf --> output of step 3

§  UnchangedRecDf --> output of step 4

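·        Unioning all these dataframes gives the final dataframe to be loaded (unionByName matches columns by name, so all five must have the same set of column names):

§  finalDf = functools.reduce(DataFrame.unionByName, [newRecDf, existingRecUpdtDf, existingRecExpiredDf, deletedRecDf, UnchangedRecDf])   (requires import functools and from pyspark.sql import DataFrame)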

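6.      Write this dataframe partitioned on END_DT

·        With dynamic partition overwrite, this replaces the existing active-record partition and adds the new expired-date partition, while leaving all older expired partitions untouched. The default save mode would fail because the path already exists, and a plain (static) overwrite would delete every expired partition.

§  finalDf.write.mode("overwrite").option("partitionOverwriteMode", "dynamic").partitionBy("END_DT").parquet("<target_path>")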



If you analyze the execution plan, the sub-dataframes listed in step 5 do not depend on each other, so Spark can work on their branches concurrently instead of one after another. Almost all of the joins can be planned as broadcast hash joins, as long as the broadcast side stays under the broadcast threshold or carries a broadcast() hint. And last but not least, the expired partitions of the history dataset are never read at any point.





