This is a small article in which I try to explain how partitions are created, how to find their number, how to control their number, and so on.
This blog is for:
PySpark (Spark with Python) analysts and anyone interested in learning PySpark.
Prerequisites:
You should have a good knowledge of Python and a basic knowledge of PySpark functions.
Data set used:
For demonstration purposes, I am using the data set below (a file in HDFS):
"orderitems" with columns order_item, order_item_order_id, product_id, order_qty, order_item_subtotal, price_per_qty
If you read a file from HDFS into an RDD, by default Spark creates a number of partitions equal to the number of HDFS blocks it has to read.
The default block size of HDFS (2.x) is 128 MB, so a file smaller than that is read into an RDD as 1 partition, since only one block is used.
A 256 MB file gives 2 partitions, and so on.
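As a quick sanity check on that rule of thumb, here is a tiny illustrative helper (not a Spark API, just arithmetic assuming the default 128 MB block size):
import math

def estimated_partitions(file_size_mb, block_size_mb=128):
    # hypothetical helper: roughly one partition per HDFS block, rounded up
    return max(1, math.ceil(file_size_mb / block_size_mb))

estimated_partitions(100)  # 1
estimated_partitions(256)  # 2
estimated_partitions(300)  # 3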
To find the current number of partitions of an RDD:
orderitems=sc.textFile("/public/retail_db/order_items")
orderitems.getNumPartitions()
To see the number of records in each partition, you can use:
orderitems.glom().map(lambda x: len(x)).collect() # glom() turns each partition into a list; collect() brings the per-partition counts back to the driver
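Note that glom() materialises every partition as a full in-memory list. If you only need the counts, a lighter-weight alternative (same idea, but counting while streaming through each partition) is:
orderitems.mapPartitions(lambda part: [sum(1 for _ in part)]).collect()  # one count per partition, without building full lists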
Now, if I save it directly to HDFS:
orderitems.saveAsTextFile("/user/pathirippilly/spark_jobs/output_data_set/orderitesmpartitioned")
we can see that as many output files are created as there are partitions.
To alter the default number of partitions, we can do it in multiple ways.
method 1:
While reading a file into an RDD using the SparkContext, you can pass the number of partitions as the second argument, as below.
orderitems=sc.textFile("/public/retail_db/order_items",5) # Here 5 partitions will be created irrespective of the size of the file
method 2:
If you want to decrease the number of partitions, you can use coalesce.
orderitems=sc.textFile("/public/retail_db/order_items")
orderitems_coalesced=orderitems.coalesce(1) # returns a new RDD with 1 partition (coalesce does not modify orderitems in place)
note:
coalesce avoids a full shuffle. Since Spark knows the number of partitions is decreasing, it can safely keep the data that already sits on the partitions being kept and only move the data off the extra partitions onto the ones that remain.
method 3:
This uses repartition. It is slower and more expensive than coalesce because it performs a full shuffle, but it can either increase or decrease the number of partitions.
orderitems=sc.textFile("/public/retail_db/order_items")
orderitems_repartitioned=orderitems.repartition(5) # returns a new RDD repartitioned to 5 partitions via a full shuffle
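If you are curious where the extra cost comes from, you can inspect the RDD lineage; the exact output depends on the Spark version, but it should show the shuffle stage that repartition introduces and coalesce avoids:
orderitems_repartitioned.toDebugString()  # lineage includes a shuffled stage introduced by repartition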
Note:
Keep in mind that repartitioning your data is a fairly expensive operation.
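As a practical wrap-up, a common pattern (shown here as a sketch; the output path is just an example) is to coalesce to a single partition right before saving, so HDFS ends up with one part file instead of one file per partition:
orderitems=sc.textFile("/public/retail_db/order_items")
orderitems.coalesce(1).saveAsTextFile("/user/pathirippilly/spark_jobs/output_data_set/orderitems_single_file")  # hypothetical output path; writes a single part file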