Maintaining a history table is a common requirement in any data engineering or data warehousing project, and there are numerous ways of modelling a history data set. Below is an SCD2 (Slowly Changing Dimension Type 2) implementation of a history data set in PySpark.
There will be a source data set, say SRC, with daily delta records (the daily changes), and a history data set, say HIST, with active and expired records.
In HIST:
- Every record has START_DT and END_DT (an ACTIVE_FLAG can be added if needed, but I usually skip it since END_DT is sufficient from my perspective).
- END_DT='9999-12-31' (or null, or any other high default date value) marks active records; END_DT=<past date value> marks expired records.
- START_DT gives the starting date of that record.
- The HIST data set is partitioned on END_DT (every day we are interested only in the END_DT='9999-12-31' records, so the other partitions will not be touched at all).
In SRC, we have:
a. New records
b. Updates to existing active records
c. Deleted records (soft deletes with a flag, say DLT_FLG='Y')
Note: This scenario can vary. For some requirements we only need to expire the existing active records that correspond to the deleted records, without adding them back as new active records; in other scenarios we may need to add a new record after expiring the existing active one. I am choosing the former option here.
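Before the individual steps, here is a minimal setup sketch. The paths, the business key column name ("KEY") and the soft-delete flag column ("DLT_FLG") are assumptions for illustration; adjust them to your actual schema.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("scd2-history-load").getOrCreate()

HIGH_DATE = "9999-12-31"   # default END_DT value for active records

# Daily delta feed from the source system
src = spark.read.parquet("<src_path>")

# History data set, partitioned on END_DT. Filtering on the partition column
# means Spark reads only the active partition, never the expired ones.
hist_active = spark.read.parquet("<hist_path>").filter(F.col("END_DT") == HIGH_DATE)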
1. New records:
- Records which are available in SRC but not available in HIST (with filter END_DT='9999-12-31').
- SRC left join HIST on (SRC.key=HIST.key and HIST.END_DT='9999-12-31') where HIST.key is null ---> i.e. a left_anti join.
- Select the required SRC columns and define START_DT=<today> and END_DT='9999-12-31' (and define any other functional columns your requirement needs). A sketch of this step follows the performance check below.
- Performance check:
  - Because HIST is filtered with HIST.END_DT='9999-12-31', the other partitions will not be read by Spark.
  - Since we only use key columns from HIST, and HIST is the right-hand data set in this left join, it will qualify for a broadcast hash join.
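A sketch of step 1, continuing from the setup above (the broadcast() hint is optional; Spark will usually choose a broadcast hash join on its own when the HIST key projection is small enough):

from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

today_str = F.date_format(F.current_date(), "yyyy-MM-dd")

# left_anti keeps the SRC rows whose key has no match in the active HIST partition
newRecDf = (
    src.join(broadcast(hist_active.select("KEY")), on="KEY", how="left_anti")
       .withColumn("START_DT", today_str)
       .withColumn("END_DT", F.lit("9999-12-31"))
       .drop("DLT_FLG")   # drop source-only columns so every branch ends up with the HIST schema
)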
2. Updates to existing active records:
- Records which are available in SRC (with DLT_FLG<>'Y') and in HIST (with filter END_DT='9999-12-31'), but whose attributes have changed (basically the delta records). Two things are needed here.
a. Add the updated records from source as new active records (a sketch follows this sub-step):
  - SRC inner join HIST on (SRC.key=HIST.key and SRC.DLT_FLG<>'Y' and HIST.END_DT='9999-12-31').
  - Select the required SRC columns and define START_DT=<today> and END_DT='9999-12-31' (and define any other functional columns your requirement needs).
  - Performance check:
    - Because HIST is filtered with HIST.END_DT='9999-12-31', the other partitions will not be read by Spark.
    - Since we only use key columns from HIST and it is an inner join, it will qualify for a broadcast hash join.
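A sketch of step 2a, continuing from the setup above. It relies on SRC being a pure delta feed, so any non-deleted SRC row whose key already exists in the active HIST partition is treated as a change:

from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

existingRecUpdtDf = (
    src.filter(F.col("DLT_FLG") != "Y")
       .join(broadcast(hist_active.select("KEY")), on="KEY", how="inner")
       .withColumn("START_DT", F.date_format(F.current_date(), "yyyy-MM-dd"))
       .withColumn("END_DT", F.lit("9999-12-31"))
       .drop("DLT_FLG")   # align with the HIST schema for the union in step 5
)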
b. Identify the existing active records corresponding to these updates and expire them (a sketch follows this sub-step):
  - SRC inner join HIST on (SRC.key=HIST.key and SRC.DLT_FLG<>'Y' and HIST.END_DT='9999-12-31').
  - Select the required HIST columns and define END_DT=<today - 1>.
  - Performance check:
    - Because HIST is filtered with HIST.END_DT='9999-12-31', the other partitions will not be read by Spark.
    - Since we only use key columns from SRC and it is an inner join, it will qualify for a broadcast hash join.
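A sketch of step 2b, continuing from the setup above. Here the attributes come from HIST and only the keys come from SRC, which is the side that gets broadcast:

from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

upd_keys = src.filter(F.col("DLT_FLG") != "Y").select("KEY")

existingRecExpiredDf = (
    hist_active.join(broadcast(upd_keys), on="KEY", how="inner")
               .withColumn("END_DT", F.date_format(F.date_sub(F.current_date(), 1), "yyyy-MM-dd"))
)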
3. Deleted records:
- Records which are available in SRC and HIST with SRC.DLT_FLG='Y'. As discussed before, I am only expiring the existing records; there is no need to add a new entry for them.
a. Identify the existing active records corresponding to these deletes and expire them (a sketch follows the note below):
  - SRC inner join HIST on (SRC.key=HIST.key and SRC.DLT_FLG='Y' and HIST.END_DT='9999-12-31').
  - Select the required HIST columns and define END_DT=<today - 1>.
  - Performance check:
    - <same as 2b>
Note: If you notice, 2b and 3a can be merged into a single step since both eventually do the same thing. For explanation purposes I am keeping them separate.
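A sketch of step 3a, continuing from the setup above. It mirrors 2b with the DLT_FLG filter flipped, so in practice the two can be computed in a single pass:

from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

del_keys = src.filter(F.col("DLT_FLG") == "Y").select("KEY")

deletedRecDf = (
    hist_active.join(broadcast(del_keys), on="KEY", how="inner")
               .withColumn("END_DT", F.date_format(F.date_sub(F.current_date(), 1), "yyyy-MM-dd"))
)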
4. Retain unchanged records:
- Active records available in HIST for which no changes happened (a sketch follows this step).
a. If a record is present in SRC, it is either new, updated, or soft-deleted. So if a record is not in SRC but is available in HIST (with END_DT='9999-12-31'), it is an unchanged record.
  - HIST left join SRC on (SRC.key=HIST.key and HIST.END_DT='9999-12-31') where SRC.key is null ---> i.e. a left_anti join.
  - Select all fields from HIST and keep them as they are.
  - Performance check:
    - <same as the step 1 performance check, with SRC's key columns now being the broadcast side>
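A sketch of step 4, continuing from the setup above: keep the active HIST rows whose key does not appear in today's delta at all.

from pyspark.sql.functions import broadcast

# All HIST columns are retained as-is; only the keys from SRC are needed for the anti join
UnchangedRecDf = hist_active.join(broadcast(src.select("KEY")), on="KEY", how="left_anti")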
5. At this point we have 5 dataframes:
- newRecDf --> output of step 1
- existingRecUpdtDf --> output of step 2a
- existingRecExpiredDf --> output of step 2b
- deletedRecDf --> output of step 3
- UnchangedRecDf --> output of step 4
Merging all of these dataframes gives the final dataframe to be loaded (the imports it needs are spelled out below):
finalDf=functools.reduce(DataFrame.unionByName,[newRecDf,existingRecUpdtDf,existingRecExpiredDf,deletedRecDf,UnchangedRecDf])
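For completeness, the same union written out with its imports. unionByName aligns columns by name, so all five dataframes must expose the same set of columns:

import functools
from pyspark.sql import DataFrame

finalDf = functools.reduce(
    DataFrame.unionByName,
    [newRecDf, existingRecUpdtDf, existingRecExpiredDf, deletedRecDf, UnchangedRecDf],
)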
6. Write this dataframe partitioned on END_DT:
- This will update the existing active-record partition as well as add a new expired-date partition (see the note on partition overwrite below).
finalDf.write.partitionBy("END_DT").parquet("<target_path>")
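The post does not spell out the write mode. One way to refresh only the touched END_DT partitions in place (an assumption on my part, not necessarily the author's exact setup) is Spark's dynamic partition overwrite; note that Spark refuses to overwrite a path it is still reading from in the same job, so in practice the output usually goes to a staging location first.

# Assumes Spark 2.3+; only the END_DT partitions present in finalDf get rewritten
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

(finalDf.write
        .mode("overwrite")
        .partitionBy("END_DT")
        .parquet("<target_path>"))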
Summary:
If you analyze the execution plan, the sub-dataframes mentioned in step 5 are built in separate Spark stages, which can run in parallel since they have no dependency on each other. Almost all of the joins are broadcast hash joins. And last but not least, the expired partitions of the history data set are never read at any time.