
PySpark: Maintaining a history data set in the SCD2 (Slowly Changing Dimension Type 2) model

Maintaining a history table is a common requirement in almost any data engineering or data warehousing project, and there are many ways to model a history data set. Below is an SCD2 implementation of a history data set in PySpark.

There is a source data set, say SRC, holding the daily delta records (daily changes), and a history data set, say HIST, holding both active and expired records.

In HIST:

  •  Every record has START_DT and END_DT (plus an ACTIVE_FLAG if needed, though I usually leave it out since END_DT is sufficient from my perspective)
  •  END_DT='9999-12-31' (or null, or any other far-future default date) marks an active record, and END_DT=<past_date value> marks an expired record
  •  START_DT is the date from which that version of the record is valid
  •  The HIST data set is partitioned on END_DT. Each day we only need the END_DT='9999-12-31' records, so the other partitions are not touched at all
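To make the START_DT/END_DT convention concrete, here is a small plain-Python sketch (no Spark needed) of how one key's rows might look in HIST and how an "as of" lookup reads them. The key `C1`, the `city` attribute, the dates, and the `as_of` helper are all made up for illustration and are not part of the original design.

```python
from datetime import date

ACTIVE_END_DT = date(9999, 12, 31)  # sentinel END_DT of the active row

# Hypothetical history of one key: two expired versions and one active version.
# Each expired END_DT is the day before the next version's START_DT.
hist_rows = [
    {"key": "C1", "city": "Pune", "START_DT": date(2023, 1, 1), "END_DT": date(2023, 3, 14)},
    {"key": "C1", "city": "Mumbai", "START_DT": date(2023, 3, 15), "END_DT": date(2023, 8, 31)},
    {"key": "C1", "city": "Delhi", "START_DT": date(2023, 9, 1), "END_DT": ACTIVE_END_DT},
]


def as_of(rows, key, day):
    """Return the version of `key` that was valid on `day`, or None."""
    for row in rows:
        if row["key"] == key and row["START_DT"] <= day <= row["END_DT"]:
            return row
    return None


# The active rows are exactly those carrying the sentinel END_DT.
active = [r for r in hist_rows if r["END_DT"] == ACTIVE_END_DT]
```

Because an expired row ends on the day before its successor starts, the date ranges do not overlap and any given day maps to at most one version of a key.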

In SRC, we have:

             a. New records

             b. Updates to existing active records

             c. Deleted records (soft delete with a flag, say DLT_FLG='Y'):

                           Note: This scenario can vary. For some requirements we only expire the existing active record for a deleted key and do not add a new active record. For others, we expire the existing active record and then add the deleted record as a new one. I am choosing the former option here.
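The three kinds of SRC records can be told apart with two checks: is the key already active in HIST, and is the delete flag set. The plain-Python sketch below models that decision on a list of dicts so it can be run without Spark. The function name `classify` and the sample rows are hypothetical. It follows the rules as described in this post, so a key with no active HIST row counts as new regardless of its flag.

```python
def classify(src_rows, active_hist_keys):
    """Split SRC keys into new / update / delete groups."""
    groups = {"new": [], "update": [], "delete": []}
    for row in src_rows:
        if row["key"] not in active_hist_keys:
            groups["new"].append(row["key"])      # no active row in HIST
        elif row.get("DLT_FLG") == "Y":
            groups["delete"].append(row["key"])   # soft delete of an active row
        else:
            groups["update"].append(row["key"])   # change to an active row
    return groups


# Made-up sample data: C1 and C2 are active in HIST, C3 is not.
src_rows = [
    {"key": "C1", "city": "Delhi", "DLT_FLG": "N"},
    {"key": "C2", "city": "Kochi", "DLT_FLG": "Y"},
    {"key": "C3", "city": "Agra", "DLT_FLG": "N"},
]
groups = classify(src_rows, active_hist_keys={"C1", "C2"})
```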

1.      New records:

o   Records that are present in SRC but not in HIST filtered on END_DT='9999-12-31'

o   SRC left join HIST on (SRC.key=HIST.key and HIST.END_DT='9999-12-31') where HIST.key is null ---> i.e. a left_anti join

o   Select the required SRC columns and set START_DT=<today> and END_DT='9999-12-31' (and define any other functional columns your requirement needs)

o   Performance check:

o   Because HIST is filtered on HIST.END_DT='9999-12-31', Spark does not read the other partitions

o   Because only the key columns of HIST are used and HIST is the right-hand data set in this left join, the join can qualify for a broadcast hash join, provided the active key set is small enough to broadcast (below spark.sql.autoBroadcastJoinThreshold) or a broadcast hint is given.
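Step 1 could be sketched in PySpark roughly as follows. This is a minimal sketch under some assumptions that are not in the post: the join column is literally named `key`, END_DT is a date-typed partition column, and HIST does not store DLT_FLG (so it is dropped). The function is written against DataFrame methods only, so `src` and `hist` are expected to be `pyspark.sql.DataFrame` objects.

```python
ACTIVE_END_DT = "9999-12-31"  # sentinel END_DT of active rows, as in the post


def new_records(src, hist, key="key"):
    """Step 1: SRC rows whose key has no active row in HIST."""
    # Filter on the partition column and keep only the key column, so only the
    # active partition is scanned and the right side stays small.
    active_keys = hist.filter(f"END_DT = '{ACTIVE_END_DT}'").select(key)
    return (
        # The broadcast hint asks Spark for a broadcast join of the key set.
        src.join(active_keys.hint("broadcast"), on=key, how="left_anti")
        .drop("DLT_FLG")  # assumption: HIST has no delete-flag column
        .selectExpr(
            "*",
            "current_date() as START_DT",
            f"cast('{ACTIVE_END_DT}' as date) as END_DT",
        )
    )
```

As described in the post, step 1 does not look at DLT_FLG. If soft-deleted keys that were never loaded should be skipped, a `src.filter(...)` on the flag before the join would be one way to do it.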

2.      Updates to existing active records:

·        Records that are present in both SRC (with DLT_FLG<>'Y') and HIST (filtered on END_DT='9999-12-31') but whose attributes have changed (basically delta records). Two things are needed here:

a.      Add the updated records from the source as new active records.

·        SRC inner join HIST on (SRC.key=HIST.key and SRC.DLT_FLG<>'Y' and HIST.END_DT='9999-12-31')

·        Select the required SRC columns and set START_DT=<today> and END_DT='9999-12-31' (and define any other functional columns your requirement needs)

·        Performance check:

o   Because HIST is filtered on HIST.END_DT='9999-12-31', Spark does not read the other partitions

o   Because only the key columns of HIST are used and it is an inner join, the join can qualify for a broadcast hash join.

b.      Identify the existing records corresponding to these records and expire them.

·        SRC inner join HIST on (SRC.key=HIST.key and SRC.DLT_FLG<>'Y' and HIST.END_DT='9999-12-31')

·        Select the required HIST columns and set END_DT=<today -1>

·        Performance check:

o   Because HIST is filtered on HIST.END_DT='9999-12-31', Spark does not read the other partitions

o   Because only the key columns of SRC are used and it is an inner join, the join can qualify for a broadcast hash join.
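Steps 2a and 2b might look like this in PySpark. Note one deliberate swap: the sketch uses a `left_semi` join instead of the inner join described above. When each key has at most one active HIST row (and SRC has one row per key), both return the same rows, and the semi join keeps only the left side's columns. The column name `key`, the date-typed END_DT, treating a NULL DLT_FLG as "not deleted", and the function names are assumptions for illustration.

```python
ACTIVE = "END_DT = '9999-12-31'"          # active partition of HIST
LIVE = "coalesce(DLT_FLG, 'N') <> 'Y'"    # assumption: NULL flag means not deleted


def updated_records(src, hist, key="key"):
    """Step 2a: changed SRC rows become the new active versions."""
    active_keys = hist.filter(ACTIVE).select(key)
    return (
        src.filter(LIVE)
        .join(active_keys.hint("broadcast"), on=key, how="left_semi")
        .drop("DLT_FLG")  # assumption: HIST has no delete-flag column
        .selectExpr(
            "*",
            "current_date() as START_DT",
            "cast('9999-12-31' as date) as END_DT",
        )
    )


def expired_for_update(src, hist, key="key"):
    """Step 2b: the matching active HIST rows are closed as of yesterday."""
    changed_keys = src.filter(LIVE).select(key)
    return (
        hist.filter(ACTIVE)
        .join(changed_keys.hint("broadcast"), on=key, how="left_semi")
        .drop("END_DT")  # replace the sentinel with the expiry date
        .selectExpr("*", "date_sub(current_date(), 1) as END_DT")
    )
```

Both functions take `pyspark.sql.DataFrame` objects. Like the join described in the post, they do not compare attribute values, so they rely on SRC containing only genuine deltas.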

3.      Deleted records:

·        Records that are present in both SRC and HIST, with SRC.DLT_FLG='Y'. As discussed before, I am assuming we only need to expire the existing records and do not need to add any new entry for them.

a.      Identify the existing records corresponding to these records and expire them:

·        SRC inner join HIST on (SRC.key=HIST.key and SRC.DLT_FLG='Y' and HIST.END_DT='9999-12-31')

·        Select the required HIST columns and set END_DT=<today -1>

·        Performance check:

o   <same as 2b>

 

Note: 2b and 3a can be merged into one step, since both do the same thing. I am keeping them separate for the sake of explanation.
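One way to express both the separate step 3a and the merged 2b+3a expiry is a single helper with an optional SRC predicate. This is a sketch, not the post's code: the name `expire_active`, the `src_filter` parameter, and the column name `key` are hypothetical, and a `left_semi` join stands in for the inner join (equivalent when each key has at most one active HIST row).

```python
def expire_active(src, hist, src_filter=None, key="key"):
    """Close the active HIST rows whose key appears in SRC.

    src_filter is an optional SQL predicate on SRC, for example
    "DLT_FLG = 'Y'" for step 3a. With no predicate every SRC key is used,
    which merges steps 2b and 3a.
    """
    matched = src.filter(src_filter) if src_filter else src
    return (
        hist.filter("END_DT = '9999-12-31'")  # active partition only
        .join(matched.select(key).hint("broadcast"), on=key, how="left_semi")
        .drop("END_DT")
        .selectExpr("*", "date_sub(current_date(), 1) as END_DT")
    )
```

For step 3a alone this would be called as `expire_active(src, hist, "DLT_FLG = 'Y'")`. If the merged form `expire_active(src, hist)` is used, its output takes the place of both existingRecExpiredDf and deletedRecDf in the final union.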

 

4.      Retain unchanged records:

·        Active records in HIST for which no change has arrived:

a.      If a record is in SRC, it is new, updated, or soft-deleted. So a record that is not in SRC but is in HIST (with END_DT='9999-12-31') is an unchanged record.

·        HIST (filtered on END_DT='9999-12-31') left join SRC on (SRC.key=HIST.key) where SRC.key is null ---> i.e. a left_anti join. The END_DT condition must filter HIST rather than sit in the join condition; otherwise expired rows would also come through as unmatched.

·        Select all fields from HIST and keep them as they are

·        Performance check:

o   <same as step 1 performance check>
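Step 4 can be sketched as an anti join in the other direction. As before, the column name `key` and the function name are assumptions, and `src` and `hist` are expected to be `pyspark.sql.DataFrame` objects.

```python
def unchanged_records(src, hist, key="key"):
    """Step 4: active HIST rows whose key does not appear in SRC at all."""
    return (
        hist.filter("END_DT = '9999-12-31'")  # filter first: active partition only
        # Only the SRC keys are needed on the right side, hinted for broadcast.
        .join(src.select(key).hint("broadcast"), on=key, how="left_anti")
    )
```

The rows are returned untouched, so they keep their original START_DT and the sentinel END_DT.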

 

5.      This gives us five dataframes:

·        Dataframes:

§  newRecDf --> output of step 1

§  existingRecUpdtDf --> output of step 2a

§  existingRecExpiredDf --> output of step 2b

§  deletedRecDf --> output of step 3a

§  UnchangedRecDf --> output of step 4

·        Merging all these dataframes gives the final dataframe to be loaded (this needs import functools and from pyspark.sql import DataFrame):

§  finalDf=functools.reduce(DataFrame.unionByName,[newRecDf,existingRecUpdtDf,existingRecExpiredDf,deletedRecDf,UnchangedRecDf])
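The same union can be wrapped in a small helper that does not need the `DataFrame` class imported. The helper name `union_by_name` is hypothetical. It only relies on each element having a `unionByName` method, which PySpark DataFrames do.

```python
from functools import reduce


def union_by_name(dfs):
    """Union DataFrames pairwise, left to right, matching columns by name."""
    dfs = list(dfs)
    if not dfs:
        raise ValueError("need at least one DataFrame to union")
    return reduce(lambda left, right: left.unionByName(right), dfs)
```

Since `unionByName` matches columns by name, the five dataframes may list their columns in different orders, but they all need the same set of column names with compatible types. This is why the new and updated SRC rows have to be shaped like HIST rows before the union.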

6.      Write this dataframe partitioned on END_DT

·        This refreshes the existing active-record partition and adds a new partition for the newly expired records.

§  finalDf.write.partitionBy("END_DT").parquet("<target_path>")
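The write call above leaves the save mode at its default, which raises an error when the target path already exists. A possible variant is sketched below, assuming a Spark version that supports dynamic partition overwrite (2.3 or later), so that only the partitions present in the final dataframe are replaced and older expired partitions stay as they are. The function name `write_history` is hypothetical.

```python
def write_history(final_df, target_path):
    """Overwrite only the END_DT partitions that final_df contains."""
    (
        final_df.write
        .mode("overwrite")
        # Per-write equivalent of spark.sql.sources.partitionOverwriteMode=dynamic
        .option("partitionOverwriteMode", "dynamic")
        .partitionBy("END_DT")
        .parquet(target_path)
    )
```

If HIST is read from the same path in the same job, Spark may refuse to overwrite it or the result may be unsafe, depending on the version. Writing to a staging location first and swapping afterwards is one option worth testing in that case.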

 

Summary:

If you analyze the execution plan, each of the sub-dataframes (listed in step 5) is built in its own stages, which Spark can run in parallel since they do not depend on each other. Almost all the joins are broadcast hash joins. And last but not least, we never read the expired partitions of the history data set.

                          

 

 

