
Posts

Pyspark : Maintaining a history data set in SCD2 (Slowly Changing Dimension 2) model

Maintaining a history table is a common requirement in any data engineering or data warehousing project, and there are numerous ways of modelling a history data set. Below is an SCD2 model implementation of a history data set in PySpark. There will be a source data set, say SRC, with daily delta records (daily changes), and a history data set, say HIST, with the history of active and expired records.

In HIST:
Any record will have START_DT and END_DT (and ACTIVE_FLAG if needed, but I usually ignore it since END_DT is sufficient from my perspective).
END_DT='9999-12-31' (or null, or any other high default date value) represents active records, and END_DT=<past_date value> represents expired records.
START_DT gives the starting date of that record.
HIST dataset is partitioned on END_DT (since every day, we are interested only in END_DT='9999-12-31' records. So other partitions will not...
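To make the START_DT/END_DT convention above concrete, here is a minimal sketch of one daily SCD2 merge. It is written in plain Python on lists of dicts rather than PySpark, so it only illustrates the record-level logic, not the post's actual implementation. The key column id, the city attribute, the helper apply_scd2 and the choice to close an expired record with the load date itself are all assumptions made for illustration, as is the simplification that SRC carries at most one record per key.

```python
ACTIVE_END_DT = "9999-12-31"  # default END_DT that marks an active record


def apply_scd2(hist, src, key, load_dt):
    """Merge one daily delta (src) into an SCD2 history (hist).

    hist    : list of dicts that carry START_DT and END_DT
    src     : list of dicts with today's new or changed records (no date columns)
    key     : name of the business-key column (hypothetical)
    load_dt : ISO date string of the current load
    """
    changed_keys = {row[key] for row in src}
    result = []
    for row in hist:
        if row["END_DT"] == ACTIVE_END_DT and row[key] in changed_keys:
            # Expire the active version of a key that arrived in the delta.
            result.append({**row, "END_DT": load_dt})
        else:
            # Expired records and untouched active records are kept as-is.
            result.append(dict(row))
    for row in src:
        # Every delta record becomes the new active version of its key.
        result.append({**row, "START_DT": load_dt, "END_DT": ACTIVE_END_DT})
    return result


hist = [
    {"id": 1, "city": "Pune", "START_DT": "2020-01-01", "END_DT": ACTIVE_END_DT},
    {"id": 2, "city": "Kochi", "START_DT": "2020-01-01", "END_DT": ACTIVE_END_DT},
]
src = [{"id": 1, "city": "Mumbai"}, {"id": 3, "city": "Delhi"}]

new_hist = apply_scd2(hist, src, key="id", load_dt="2020-02-01")
for row in new_hist:
    print(row)
```

After the merge, id 1 has one expired version (Pune) and one active version (Mumbai), id 2 is unchanged, and id 3 appears as a new active record. Only the rows with END_DT='9999-12-31' need to be read on the next day's run.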
Recent posts

Getting the Metadata Statistics of a HDFS file

The bash command below gives you metadata statistics for a raw file, such as the distinct number of columns and the length of a record. This helps in the landing phase to determine whether any raw files are corrupted, especially when there are a large number of raw files in your landing area and you need to test them quickly. (The command is hard coded with the .dat extension, please change it according to your file type.)

1. Sample output of the below command is as follows:
2. Output will be saved to a metadata file metadata_stat_<unixtimestamp>
3. If there is more than one value of distinct_no_of_cols for a file, one or more records of that file are corrupted, or some quoted strings in the records contain the delimiter itself as a value.
4. If your file is a fixed-length file, the distinct_no_lengths column should have only one value per file.

command: metafilename=metadat...
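The two checks described above (distinct column counts and distinct record lengths) can be sketched in plain Python. This is not the post's bash command, which is truncated here; it works on an in-memory list of lines rather than an HDFS file, and the helper name file_stats and the "|" delimiter are assumptions for illustration.

```python
def file_stats(lines, delimiter="|"):
    """Collect the distinct column counts and record lengths of a file's lines.

    lines     : iterable of raw records (strings), e.g. an open file handle
    delimiter : field separator (assumed to be "|" for this sketch)
    """
    col_counts = set()
    lengths = set()
    for line in lines:
        record = line.rstrip("\n")
        # A naive split: a delimiter inside a quoted string counts as a separator,
        # which is exactly the case that produces an extra distinct column count.
        col_counts.add(len(record.split(delimiter)))
        lengths.add(len(record))
    return {
        "distinct_no_of_cols": sorted(col_counts),
        "distinct_no_lengths": sorted(lengths),
    }


sample = ["1|abc|10\n", "2|def|20\n", "3|gh|i|30\n"]
stats = file_stats(sample)
print(stats)
# {'distinct_no_of_cols': [3, 4], 'distinct_no_lengths': [8, 9]}
```

The third sample record has an extra delimiter, so distinct_no_of_cols reports two values (3 and 4), which is the signal that the file needs a closer look.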

Different ways of Transposing a Dataframe in Pyspark

When I started working on transposing DataFrames, I found the different methods below, and I am sharing them all here.

Creation of a test input DataFrame to be transposed:

    ds = {'one':[0.3, 1.2, 1.3, 1.5, 1.4, 1.0],'two':[0.6, 1.2, 1.7, 1.5, 1.4, 2.0]}
    df = sc.parallelize([ (k,) + tuple(v[0:]) for k,v in ds.items()]).toDF()
    df.show()

method 1 (This method converts the Spark object to a Python object, that is, the RDD to a list of tuples holding the entire data):

    # Create a list of tuples, one per row
    inp_list = df.rdd.map(tuple).collect()
    # Unpack the list and zip all tuples together:
    # transposed[1:] holds the transposed tuples and transposed[0] holds the header
    transposed = list(zip(*inp_list))
    df_transpose = spark.createDataFrame(transposed[1:], list(transposed[0]))
    df_transpose.show()

method 2 (In this method only the header data is converted into a Python list. Rest of the transformations are carried out as...
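The core of method 1 is the zip(*rows) idiom, which can be tried without a Spark session. The sketch below uses the same test data as plain Python tuples, standing in for what df.rdd.map(tuple).collect() would return; the variable names are illustrative only.

```python
# Rows as they would look after collecting the test DataFrame:
# the first element of each tuple is the label, the rest are the values.
rows = [
    ("one", 0.3, 1.2, 1.3, 1.5, 1.4, 1.0),
    ("two", 0.6, 1.2, 1.7, 1.5, 1.4, 2.0),
]

# zip(*rows) pairs up the i-th element of every row, which transposes the data.
transposed = list(zip(*rows))

header = transposed[0]  # the labels become the column names
data = transposed[1:]   # each remaining tuple is one transposed row

print(header)
for record in data:
    print(record)
```

Because the whole data set has to be collected into driver memory first, this approach only suits DataFrames that are small enough to fit there.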

Understanding spark architecture in Deep with YARN

OVERVIEW

Apache Spark is a distributed computing framework. Distributed does not mean that it can run only on a cluster; Spark can also be configured on a local system. But since Spark works well on clusters and, in practice, is deployed on multi-node clusters such as Hadoop, we will consider a Hadoop cluster for explaining Spark here.

We can execute Spark on a Spark cluster in the following ways:
Interactive clients (scala shell, pyspark etc.): usually used for exploration while coding, like the python shell.
Submit a job (using the spark-submit utility): always used for submitting a production application.

Basic Architecture

Spark follows a Master/Slave architecture. That is, for every submitted application, it creates a master process and multiple slave processes. The master is the driver and the slaves are the executors. Say we have submitted a Spark job to a cluster from a client machine. Spark will create a driver process and multiple executors. Similarly...