How to set up an auto scalable Google Dataproc cluster?

Jiten Savla
Sep 24, 2020

Introduction

We recently migrated our data engineering pipeline from a multi-threaded Java process deployed on multiple GCP VMs to a Hadoop/Spark based ecosystem. One of our critical requirements was the ability to auto scale a multi-node cluster on demand, so that the pipeline completes in the shortest possible time with optimal utilization of resources. Being a Google shop, we first experimented with Google’s Dataproc, a managed Spark and Hadoop service that helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don’t need them. You can learn more about Dataproc here.

Creating Dataproc Cluster

To run Dataproc as an auto scaling cluster, we first need to create a Dataproc cluster using the GCP admin console. Below we describe the cluster configuration we selected for our PoC.

Cluster Mode

There are 3 cluster modes available: Single Node (1 master, 0 workers), Standard (1 master, N workers) and High Availability (3 masters, N workers). Auto scaling doesn’t work with Single Node mode. High Availability mode is useful for a production grade setup but overkill for our PoC, hence we selected Standard (1 master, N workers).

Cluster Nodes

For the Proof of Concept (PoC), we decided to start small and went with 1 master and 2 workers, where each node is of type “n1-standard-2 (2 vCPU, 7.5 GB memory)”.

Master node configuration
Worker node configuration

Notes:
Worker nodes created as part of the cluster also double up as data nodes, so the disk size needs to account for that.

In addition to primary workers, a cluster can also have preemptible workers or secondary workers. Unlike primary workers, secondary workers do not store data. Therefore, you can use secondary workers to scale compute without scaling storage. We decided not to use secondary workers for our PoC and left that for a separate story in the future :-).

Since data nodes can have high I/O ops, we could select “SSD persistent disk” as the disk type, but for the PoC we went with “Standard persistent disk”. Please consider “SSD persistent disk” for a production grade setup.

Yarn cores & Yarn memory

Auto scaling in Dataproc works on the availability of Yarn memory, hence it’s important to understand Yarn cores and memory.

Yarn cores & memory

Yarn memory = number of workers * 0.8 * RAM per node = 2 * 0.8 * 7.5 GB = 12 GB
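The arithmetic above can be sketched in a few lines of Python. The function name and the default 0.8 fraction are illustrative only: 0.8 is the approximation used in this article, and the real share of RAM handed to Yarn may differ on your cluster.

```python
def yarn_memory_gb(num_workers, ram_per_node_gb, yarn_fraction=0.8):
    """Estimate the total Yarn memory of a cluster in GB.

    yarn_fraction is the rough share of each worker's RAM given to Yarn
    (0.8 in this article; an approximation, not an exact Dataproc value).
    """
    return num_workers * yarn_fraction * ram_per_node_gb


# PoC cluster: 2 workers of type n1-standard-2 (7.5 GB RAM each), about 12 GB
print(yarn_memory_gb(2, 7.5))
```

Doubling the workers doubles the estimate, which is why the number of primary workers directly drives how much Yarn memory the autoscaler sees.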

Enabling auto scaling policy

We can select an auto scaling policy while creating the cluster.

Enabling auto scaling policy

Alternatively, we can attach an auto scaling policy to an existing cluster via the command line (the online edit option is not available once the cluster is created).

gcloud dataproc clusters update CLUSTER_NAME \
    --autoscaling-policy=POLICY_ID_OR_RESOURCE_URI \
    --region=REGION
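If you script cluster management, the same command can be wrapped in Python. This is a minimal sketch that only shells out to the gcloud CLI with the flags shown above; the function names, cluster name, policy id and region are hypothetical, and it assumes gcloud is installed and authenticated.

```python
import subprocess


def build_update_command(cluster_name, policy, region):
    """Return the gcloud argv that attaches an autoscaling policy to a cluster."""
    return [
        "gcloud", "dataproc", "clusters", "update", cluster_name,
        f"--autoscaling-policy={policy}",
        f"--region={region}",
    ]


def attach_policy(cluster_name, policy, region):
    """Run the command; needs an installed and authenticated gcloud CLI."""
    subprocess.run(build_update_command(cluster_name, policy, region), check=True)


# Hypothetical names; this only prints the command and does not execute it.
print(" ".join(build_update_command("poc-cluster", "poc-policy", "us-central1")))
```

Keeping the command construction separate from its execution makes it easy to log or review the exact command before it touches a cluster.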

Auto scaling policy

An auto scaling policy defines when and how to auto scale the Dataproc cluster. One auto scaling policy can be applied to multiple clusters. The following are the important configurations when creating a policy.

Worker configuration

Worker configuration helps control how many workers will be spawned when the auto scaling threshold has been passed. Separate settings are available for primary and secondary workers. Since we had not used secondary workers, we didn’t provide configuration for them.

Min instances and max instances control the minimum and maximum number of worker nodes the cluster can have while auto scaling. This helps prevent uncontrolled spawning of workers.

Weight for the instance group is used to determine the fraction of total workers in the cluster from this instance group. For example, if primary workers have weight 2, and secondary workers have weight 1, the cluster will have approximately 2 primary workers for each secondary worker.

Weight wasn’t applicable for the PoC as there were no secondary workers in the cluster.
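To make the weight setting concrete, here is a small sketch of a proportional split. It is a simplified illustration, not Dataproc's actual balancing logic (which, among other things, also has to respect the min and max instances of each group); the function name is hypothetical.

```python
def split_by_weight(total_workers, primary_weight, secondary_weight):
    """Split a target worker count between the two instance groups by weight."""
    total_weight = primary_weight + secondary_weight
    if total_weight <= 0:
        raise ValueError("at least one weight must be positive")
    primary = round(total_workers * primary_weight / total_weight)
    return primary, total_workers - primary


# Weights 2 (primary) and 1 (secondary): about 2 primary workers per secondary worker
print(split_by_weight(9, 2, 1))  # (6, 3)
```

With a secondary weight of 0, every worker lands in the primary group, which matches a cluster like our PoC that has no secondary workers.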

Worker configuration

Cooldown duration

Cooldown duration is the time that must elapse between two scaling events on the cluster. The minimum and default cooldown duration is two minutes. If a shorter cooldown duration is set in a policy, workload changes will more quickly affect cluster size, but clusters may unnecessarily scale up and down.

We went with the default configuration for the PoC.

Cooldown duration

Yarn auto scaling configuration

The following settings are used, together with the memory metrics reported by the resource manager Yarn, to assess whether the auto scaling threshold has been reached and how many workers need to be spawned.

Summary of the configuration

Scale up factor: Fraction of average pending memory in the last cooldown period for which to add workers. A scale-up factor of 1.0 will result in scaling up so that there is no pending memory remaining after the update (more aggressive scaling). A scale-up factor closer to 0 will result in a smaller magnitude of scaling up (less aggressive scaling). We decided to experiment with the most aggressive scaling up.

Scale up min worker fraction: Minimum scale-up threshold as a fraction of total cluster size before scaling occurs. For example, in a 20-worker cluster, a threshold of 0.1 means the autoscaler must recommend at least a 2-worker scale-up for the cluster to scale. A threshold of 0 means the autoscaler will scale up on any recommended change. We decided to experiment with letting the autoscaler recommend the change to the cluster.

Scale down factor: Fraction of average available memory in the last cooldown period for which to remove workers. A scale-down factor of 1 will result in scaling down so that there is no available memory remaining after the update (more aggressive scaling). A scale-down factor of 0 disables removing workers, which can be beneficial for autoscaling a single job. We decided to experiment with the most aggressive scaling down.

Scale down min worker fraction: Minimum scale-down threshold as a fraction of total cluster size before scaling occurs. For example, in a 20-worker cluster, a threshold of 0.1 means the autoscaler must recommend at least a 2-worker scale-down for the cluster to scale. A threshold of 0 means the autoscaler will scale down on any recommended change. We decided to experiment with letting the autoscaler recommend the change to the cluster.
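The four settings above can be illustrated with a rough model of the decision. This is a simplified sketch under stated assumptions, not Dataproc's implementation: it takes the average pending and available memory as given, ignores min and max instances, and the function names and the 6 GB per worker figure are made up for the example.

```python
import math


def recommended_change(pending_gb, available_gb, worker_memory_gb,
                       scale_up_factor, scale_down_factor):
    """Return the worker delta: positive to add workers, negative to remove."""
    if pending_gb > 0:
        return math.ceil(scale_up_factor * pending_gb / worker_memory_gb)
    return -math.floor(scale_down_factor * available_gb / worker_memory_gb)


def passes_threshold(delta, cluster_size, min_worker_fraction):
    """A change is applied only if it is at least this fraction of the cluster."""
    return delta != 0 and abs(delta) >= min_worker_fraction * cluster_size


# 30 GB pending, 6 GB of Yarn memory per worker, most aggressive scale-up
print(recommended_change(30, 0, 6, scale_up_factor=1.0, scale_down_factor=1.0))  # 5

# The 20-worker example from the text: a threshold of 0.1 needs at least 2 workers
print(passes_threshold(1, 20, 0.1))  # False
print(passes_threshold(2, 20, 0.1))  # True
```

In this model a scale-down factor of 0 always yields a delta of 0, which corresponds to disabling worker removal.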

Yarn auto scaling configuration
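Putting the settings from this section together, the policy can be pictured as a nested structure. The sketch below builds it as a Python dict; the field names follow the Dataproc autoscaling policy resource as I understand it, so please verify them against the current documentation. The worker bounds, the two min worker fractions of 0.0 and the graceful decommission timeout are assumptions chosen for illustration, not values confirmed by this article.

```python
import json


def build_policy(min_workers, max_workers, cooldown_seconds=120):
    """Build a dict shaped like a Dataproc autoscaling policy (illustrative)."""
    if cooldown_seconds < 120:
        raise ValueError("cooldown must be at least two minutes")
    if not 0 <= min_workers <= max_workers:
        raise ValueError("need 0 <= min_workers <= max_workers")
    return {
        "workerConfig": {
            "minInstances": min_workers,
            "maxInstances": max_workers,
        },
        "basicAlgorithm": {
            "cooldownPeriod": f"{cooldown_seconds}s",
            "yarnConfig": {
                "scaleUpFactor": 1.0,               # most aggressive scale-up
                "scaleUpMinWorkerFraction": 0.0,    # assumption: act on any change
                "scaleDownFactor": 1.0,             # most aggressive scale-down
                "scaleDownMinWorkerFraction": 0.0,  # assumption: act on any change
                "gracefulDecommissionTimeout": "0s",  # assumption: placeholder value
            },
        },
    }


# Hypothetical bounds: between 2 and 10 primary workers
print(json.dumps(build_policy(2, 10), indent=2))
```

Writing the policy down as data makes it easier to keep it in version control and to apply the same policy to multiple clusters.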

Auto scaling Limitation

One important note regarding auto scaling in Dataproc is that it scales based on memory requests and NOT on core requests. By default, YARN finds slots for containers based on memory requests and ignores core requests entirely. So in the default configuration, Dataproc only needs to auto scale based on YARN pending/available memory.

There are definitely use cases where you want to oversubscribe YARN cores by running more containers. For example your default distcp configs might have 8 low-memory containers running on a node manager even if you have only 4 physical cores. Each distcp task is largely I/O bound and doesn’t take much memory. So GCP thinks that leaving the default of only scheduling based on memory is reasonable.
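The oversubscription described above follows directly from memory-only scheduling, as this small sketch shows. The node size and container size are hypothetical numbers picked to reproduce the 8-containers-on-4-cores example.

```python
def containers_per_node(node_yarn_memory_mb, container_memory_mb, node_vcores=None):
    """Count the containers a node can host when only memory is considered.

    node_vcores is accepted but deliberately ignored, mirroring the
    memory-only behaviour described in the text.
    """
    return node_yarn_memory_mb // container_memory_mb


# Hypothetical node: 8192 MB of YARN memory, 4 cores, 1024 MB containers
print(containers_per_node(8192, 1024, node_vcores=4))  # 8
```

The core count never enters the calculation, so the same node would host 8 such containers whether it had 4 cores or 1.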

There is a StackOverflow post regarding this here.

Conclusion

In this part, we have set up the cluster and the auto scaling policy. In the next part, we shall look at submitting jobs on this cluster, inducing load with multiple jobs and monitoring how the cluster auto scales up and down.
