Skip to main content

Understanding spark architecture in Deep with YARN




OVERVIEW

Apache spark is a Distributed Computing framework. By distributed it doesn’t imply that it can run only on a cluster. Spark can be configured on our local system also. But Since spark works great in clusters and in real time , it is being implemented in multi node clusters like Hadoop, we will consider a Hadoop cluster for explaining spark here.
We can Execute spark on a spark cluster in following ways
  • Interactive clients(scala shell,pyspark etc): Usually used for exploration while coding like python shell
  • Submit a job (using spark submit utility):Always used for submitting a production application

Basic Architecture

Spark follows a Master/Slave Architecture. That is For every submitted application, it creates a Master Process and multiple slave processes. Master is the Driver and Slaves are the executors.
Say If from a client machine, we have submitted a spark job to a cluster. Spark will create a driver process and multiple executors. Similraly  if another spark job is submitted to same cluster, it will create again “one Driver- Many executors” combo.Thus for every program it will do the same. Driver is responsible for Analyzing, distributing, scheduling and monitoring work across the cluster.Driver is also responsible for maintaining necessary information to executors during the lifetime of the application


Detailed Architecture

Below is the general  architectural diagram for spark cluster.






               Main Components of a Spark Cluster are :
  •        Worker Nodes
  •        Cluster Manager
  •        Driver Program(Spark Context)
  •        Executor
  •       Task

Worked Nodes:

These are nothing but physical nodes with RAM,CPU,HDD(SSD) etc. RAM configured will be usually high since spark utilizes in-memory computation of high volumes of data. And these RAM,CPU,HDD,Network Bandwidth etc are called resources.

Cluster Manger:

This component will control entire resource management and scheduling of cluster. When you submit a spark job , the driver component (spark Context) will connects  with the master (master is a running Spark instance that connects to a cluster manager for resources.)which will ask cluster manager to allocate available resources asked by driver. So Cluster Manger only provides the executors for doing data processing in each worker nodes.
It find the worker nodes where the executors will be launched.
Spark comes with a default cluster manager called “Stand alone cluster manager”. But Spark can run on other cluster managers like YARN,MESOS etc. Most widely used is YARN in Hadoop cluster.

Driver Program:

This is nothing but sparkContext of your spark program. When you submit a spark job to cluster, the spark Context defined (whch is usually a line of code) inside the spark Code will run first and it is  called the driver. This controls entire life cycle of spark code. Killing the driver code implies, killing the entire execution. If you are writing the entire code for doing a spark submit, sparkContext is defined as first line of code inside the program. But if we are launching an interactive shell, sparkContext is automatically instantiated.

Executor:

Executor is nothing but a JVM container with required resources to execute the code inside each worker node.
Task:

Task is the smallest  unit of work done by spark under each Executor.
              
Apache Spark has a well-defined layered architecture where all the spark components and layers are loosely coupled. This architecture is further integrated with various extensions and libraries. Apache Spark Architecture is based on two main abstractions:
  • Resilient Distributed Dataset (RDD)
  • Directed Acyclic Graph (DAG)
RDD:

·        Resilient: Fault tolerant and is capable of rebuilding data on failure
·        Distributed: Distributed data among the multiple nodes in a cluster
·        Dataset: Collection of partitioned data with values

Resilient Distributed Datasets. Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster.

Transformation and Actions:

Two types of Apache Spark RDD operations are- Transformations and Actions. A Transformation is a function that produces new RDD from the existing RDDs but when we want to work with the actual dataset, at that point Action is performed. When the action is triggered after the result, new RDD is not formed like transformation.

Transformation:

Spark Transformation is a function that produces new RDD from the existing RDDs. It takes RDD as input and produces one or more RDD as output. Each time it creates new RDD when we apply any transformation. And the newly created RDDs can not be reverted , so they are Acyclic.Also any RDD is immutable so that it can be only transformed.
Applying transformation built an RDD lineage, with the entire parent RDDs of the final RDD(s). RDD lineage, also known as RDD operator graph or RDD dependency graph. It is a logical execution plan i.e., it is Directed Acyclic Graph (DAG) of the entire parent RDDs of RDD.
Transformations are lazy in nature i.e., they get execute when we call an action. They are not executed immediately. Two most basic type of transformations is a map(), filter().
After the transformation, the resultant RDD is always different from its parent RDD. It can be smaller (e.g. filter, count, distinct, sample), bigger (e.g. flatMap(), union(), Cartesian()) or the same size (e.g. map).There are two types of transformation

Narrow transformation – In Narrow transformation, all the elements that are required to compute the records in single partition live in the single partition of parent RDD. A limited subset of partition is used to calculate the result. Narrow transformations are the result of map(), filter().

Wide transformation – In wide transformation, all the elements that are required to compute the records in the single partition may live in many partitions of parent RDD. The partition may live in many partitions of parent RDD. Wide transformations are the result of groupbyKey() and reducebyKey().

Actions:
Transformations create RDDs from each other, but when we want to work with the actual dataset, at that point action is performed. When the action is triggered after the result, new RDD is not formed like transformation. Thus, Actions are Spark RDD operations that give non-RDD values. The values of action are stored to drivers or to the external storage system. It brings laziness of RDD into motion.
An action is one of the ways of sending data from Executer to the driver. Executors are agents that are responsible for executing a task. While the driver is a JVM process that coordinates workers and execution of the task.
count(),collect(),take(),top(),reduce(),fold()

Spark Job Life Cycle

Submitting the job
When you submit a job on a spark cluster , first sparkContext will start running which is nothing but your Driver Program.Under sparkContext only , all other tranformation and actions takes place. As per requested by driver code only , resources will be allocated And output of every action is received by driver or JVM only. So its important that how you are submitting your job . There are two ways of submitting your job to a cluster

1. Client Mode
2. Cluster Mode

Client Mode is nothing but you will be submitting your job through edge Node or Gate Way node which is associated to your cluster. Here the driver code will be running on your gate way node.That means if any interruptions happens on your gate way node or if your gate way node is closed, execution will be killed. So client mode is preferred while testing and debugging your code
When you deploy as Cluster Mode (that is you will be submitting your code with –deploy-mode  as ‘cluster’ using spark submit),the master node of the cluster communicate to the  cluster manager for launching an executor (executor here is nothing but a JVM with some required resources) and cluster manager will launch the executor in a selected worker node(Data node) and driver code will run here. Here we don’t have any role in selecting the worker node and we can not kill the code from client node.So here execution of driver code is controlled by cluster manager. And that’s why production codes are in cluster mode.To kill such an application , in YARN as cluster manager
1. Copy past the application Id from the spark scheduler, for instance application_1428487296152_25597
2. Connect to the server that have launch the job
3. Yarn application -kill application_1428487296152_25597
Lazy Evaluation

So as described, one you submit the application ,
1.     Spark-submit launches the driver program on the same node in (client mode) or on the cluster (cluster mode) and invokes the main method specified by the user.
2.     The driver program contacts the cluster manager to ask for resources to launch executor JVMs based on the configuration parameters supplied.
3.     The cluster manager launches executor JVMs on worker nodes.
4.     The driver process scans through the user application. Based on the RDD actions and transformations in the program, Spark creates an operator graph
This is what we call as DAG(Directed Acyclic Graph)

DAG

 (Directed Acyclic Graph) DAG in Apache Spark is a set of Vertices and Edges, where vertices represent the RDDs and the edges represent the Operation to be applied on RDD. In Spark DAG, every edge directs from earlier to later in the sequence. On the calling of Action, the created DAG submits to DAG Scheduler which further splits the graph into the stages of the task.
DAG a finite direct graph with no directed cycles. There are finitely many vertices and edges, where each edge directed from one vertex to another. It contains a sequence of vertices such that every edge is directed from earlier to later in the sequence. It is a strict generalization of MapReduce model. DAG operations can do better global optimization than other systems like MapReduce. The picture of DAG becomes clear in more complex jobs.
Apache Spark DAG allows the user to dive into the stage and expand on detail on any stage. In the stage view, the details of all RDDs belonging to that stage are expanded. The Scheduler splits the Spark RDD into stages based on various transformation applied. Each stage is comprised of tasks, based on the partitions of the RDD, which will perform same computation in parallel. The graph here refers to navigation, and directed and acyclic refers to how it is done.



Need of Directed Acyclic Graph in Spark

The limitations of Hadoop MapReduce became a key point to introduce DAG in Spark. The computation through MapReduce in three steps:

·        The data is read from HDFS.
·        Then apply Map and Reduce operations.
·        The computed result is written back to HDFS.

Each MapReduce operation is independent of each other and HADOOP has no idea of which Map reduce would come next. Sometimes for some iteration, it is irrelevant to read and write back the immediate result between two map-reduce jobs. In such case, the memory in stable storage (HDFS) or disk memory gets wasted.
In multiple-step, till the completion of the previous job all the jobs block from the beginning. As a result, complex computation can require a long time with small data volume.
While in Spark, a DAG (Directed Acyclic Graph) of consecutive computation stages is formed. In this way, we optimize the execution plan, e.g. to minimize shuffling data around. In contrast, it is done manually in MapReduce by tuning each MapReduce step.

How DAG works in Spark?
  • The interpreter is the first layer, using a Scala interpreter, Spark interprets the code with some modifications.
  • Spark creates an operator graph when you enter your code in Spark console.
  • When we call an Action on Spark RDD at a high level, Spark submits the operator graph to the DAG Scheduler


DAGScheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling. It transforms a logical execution plan (i.e. RDD lineage of dependencies built using RDD transformations) to a physical execution plan (using stages).
The execution DAG or physical execution plan is the DAG of stages.



  •         The DAG scheduler divides operators into stages of tasks. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together. For e.g. Many map operators can be scheduled in a single stage. The final result of a DAG scheduler is a set of stages.
  •         The Stages are passed on to the Task Scheduler.The task scheduler launches tasks via cluster manager (Spark Standalone/Yarn/Mesos). The task scheduler doesn't know about dependencies of the stages.
  •         The Workers execute the task on the slave.

At high level, there are two transformations that can be applied onto the RDDs, namely narrow transformation and wide transformation. Wide transformations basically result in stage boundaries.
Lets take an example , a simple word count job on “wordCount.txt” file residing in Hadoop cluster
Below is the spark code snippet

rdd=sc.textFile("/user/pathirippilly/sample_data_mr/wordCount.txt",5)
pipeRDD=rdd.map(lambda x : x.split(" ")).flatMap(lambda words : map(lambda word :        (word,1),words)).reduceByKey(lambda a,b:a+b)
pipeRDD.collect()
This sequence of commands implicitly defines a DAG of RDD objects (RDD lineage) that will be used later when an action is called. Each RDD maintains a pointer to one or more parents along with the metadata about what type of relationship it has with the parent
For example, when we call  b = a.map() on a RDD, the RDD b keeps a reference to its parent a, that's a lineage
To display the lineage of an RDD, Spark provides a debug method toDebugString(). For example executing toDebugString() on the pipeRDD RDD, will output the following:
PythonRDD[6] at RDD at PythonRDD.scala:48 []\n |  MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:122 []\n |  ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(5) PairwiseRDD[3] at reduceByKey at <stdin>:1 []\n    |  PythonRDD[2] at reduceByKey at <stdin>:1 []\n    |  /user/pathirippilly/sample_data_mr/wordCount.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n    |  /user/pathirippilly/sample_data_mr/wordCount.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []
The first line (from the bottom) shows the input RDD. We created this RDD by calling sc.textFile(). Below is the more diagrammatic view of the DAG graph created from the given RDD.


Once the DAG is build, the Spark scheduler creates a physical execution plan. As mentioned above, the DAG scheduler splits the graph into multiple stages, the stages are created based on the transformations. The narrow transformations will be grouped (pipe-lined) together into a single stage. So for our example, Spark will create two stage execution as follows:

The DAG scheduler will then submit the stages into the task scheduler. The number of tasks submitted depends on the number of partitions present in the textFile. Fox example consider we have 4 partitions in this example, then there will be 4 set of tasks created and submitted in parallel provided there are enough slaves/cores. Below diagram illustrates this in more detail:

For more detailed information i suggest you to go through the following youtube videos where the Spark creators give in depth details about the DAG and execution plan and lifetime.

Now if you summarize the application life cycle:

  1. The user submits a spark application using the spark-submit command.
  2. Spark-submit launches the driver program on the same node in (client mode) or on the cluster (cluster mode) and invokes the main method specified by the user.
  3. The driver program contacts the cluster manager to ask for resources to launch executor JVMs based on the configuration parameters supplied.
  4. The cluster manager launches executor JVMs on worker nodes.
  5. The driver process scans through the user application. Based on the RDD actions and transformations in the program, Spark creates an operator graph.
  6. When an action (such as collect) is called, the graph is submitted to a DAG scheduler. The DAG scheduler divides the operator graph into stages.
  7. A stage comprises tasks based on partitions of the input data. The DAG scheduler pipelines operators together to optimize the graph. For instance, many map operators can be scheduled in a single stage. This optimization is the key to Spark's performance. The final result of a DAG scheduler is a set of stages.
  8. The stages are passed on to the task scheduler. The task scheduler launches tasks via cluster manager. (Spark Standalone/Yarn/Mesos). The task scheduler doesn't know about dependencies among stages.
  9. Tasks are run on executor processes to compute and save results.
  10. If the driver's main method exits or it calls SparkContext.stop(), it will terminate the executors and release resources from the cluster manager.
OVERVIEW OF YARN as CLUSTER MANAGER

YARN is a generic resource-management framework for distributed workloads; in other words, a cluster-level operating system. Although part of the Hadoop ecosystem, YARN can support a lot of varied compute-frameworks (such as Tez, and Spark) in addition to MapReduce.
The central theme of YARN is the division of resource-management functionalities into a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is the unit of scheduling on a YARN cluster; it is either a single job or a DAG of jobs (jobs here could mean a Spark job, an Hive query or any similar constructs).


The ResourceManager and the NodeManager form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler 

The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks 

Application

A Spark application is the highest-level unit of computation in Spark. A Spark application can be used for a single batch job, an interactive session with multiple jobs, or a long-lived server continually satisfying requests. A Spark job can consist of more than just a single map and reduce. On the other hand, a YARN application is the unit of scheduling and resource-allocation. There is a one-to-one mapping between these two terms in case of a Spark workload on YARN; i.e, a Spark application submitted to YARN translates into a YARN application.

YARN Client
A program which submits an application to YARN is called a YARN client

The notion of driver and how it relates to the concept of client is important to understanding Spark interactions with YARN. In particular, the location of the driver w.r.t the client & the ApplicationMaster defines the deployment mode in which a Spark application runs: YARN client mode or YARN cluster mode.
Client mode: The driver program, in this mode, runs on the YARN client. Thus, the driver is not managed as part of the YARN cluster.
Take note that, since the driver is part of the client and, as mentioned above in the Spark Driversection, the driver program must listen for and accept incoming connections from its executors throughout its lifetime, the client cannot exit till application completion.



Cluster mode: The driver program, in this mode, runs on the ApplicationMaster, which itself runs in a container on the YARN cluster. The YARN client just pulls status from the ApplicationMaster. In this case, the client could exit after application submission.



Executor and Container

The first fact to understand is: each Spark executor runs as a YARN container [2]. This and the fact that Spark executors for an application are fixed, and so are the resources allotted to each executor, a Spark application takes up resources for its entire duration. This is in contrast with a MapReduce application which constantly returns resources at the end of each task, and is again allotted at the start of the next task.
Also, since each Spark executor runs in a YARN container, YARN & Spark configurations have a slight interference effect. I will illustrate this in the next segment.

Configuration and Resource Tuning


We will first focus on some YARN configurations, and understand their implications, independent of Spark

yarn.nodemanager.resource.memory-mb

It is the amount of physical memory, in MB, that can be allocated for containers in a node. This value has to be lower than the memory available on the node.
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16384</value> <!-- 16 GB -->

yarn.scheduler.minimum-allocation-mb

It is the minimum allocation for every container request at the ResourceManager, in MBs. In other words, the ResourceManager can allocate containers only in increments of this value. Thus, this provides guidance on how to split node resources into containers. Memory requests lower than this will throw a InvalidResourceRequestException.
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value><!-- 2 GB -->

yarn.scheduler.maximum-allocation-mb

The maximum allocation for every container request at the ResourceManager, in MBs. Memory requests higher than this will throw a InvalidResourceRequestException.
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>8192</value><!-- 8 GB -->
 
Thus, in summary, the above configurations mean that the ResourceManager can only allocate memory to containers in increments of yarn.scheduler.minimum-allocation-mb and not exceed yarn.scheduler.maximum-allocation-mb, and it should not be more than the total allocated memory of the node, as defined by yarn.nodemanager.resource.memory-mb.


A MUCH DEEPER VIEW

 Any, any Spark process that would ever work on your cluster or local machine is a JVM process. So what is a JVM

JVM

JVM is a engine that provides runtime environment to drive the Java Code or applications. It converts Java bytecode into machines language. JVM is a part of JRE(Java Run Environment). It stands for Java Virtual Machine
  • In other programming languages, the compiler produces machine code for a particular system. However, Java compiler produces code for a Virtual Machine known as Java Virtual Machine.
  • First, Java code is complied into bytecode. This bytecode gets interpreted on different machines
  • Between host system and Java source, Bytecode is an intermediary language.
  • JVM is responsible for allocating memory space.
The JVM memory consists of the following segments:
·        Heap Memory, which is the storage for Java objects
·        Non-Heap Memory, which is used by Java to store loaded classes and other meta-data
·        JVM code itself, JVM internal structures, loaded profiler agent code and data, etc.

Heap:

The JVM has a heap that is the runtime data area from which memory for all class instances and arrays are allocated. It is created at the JVM start-up.
The heap size may be configured with the following VM options:
·        -Xmx<size> - to set the maximum Java heap size
·        -Xms<size> - to set the initial Java heap size


By default, the maximum heap size is 64 Mb.
Heap memory for objects is reclaimed by an automatic memory management system which is known as a garbage collector. The heap may be of a fixed size or may be expanded and shrunk, depending on the garbage collector's strategy.

Memory management in spark(versions below 1.6)

So for spark , as for any JVM process, you can configure its heap size with -Xmx and -Xms flags of the JVM. How does this process use its heap memory and why does it need it at all? Here’s the diagram of Spark memory allocation inside of the JVM heap:

                          


By default, Spark starts with 512MB JVM heap

To be on a safe side and avoid OOM error Spark allows to utilize only 90% of the heap, which is controlled by the spark.storage.safetyFractionparameter of Spark. Ok, as you might have heard of Spark as an in-memory tool, Spark allows you to store some data in memory.But this misconception is discussed and corrected in this blog:  https://0x0fff.com/spark-misconceptions/

1.     First thing is that, any calculation that happens in any modern day computing is in-memory.Spark also doing the same thing, reads from some source cache it in memory ,process it and writes back to some target. So its utilizing the cache effectively.
2.     Second thing is that , Do you think that Spark processes all the transformations in memory? You would be disappointed, but the heart of Spark, “shuffle”, writes data to disks. If you have a “group by” statement in your SparkSQL query or you are just transforming RDD to PairRDD and calling on it some aggregation by key, you are forcing Spark to distribute data among the partitions based on the hash value of the key. The “shuffle” process consists of two phases, usually referred as “map” and “reduce”. “Map” just calculates hash values of your key (or other partitioning function if you set it manually) and outputs the data to N separate files on the local filesystem, where N is the number of partitions on the “reduce” side. “Reduce” side polls the “map” side for the data and merges it in new partitions. So if you have an RDD of M partitions and you transform it to pair RDD with N partitions, there would be M*N files created on the local filesystems in your cluster, holding all the data of the specific RDD. There are some optimizations available to reduce amount of files. Also there are some work undergo to pre-sort them and then “merge” on “reduce” side, but this does not change the fact that each time you need to “shuffle” you data you are putting it to the HDDs.

So some amount of memory is reserved for the caching of the data you are processing, and this part is usually 60% of the safe heap, which is controlled by the spark.storage.memoryFraction parameter.
So if you want to know how much data you can cache in Spark, you should take the sum of all the heap sizes for all the executors, multiply it by safetyFraction and by storage.memoryFraction, and by default it is 0.9 * 0.6 = 0.54 or 54% of the total heap size you allow Spark to use.
Now a bit more about the shuffle memory. It is calculated as “Heap Size” * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction. Default value for spark.shuffle.safetyFraction is 0.8 or 80%, default value for spark.shuffle.memoryFraction is 0.2 or 20%. So finally you can use up to 0.8*0.2 = 0.16 or 16% of the JVM heap for the shuffle. But in general Spark uses this memory for the exact task it is called after – for Shufflle
When the shuffle is performed, sometimes you as well need to sort the data. When you sort the data, you usually need a buffer to store the sorted data (remember, you cannot modify the data in the LRU cache in place as it is there to be reused later). So it needs some amount of RAM to store the sorted chunks of data. What happens if you don’t have enough memory to sort the data? There is a wide range of algorithms usually referenced as “external sorting” (http://en.wikipedia.org/wiki/External_sorting) that allows you to sort the data chunk-by-chunk and then merge the final result together.

The last part of RAM I haven’t yet cover is “unroll” memory. The amount of RAM that is allowed to be utilized by unroll process is spark.storage.unrollFraction * spark.storage.memoryFraction * spark.storage.safetyFraction, which with the default values equal to 0.2 * 0.6 * 0.9 = 0.108 or 10.8% of the heap. This is the memory that can be used when you are unrolling the block of data into the memory. Why do you need to unroll it after all? Spark allows you to store the data both in serialized and deserialized form. The data in serialized form cannot be used directly, so you have to unroll it before using, so this is the RAM that is used for unrolling. It is shared with the storage RAM, which means that if you need some memory to unroll the data, this might cause dropping some of the partitions stored in the Spark LRU cache.
Now that’s all about memory management in spark. I would like to

Memory management in spark(versions above 1.6)

From spark 1.6.0+, we have unified memory manager. Diagram is given below


  1. Reserved Memory. This is the memory reserved by the system, and its size is hardcoded. As of Spark 1.6.0, its value is 300MB, which means that this 300MB of RAM does not participate in Spark memory region size calculations, and its size cannot be changed in any way without Spark recompilation or setting spark.testing.reservedMemory, which is not recommended as it is a testing parameter not intended to be used in production. Be aware, this memory is only called “reserved”, in fact it is not used by Spark in any way, but it sets the limit on what you can allocate for Spark usage. Even if you want to give all the Java Heap for Spark to cache your data, you won’t be able to do so as this “reserved” part would remain spare (not really spare, it would store lots of Spark internal objects). For your information, if you don’t give Spark executor at least 1.5 * Reserved Memory = 450MB heap, it will fail with “please use larger heap size” error message.

  1. User Memory. This is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you to use it in a way you like. You can store your own data structures there that would be used in RDD transformations. For example, you can rewrite Spark aggregation by using mapPartitions transformation maintaining hash table for this aggregation to run, which would consume so called User Memory. In Spark 1.6.0 the size of this memory pool can be calculated as (“Java Heap” – “Reserved Memory”) * (1.0 – spark.memory.fraction), which is by default equal to (“Java Heap” – 300MB) * 0.25. For example, with 4GB heap you would have 949MB of User Memory. And again, this is the User Memory and its completely up to you what would be stored in this RAM and how, Spark makes completely no accounting on what you do there and whether you respect 

  1. Spark Memory. Finally, this is the memory pool managed by Apache Spark. Its size can be calculated as (“Java Heap” – “Reserved Memory”) * spark.memory.fraction, and with Spark 1.6.0 defaults it gives us (“Java Heap” – 300MB) * 0.75. For example, with 4GB heap this pool would be 2847MB in size. This whole pool is split into 2 regions – Storage Memory and Execution Memory, and the boundary between them is set by spark.memory.storageFractionparameter, which defaults to 0.5. The advantage of this new memory management scheme is that this boundary is not static, and in case of memory pressure the boundary would be moved, i.e. one region would grow by borrowing space from another one. I would discuss the “moving” this boundary a bit later, now let’s focus on how this memory is being used:
·        Storage Memory. This pool is used for both storing Apache Spark cached data and for temporary space serialized data “unroll”. Also all the “broadcast” variables are stored there as cached blocks. In case you’re curious, here’s the code of unroll. As you may see, it does not require that enough memory for unrolled block to be available – in case there is not enough memory to fit the whole unrolled partition it would directly put it to the drive if desired persistence level allows this. As of “broadcast”, all the broadcast variables are stored in cache with MEMORY_AND_DISKpersistence level.
·        Execution Memory. This pool is used for storing the objects required during the execution of Spark tasks. For example, it is used to store shuffle intermediate buffer on the Map side in memory, also it is used to store hash table for hash aggregation step. This pool also supports spilling on disk if not enough memory is available, but the blocks from this pool cannot be forcefully evicted by other threads (tasks).
Ok, so now let’s focus on the moving boundary between Storage Memory and Execution Memory. Due to nature of Execution Memory, you cannot forcefully evict blocks from this pool, because this is the data used in intermediate computations and the process requiring this memory would simply fail if the block it refers to won’t be found. But it is not so for the Storage Memory – it is just a cache of blocks stored in RAM, and if we evict the block from there we can just update the block metadata reflecting the fact this block was evicted to HDD (or simply removed), and trying to access this block Spark would read it from HDD (or recalculate in case your persistence level does not allow to spill on HDD).
So, we can forcefully evict the block from Storage Memory, but cannot do so from Execution Memory. When Execution Memory pool can borrow some space from Storage Memory? It happens when either:
  • There is free space available in Storage Memory pool, i.e. cached blocks don’t use all the memory available there. Then it just reduces the Storage Memory pool size, increasing the Execution Memory pool.
  • Storage Memory pool size exceeds the initial Storage Memory region size and it has all this space utilized. This situation causes forceful eviction of the blocks from Storage Memory pool, unless it reaches its initial size.
In turn, Storage Memory pool can borrow some space from Execution Memory pool only if there is some free space in Execution Memory pool available.
Initial Storage Memory region size, as you might remember, is calculated as Spark Memory” * spark.memory.storageFraction = (“Java Heap” – “Reserved Memory”) * spark.memory.fraction * spark.memory.storageFraction. With default values, this is equal to (“Java Heap” – 300MB) * 0.75 * 0.5 = (“Java Heap” – 300MB) * 0.375. For 4GB heap this would result in 1423.5MB of RAM in initial Storage Memory region.
This implies that if we use Spark cache and the total amount of data cached on executor is at least the same as initial Storage Memory region size, we are guaranteed that storage region size would be at least as big as its initial size, because we won’t be able to evict the data from it making it smaller. However, if your Execution Memory region has grown beyond its initial size before you filled the Storage Memory region, you won’t be able to forcefully evict entries from Execution Memory, so you would end up with smaller Storage Memory region while execution holds its blocks in memory.

Architecture of spark with YARN as cluster manager
When you start a spark cluster with YARN as cluster manager, it looks like as below

When you have a YARN cluster, it has a YARN Resource Manager daemon that controls the cluster resources (practically memory) and a series of YARN Node Managers running on the cluster nodes and controlling node resource utilization. From the YARN standpoint, each node represents a pool of RAM that you have a control over. When you request some resources from YARN Resource Manager, it gives you information of which Node Managers you can contact to bring up the execution containers for you. Each execution container is a JVM with requested heap size. JVM locations are chosen by the YARN Resource Manager and you have no control over it – if the node has 64GB of RAM controlled by YARN (yarn.nodemanager.resource.memory-mb setting in yarn-site.xml) and you request 10 executors with 4GB each, all of them can be easily started on a single YARN node even if you have a big cluster.

When you start Spark cluster on top of YARN, you specify the amount of executors you need (–num-executors flag or spark.executor.instances parameter), amount of memory to be used for each of the executors (–executor-memory flag or spark.executor.memory  parameter), amount of cores allowed to use for each executors (–executor-cores flag of spark.executor.cores parameter), and amount of cores dedicated for each task’s execution (spark.task.cpus parameter). Also you specify the amount of memory to be used by the driver application (–driver-memory flag or spark.driver.memory parameter).
When you execute something on a cluster, the processing of your job is split up into stages, and each stage is split into tasks. Each task is scheduled separately. You can consider each of the JVMs working as executors as a pool of task execution slots, each executor would give you spark.executor.cores / spark.task.cpus execution slots for your tasks, with a total of spark.executor.instances executors. Here’s an example. The cluster with 12 nodes running YARN Node Managers, 64GB of RAM each and 32 CPU cores each (16 physical cores with hyper threading). This way on each node you can start 2 executors with 26GB of RAM each (leave some RAM for system processes, YARN NM and DataNode), each executor with 12 cores to be utilized for tasks (leave some cores for system processes, YARN NM and DataNode). So In total your cluster would handle 12 machines * 2 executors per machine * 12 cores per executor / 1 core for each task = 288 task slots. This means that your Spark cluster would be able to run up to 288 tasks in parallel thus utilizing almost all the resources you have on this cluster

TASK
Task is a single unit of work performed by Spark, and is executed as a thread in the executor JVM. This is the secret under the Spark low job startup time – forking additional thread inside of the JVM is much faster that bringing up the whole JVM, which is performed when you start a MapReduce job in Hadoop.

PARTITIONS
Now let’s focus on another Spark abstraction called “partition”. All the data you work with in Spark is split into partitions. Partition size completely depends on the data source you use. For most of the methods to read the data in Spark you can specify the amount of partitions you want to have in your RDD. When you read a file from HDFS, you use Hadoop’s InputFormat to make it. By default each input split returned by InputFormat is mapped to a single partition in RDD. For most of the files on HDFS single input split is generated for a single block of data stored on HDFS, which equals to approximately 64MB of 128MB of data. Approximately, because the data in HDFS is split on exact block boundaries in bytes, but when it is processed it is split on the record splits. For text file the splitting character is the newline char, for sequence file it is the block end and so on. The only exception of this rule is compressed files – if you have the whole text file compressed, then it cannot be split into records and the whole file would become a single input split and thus a single partition in Spark and you have to manually repartition it.

SHUFFLE

What is the shuffle in general? Imagine that you have a list of phone call detail records in a table and you want to calculate amount of calls happened each day. This way you would set the “day” as your key, and for each record (i.e. for each call) you would emit “1” as a value. After this you would sum up values for each key, which would be an answer to your question – total amount of records for each day. But when you store the data across the cluster, how can you sum up the values for the same key stored on different machines? The only way to do so is to make all the values for the same key be on the same machine, after this you would be able to sum them up.
There are many different tasks that require shuffling of the data across the cluster, for instance table join – to join two tables on the field “id”, you must be sure that all the data for the same values of “id” for both of the tables are stored in the same chunks. Imagine the tables with integer keys ranging from 1 to 1’000’000. By storing the data in same chunks I mean that for instance for both tables values of the key 1-100 are stored in a single partition/chunk, this way instead of going through the whole second table for each partition of the first one, we can join partition with partition directly, because we know that the key values 1-100 are stored only in these two partitions. To achieve this both tables should have the same number of partitions, this way their join would require much less computations. So now you can understand how important shuffling is.
Discussing this topic, I would follow the MapReduce naming convention. In the shuffle operation, the task that emits the data in the source executor is “mapper”, the task that consumes the data into the target executor is “reducer”, and what happens between them is “shuffle”.
Shuffling in general has 2 important compression parameters: spark.shuffle.compress – whether the engine would compress shuffle outputs or not, and spark.shuffle.spill.compress – whether to compress intermediate shuffle spill files or not. Both have the value “true” by default, and both would use spark.io.compression.codec codec for compressing the data, which is snappy by default.


Comments

  1. It’s very informative. Thanks for sharing these wonderful ideas. You can check more about
    Data Analytics


    ReplyDelete
  2. Very knowledgeable Blog.Thanks for providing such a valuable Knowledge on Big Data.

    ReplyDelete
  3. Very informative article. Big Data is unavoidable count on growth of Industry 4.0.

    Big data help preventive and predictive analytics more accurate and precise.

    ReplyDelete
  4. Clavax is a top Android app development company that provides offshore Android application development services in Australia, America, Middle East built around specific business requirements of the customers. We deliver the highest level of customer service by deploying innovative and collaborative project management systems to build the most professional, robust, and highly scalable web & mobile solutions with the highest quality standards.

    ReplyDelete
  5. Hi Dear,
    Thanks for sharing such useful blog. Really! This Blog is very informative for us which contain lot of information about Business intelligence, I like this post. Please visit at "Data Analytics", i hope you may like our Business intelligence.

    Visit Here - https://portal.enterprisedna.co/p/enterprise-dna-membership/?affcode=78853_ume2zlih'

    Thanks, Regards,

    ReplyDelete
  6. Amazing blog.Thanks for sharing such excellent information with us. keep sharing...
    data analytics courses delhi

    ReplyDelete
  7. I have no words for this amazing post. Really good information. As we know mobile app industry is increasing day by day. People are planning to move into mobile technology, but they are not sure about the best mobile app development framework. In this case, I want you to get the perfect knowledge of app framework. Thanks!

    ReplyDelete

  8. I know this is an amazing post, it defines the true value of your knowledge. In fact, running a business is not common. People keep running to drive more business and generate more customers. At RisingMax which is best IT consulting companies in NYC, you can maintain a leading position with real estate software development in New York. keep it up. I really think this article is amazing, I can't describe it in words. Also, if you need an automotive software development service, do not delay in shaking hands with RisingMax.

    ReplyDelete
  9. This comment has been removed by the author.

    ReplyDelete
  10. Hi, I enjoyed your article. if your Looking to supply your liquor, beer, or wine supply on the occasion but don't want to leave the house?If that's the case, an Ready made Alcohol delivery app is a great option for you! With an on-demand alcohol delivery app, you can get your favored liquors, beers, or drinks delivered to your door with a single finger of your finger.

    ReplyDelete
  11. Once again you provide several doses of reality which explore the complete explanation of packing and moving companies in Bangalore . This article don't have to be that long. I simply couldn't leave your web site before suggesting that I actually loved the usual info on packing and movers services in Bangalore. I just want to know what is the best way to get real service

    ReplyDelete
  12. Since its launch, RisingMax has been following the principles of building and implementing great ideas, empowering customers' businesses and improving their lives through innovative enterprise solutions. Our team at RisingMax is distinguished by the cross-technology imagination, knowledge and experience we bring to every project we deal with. We understand the importance of nurturing interpersonal relationships.

    ReplyDelete
  13. Mobile app development is a lucrative and in-demand job path. Enroll in an advanced program in Android app development to determine if you're cut out for a future as a mobile app developer. Suffescom Solutions should also be aware of the App Store Optimization procedure, which is critical if you want to be found by consumers looking for apps that are comparable to yours.

    ReplyDelete
  14. Wow, that was a fantastic article. I took the time to read it. It's a tremendous resource! The Netflix clone is an excellent way to provide your users with a customized video-streaming platform that includes entertaining video content. Suffescom Solutions has grown to become the largest entertainment platform provider by providing streaming services like Netflix Clone script and various other customized content.

    ReplyDelete
  15. Thank you for providing the best information regarding mobile apps. It is very user-friendly and covers every aspect of food delivery app development.  Since technology is evolving especially in terms of the food business. The rise of mobile app development is also achieving new heights. Here the Suffescom solutions provide all sorts of information regarding mobile app development under one roof.

    ReplyDelete
  16. Nice article! Thanks for sharing so much good information on this topic, I have read your whole blog which is very helpful for beginners. Discord Clone App is a platform for Voice-over IP, Instant Messaging, and Digital Distribution. In private chat or as part of a "server", users communicate with voice calls, video calls, text messaging, media, and files.

    ReplyDelete
  17. Thank you for posting this awesome article. I’m a long time reader but I’ve never been compelled to leave a comment. If you are interested in Android Application Development Company or want to discuss the importance of Mobile Application in the present scenario, contact anytime.
    You can also contact here, if you are looking forward to Hire Android App Developers
    best Mobile app development company

    ReplyDelete
  18. You can create a TRC20 or TRC721 Token through our Tron token development services and they will work seamlessly with their Ethereum counterparts.

    ReplyDelete
  19. The Token Migration platform helps projects upgrade their old tokens to new tokens. In essence, if your old token has any flaws or if you want to add some additional features, you can create an entirely new token and users can swap their old tokens with your new tokens through the migration contract.

    ReplyDelete
  20. Create your own DEX with like Uniswap our Uniswap clone script. Get decentralized exchange development from BlockchainX experts.Uniswap clone script

    ReplyDelete
  21. The migration platform works similarly to a DEX without the Automated Market Maker system. The New or V2 tokens are stored inside the migration smart contract and users can simply connect their web3 wallets to trustlessly migrate their tokens.Token Migration

    ReplyDelete
  22. Tron token development is your best choice if you want all the functionalities of Ethereum, without the outrageous gas fees. Our Tron token development services allow you to create and deploy tokens on the tron network in minutes

    ReplyDelete
  23. Connect yourself with technology by reading the information provided by Apps For Startup Stay connected with it as it provides the best information on technology.

    ReplyDelete

Post a Comment

Popular posts from this blog

Different ways of Transposing a Dataframe in Pyspark

When I have started coding on transposing Dataframes, I found below different methods.  I am sharing all those info here. Creation of a test Input Dataframe to be Transposed ds = {'one':[0.3, 1.2, 1.3, 1.5, 1.4, 1.0],'two':[0.6, 1.2, 1.7, 1.5, 1.4, 2.0]} df = sc.parallelize([ (k,) + tuple(v[0:]) for k,v in ds.items()]).toDF() df.show() method 1 (This method involves conversion of spark object to a python object (rdd to list of tuples of entire data)): inp_list=df.rdd.map(tuple).collect() # Creating a list of tuples of rows # Unpacking the list and zipping all tuples together except the header #list(zip(*inp_list))[1:] is having the transposed tuples  and list(zip(*inp_list))[0] is having the header df_transpose=spark.createDataFrame(list(zip(*inp_list))[1:],list(zip(*inp_list))[0])  df_transpose.show() method 2 (In this method only header data we are converting into python list. Rest of the transformations are carried out as...

Spark (With Python) : map() vs mapPartitions()

As a Beginner in spark, many developers will be having confusions over map() and mapPartitions() functions. This article is an attempt to resolve the confusions This blog is for : pyspark (spark with Python) Analysts and all those who are interested in learning pyspark. Pre-requesties: Should have a good knowledge in python as well as should have a basic knowledge of pyspark functions. If you use map() over an rdd , the function called  inside it will run for every record .It means if you have 10M records , function also will be executed 10M times. This is expensive especially when you are dealing with scenarios involving database connections and querying data from data base. Lets say inside map function, we have a function defined where we are connecting to a database and querying from it. Lets say our RDD is having 10M records. Now this function will execute 10M times which means 10M database connections will be created . This  is very expensive. In these kind ...