Run faster and more cost-effective Dataproc jobs

Dataproc is a fully managed service for hosting open-source distributed processing platforms such as Apache Hive, Apache Spark, Presto, Apache Flink, and Apache Hadoop on Google Cloud. Dataproc provides flexibility to provision and configure clusters of varying sizes on demand. In addition, Dataproc has powerful features to enable your organization to lower costs, increase performance and streamline operational management of workloads running on the cloud. Dataproc is an important service in any data lake modernization effort. Many customers begin their journey to the cloud by migrating their Hadoop workloads to Dataproc and continue to modernize their solutions by incorporating the full suite of Google Cloud’s data offerings.This guide demonstrates how you can optimize Dataproc job stability, performance, and cost-effectiveness. You can achieve this by using a workflow template to deploy a configured ephemeral cluster that runs a Dataproc job with calculated application-specific properties.  Before you beginPre-requisitesA Google Cloud projectA 100-level understanding of Dataproc (FAQ)Experience with shell scripting, YAML templates, Hadoop ecosystemAn existing dataproc application , referred to as “the job” or “the application”Sufficient project quotas (CPUs, Disks, etc.) to create clusters Consider Dataproc Serverless or BigQueryBefore getting started with Dataproc, determine whether your application is suitable for (or portable to) Dataproc Serverless or BigQuery. These managed services will save you time spent on maintenance and configuration. This blog assumes the user has identified Dataproc as the best choice for their scenario. For more information about other solutions, please check out some of our other guides like Migrating Apache Hive to Bigquery and Running an Apache Spark Batch Workload on Dataproc Serverless.Separate data from computationConsider the advantages of using Cloud Storage. Using this persistent storage for your workflows has the following advantages:It’s a Hadoop Compatible File System (HCFS), so it’s easy to use with your existing jobs.Cloud Storage can be faster than HDFS. In HDFS, a MapReduce job can’t start until the NameNode is out of safe mode—a process that can take a few seconds to many minutes, depending on the size and state of your data.It requires less maintenance than HDFS.It enables you to easily use your data with the whole range of Google Cloud products.It’s considerably less expensive than keeping your data in replicated (3x) HDFS on a persistent Dataproc cluster. Pricing Comparison Examples (North America, as of 11/2022):GCS: $0.004 – $0.02 per GB, depending on the tierPersistent Disk: $0.04 – $0.34 per GB + compute VM costsHere are some guides on Migrating On-Premises Hadoop Infrastructure to Google Cloud and HDFS vs. Cloud Storage: Pros, cons, and migration tips. Google Cloud has developed an open-source tool for performing HDFS to GCS.Optimize your Cloud StorageWhen using Dataproc, you can create external tables in Hive, HBase, etc., where the schema resides in Dataproc, but the data resides in Google Cloud Storage. Separating compute and storage enables you to scale your data independently of compute power.In HDFS / Hive On-Prem setups, the compute and storage were closely tied together, either on the same machine or in a nearby machine. When using Google Cloud Storage over HDFS, you separate compute and storage at the expense of latency. It takes time for Dataproc to retrieve files on Google Cloud Storage. Many small files (e.g. millions of <1mb files) can negatively affect query performance, and file type and compression can also affect query performance.When performing data analytics on Google Cloud, it is important to be deliberate in choosing your Cloud Storage file strategy. Monitoring Dataproc JobsAs you navigate through the following guide, you’ll submit Dataproc Jobs and continue to optimize runtime and cost for your use case. Monitor the Dataproc Jobs console during/after job submissions to get in-depth information on the Dataproc cluster performance. Here you will find specific metrics that help identify opportunities for optimization, notably YARN Pending Memory, YARN NodeManagers, CPU Utilization, HDFS Capacity, and Disk Operations. Throughout this guide you will see how these metrics influence changes in cluster configurations.Guide: Run Faster and Cost-Effective Dataproc Jobs1. Getting startedThis guide demonstrates how to optimize performance and cost of applications running on Dataproc clusters. Because Dataproc supports many big data technologies – each with their own intricacies – this guide intends to be trial-and-error experimentation. Initially it will begin with a generic dataproc cluster with defaults set. As you proceed through the guide, you’ll increasingly customize Dataproc cluster configurations to fit your specific workload.Plan to separate Dataproc Jobs into different clusters – each data processing platform uses resources differently and can impact each other’s performances when run simultaneously. Even better, isolating single jobs to single clusters can set you up for ephemeral clusters, where jobs can run in parallel on their own dedicated resources.Once your job is running successfully, you can safely iterate on the configuration to improve runtime and cost, falling back to the last successful run whenever experimental changes have a negative impact.You can export an existing cluster’s configuration to a file during experimentation. Use this configuration to create new clusters through the import command.code_block[StructValue([(u’code’, u’gcloud dataproc clusters export my-cluster \rn –region=region \rn –destination=my-cluster.yamlrnrngcloud dataproc clusters import my-new-cluster \rn –region=us-central1 \rn –source=my-cluster.yaml’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfd8f5f5d0>)])]Keep these as reference to the last successful configuration incase drift occurs. 2. Calculate Dataproc cluster sizea. Via on-prem workload (if applicable)View the YARN UIIf you’ve been running this job on-premise, you can identify the resources used for a job on the Yarn UI. The image below shows a Spark job that ran successfully on-prem.The table below are key performance indicators for the job.For the above job you can calculate the followingNow that you have the cluster sizing on-prem, the next step is to identify the initial cluster size on Google Cloud. Calculate initial Dataproc cluster sizeFor this exercise assume you are using n2-standard-8, but a different machine type might be more appropriate depending on the type of workload. n2-standard-8 has 8 vCPUs and 32 GiB of memory. View other Dataproc-supported machine types here.Calculate the number of machines required based on the number of vCores required.Recommendations based on the above calculations:Take note of the calculations for your own job/workload.b. Via an autoscaling clusterAlternatively, an autoscaling cluster can help determine the right number of workers for your application. This cluster will have an autoscaling policy attached. Set the autoscaling policy min/max values to whatever your project/organization allows. Run your jobs on this cluster. Autoscaling will continue to add nodes until the YARN pending memory metric is zero. A perfectly sized cluster minimizes the amount of YARN pending memory while also minimizing excess compute resources.Deploying a sizing Dataproc clusterExample: 2 primary workers (n2-standard-8)0 secondary workers (n2-standard-8)pd-standard 1000GBAutoscaling policy: 0 min, 100 max.No application properties set.sample-autoscaling-policy.ymlcode_block[StructValue([(u’code’, u’workerConfig:rn minInstances: 2rn maxInstances: 2rnsecondaryWorkerConfig:rn minInstances: 0rn maxInstances: 100rnbasicAlgorithm:rn cooldownPeriod: 5mrn yarnConfig:rn scaleUpFactor: 1.0rn scaleDownFactor: 1.0rn gracefulDecommissionTimeout: 1hr’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfda5cd190>)])]code_block[StructValue([(u’code’, u’gcloud dataproc autoscaling-policies import policy-namern –source=sample-autoscaling-policy.yml \rn –region=regionrnrngcloud dataproc clusters create cluster-name \rn –master-machine-type=n2-standard-8 \rn –worker-machine-type=n2-standard-8 \rn –master-boot-disk-type=pd-standard \rn –master-boot-disk-size=1000GBrn –autoscaling-policy=policy-name rn –region=region’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfc6803ed0>)])]Submitting Jobs to Dataproc Clustercode_block[StructValue([(u’code’, u”gcloud dataproc jobs submit spark \rn –cluster=cluster-name \rn –region=region \rn –jar=<your-spark-jar-path> \rn –properties=’spark.executor.cores=5,spark.executor.memory=4608mb’ \rn — arg1 arg2″), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfe510d7d0>)])]Monitoring Worker Count / YARN NodeManagersObserve the peak number of workers required to complete your job.To calculate the number of required cores, multiply the machine size (2,8,16,etc. by the number of node managers.) 3. Optimize Dataproc cluster configurationUsing a non-autoscaling cluster during this experimentation phase can lead to the discovery of more accurate machine-types, persistent disks, application properties, etc. For now, build an isolated non-autoscaling cluster for your job that has the optimized number of primary workers.Example: N primary workers (n2-standard-8)0 secondary workers (n2-standard-8)pd-standard 1000GBNo autoscaling policyNo application properties setDeploying a non-autoscaling Dataproc clustercode_block[StructValue([(u’code’, u’gcloud dataproc clusters create cluster-name \rn –master-machine-type=n2-standard-8 \rn –worker-machine-type=n2-standard-8 \rn –master-boot-disk-type=pd-standard \rn –master-boot-disk-size=1000GB \rn –region=region \rn –num-workers=x’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfe510d1d0>)])]Choose the right machine type and machine sizeRun your job on this appropriately-sized non-autoscaling cluster. If the CPU is maxing out, consider using C2 machine type. If memory is maxing out, consider using N2D-highmem machine types. Prefer using smaller machine types (e.g. switch n2-highmem-32 to n2-highmem-8). It’s okay to have clusters with hundreds of small machines. For Dataproc clusters, choose the smallest machine with maximum network bandwidth (32 Gbps). Typically these machines are n2-standard-8 or n2d-standard-16.On rare occasions you may need to increase machine size to 32 or 64 cores. Increasing your machine size can be necessary if your organization is running low on IP addresses or you have heavy ML or processing workloads. Refer to Machine families resource and comparison guide | Compute Engine Documentation | Google Cloud for more information.Submitting Jobs to Dataproc Clustercode_block[StructValue([(u’code’, u’gcloud dataproc jobs submit spark \rn –cluster=cluster-name \rn –region=region \rn –jar=<your-spark-jar-path> \rn — arg1 arg2′), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfd978bf90>)])]Monitoring Cluster MetricsMonitor memory to determine machine-type:Monitor CPU to determine machine-type:Choose the right persistent diskIf you’re still observing performance issues, consider moving from pd-standard to pd-balanced or pd-ssd.Standard persistent disks (pd-standard) are best for large data processing workloads that primarily use sequential I/Os. For PD-Standard without local SSDs, we strongly recommend provisioning 1TB (1000GB) or larger to ensure consistently high I/O performance. Balanced persistent disks (pd-balanced) are an alternative to SSD persistent disks that balance performance and cost. With the same maximum IOPS as SSD persistent disks and lower IOPS per GB, a balanced persistent disk offers performance levels suitable for most general-purpose applications at a price point between that of standard and SSD persistent disks.SSD persistent disks (pd-ssd) are best for enterprise applications and high-performance database needs that require lower latency and more IOPS than standard persistent disks provide.For similar costs, pd-standard 1000GB == pd-balanced 500GB == pd-ssd 250 GB. Be certain to review performance impact when configuring disk. See Configure Disks to Meet Performance Requirements for information on disk I/O performance. View Machine Type Disk Limits for information on the relationships between machine types and persistent disks. If you are using 32 core machines or more, consider switching to multiple Local SSDs per node to get enough performance for your workload. You can monitor HDFS Capacity to determine disk size. If HDFS Capacity ever drops to zero, you’ll need to increase the persistent disk size.If you observe any throttling of Disk bytes or Disk operations, you may need to consider changing your cluster’s persistent disk to balanced or SSD:Choose the right ratio of primary workers vs. secondary workersYour cluster must have primary workers. If you create a cluster and you do not specify the number of primary workers, Dataproc adds two primary workers to the cluster. Then you must determine if you prioritize performance or cost optimization.If you prioritize performance, utilize 100% primary workers. If you prioritize cost optimization, specify the remaining workers to be secondary workers. Primary worker machines are dedicated to your cluster and provide HDFS capacity. On the other hand, secondary worker machines have three types: spot VMs, standard preemptible VMs, and non-preemptible VMs. As a default, secondary workers are created with the smaller of 100GB or the primary worker boot disk size. This disk space is used for local caching of data and do not run HDFS. Be aware that secondary workers may not be dedicated to your cluster and may be removed at any time. Ensure that your application is fault-tolerant when using secondary workers.Consider attaching Local SSDsSome applications may require higher throughput than what Persistent Disks provide. In these scenarios, experiment with Local SSDs. Local SSDs are physically attached to the cluster and provide higher throughput than persistent disks (see the Performance table). Local SSDs are available at a fixed size of 375 gigabytes, but you can add multiple SSDs to increase performance.Local SSDs do not persist data after a cluster is shut down. If persistent storage is desired, you can use SSD persistent disks, which provide higher throughput for their size than standard persistent disks. SSD persistent disks are also a good choice if partition size will be smaller than 8 KB (however, avoid small paritions).Like Persistent Disks, continue to monitor any throttling of Disk bytes or Disk operations to determine whether Local SSDs are appropriate:Consider attaching GPUsFor even more processing power, consider attaching GPUs to your cluster. Dataproc provides the ability to attach graphics processing units (GPUs) to the master and worker Compute Engine nodes in a Dataproc cluster. You can use these GPUs to accelerate specific workloads on your instances, such as machine learning and data processing.GPU drivers are required to utilize any GPUs attached to Dataproc nodes. You can install GPU drivers by following the instructions for this initialization action.Creating Cluster with GPUscode_block[StructValue([(u’code’, u’gcloud dataproc clusters create cluster-name \rn –region=region \rn –master-accelerator type=nvidia-tesla-k80 \rn –worker-accelerator type=nvidia-tesla-k80,count=4 \rn –secondary-worker-accelerator type=nvidia-tesla-k80,count=4rn –initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/gpu/install_gpu_driver.sh’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfda9ecbd0>)])]Sample cluster for compute-heavy workload:code_block[StructValue([(u’code’, u’gcloud dataproc clusters create cluster-name \rn –master-machine-type=c2-standard-30 \rn –worker-machine-type=c2-standard-30 \rn –master-boot-disk-type=pd-balanced \rn –master-boot-disk-size=500GB \rn –region=region \rn –num-workers=10′), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfda9ecb90>)])]4. Optimize application-specific propertiesIf you’re still observing performance issues, you can begin to adjust application properties. Ideally these properties are set on the job submission, isolating properties to their respective jobs. View the best practices for your application below.Spark Job TuningHive Performance TuningTez Memory TuningPerformance and Efficiency in Apache PigSubmitting Dataproc jobs with propertiescode_block[StructValue([(u’code’, u”gcloud dataproc jobs submit spark \rn –cluster=cluster-name \rn –region=region \rn –jar=my_jar.jar \rn –properties=’spark.executor.cores=5,spark.executor.memory=4608mb’ \rn — arg1 arg2″), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfe59c08d0>)])]5. Handle edge-case workload spikes via an autoscaling policyNow that you have an optimally sized, configured, tuned cluster, you can choose to introduce autoscaling. Autoscaling should not be viewed as a cost-optimization technique because aggressive up/down scaling can lead to Dataproc job instability. However, conservative autoscaling can improve Dataproc cluster performance during edge-cases that require more worker nodes.Use ephemeral clusters (see next step) to allow clusters to scale up, and delete them when the job or workflow is complete.Ensure primary workers make up >50% of your cluster.Avoid autoscaling primary workers. Primary workers run HDFS Datanodes, while secondary workers are compute-only workers. HDFS’s Namenode has multiple race conditions that cause HDFS to get into a corrupted state that causes decommissioning to get stuck forever. Primary workers are more expensive but provide job stability and better performance. The ratio of primary workers vs. secondary workers is a tradeoff you can make; stability versus cost.Note: Having too many secondary workers can create job instability. Best practice indicates to avoid having a majority of secondary workers. Prefer ephemeral, non-autoscaled clusters where possible.Allow these to scale up and delete them when jobs are complete.As stated earlier, you should avoid scaling down workers because it can lead to job instability. Set scaleDownFactor to 0.0 for ephemeral clusters.Creating and attaching autoscaling policiessample-autoscaling-policy.ymlcode_block[StructValue([(u’code’, u’workerConfig:rn minInstances: 10rn maxInstances: 10rnsecondaryWorkerConfig:rn maxInstances: 50rnbasicAlgorithm:rn cooldownPeriod: 4mrn yarnConfig:rn scaleUpFactor: 1.0rn scaleDownFactor: 0rn gracefulDecommissionTimeout: 0rnrnrngcloud dataproc autoscaling-policies import policy-namern –source=sample-autoscaling-policy.yml \rn –region=regionrnrngcloud dataproc clusters update cluster-name rn –autoscaling-policy=policy-name rn –region=region’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfe4027710>)])]6. Optimize cost and reusability via ephemeral Dataproc clustersThere are several key advantages of using ephemeral clusters:You can use different cluster configurations for individual jobs, eliminating the administrative burden of managing tools across jobs.You can scale clusters to suit individual jobs or groups of jobs.You only pay for resources when your jobs are using them.You don’t need to maintain clusters over time, because they are freshly configured every time you use them.You don’t need to maintain separate infrastructure for development, testing, and production. You can use the same definitions to create as many different versions of a cluster as you need when you need them.Build a custom imageOnce you have satisfactory cluster performance, you can begin to transition from a non-autoscaling cluster to an ephemeral cluster.Does your cluster have init scripts that install various software? Use Dataproc Custom Images. This will allow you to create ephemeral clusters with faster startup times. Google Cloud provides an open-source tool to generate custom images.Generate a custom imagecode_block[StructValue([(u’code’, u’git clone https://github.com/GoogleCloudDataproc/custom-images.gitrnrncd custom-images || exitrnrnpython generate_custom_image.py \rn –image-name “<image-name>” \rn –dataproc-version 2.0-debian10 \rn –customization-script ../scripts/customize.sh \rn –zone zone \rn –gcs-bucket gs://”<gcs-bucket-name>” \rn –disk-size 50 \rn –no-smoke-test’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfd8526810>)])]Using Custom Imagescode_block[StructValue([(u’code’, u’gcloud dataproc clusters create cluster-name \rn –image=projects/<PROJECT_ID>/global/images/<IMAGE_NAME> \rn –region=region rnrngcloud dataproc workflow-templates instantiate-from-file \rn –file ../templates/pyspark-workflow-template.yaml \rn –region region’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfd8f5f890>)])]Create a Workflow TemplateTo create an ephemeral cluster, you’ll need to set up a Dataproc workflow template. A Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs.Use the gcloud dataproc clusters export command to generate yaml for your cluster config:code_block[StructValue([(u’code’, u’gcloud dataproc clusters export my-cluster \rn –region=region \rn –destination=my-cluster.yaml’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfd8f5fe10>)])]Use this cluster config in your workflow template. Point to your newly created custom image, your application, and add your job specific properties.Sample Workflow Template (with custom image)code_block[StructValue([(u’code’, u’—rnjobs:rn – pysparkJob:rn properties:rn spark.pyspark.driver.python: ‘/usr/bin/python3’rn args:rn – “arg1″rn mainPythonFileUri: gs://<path-to-python-script>rn stepId: step1rn placement:rn managedCluster:rn clusterName: cluster-namern config:rn gceClusterConfig:rn zoneUri: zonern masterConfig:rn diskConfig:rn bootDiskSizeGb: 500rn machineTypeUri: n1-standard-4rn imageUri: projects/<project-id>/global/images/<image-name>rn workerConfig:rn diskConfig:rn bootDiskSizeGb: 500rn machineTypeUri: n1-standard-4rn numInstances: 2rn imageUri: projects/<project-id>/global/images/<image-name>rn initializationActions:rn – executableFile: gs://<path-to-init-script>rn executionTimeout: ‘3600s”), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfc6650cd0>)])]Deploying an ephemeral cluster via a workflow templatecode_block[StructValue([(u’code’, u’gcloud dataproc workflow-templates instantiate-from-file \rn –file ../templates/pyspark-workflow-template.yaml \rn –region region’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3ebfc6650e10>)])]Dataproc Workflow Templates provide a dataproc orchestration solution for use-cases such as:Automation of repetitive tasksTransactional fire-and-forget API interaction modelSupport for ephemeral and long-lived clustersGranular IAM securityFor broader data orchestration strategies, consider a more comprehensive data orchestration service like Cloud Composer.Next stepsThis post demonstrates how you can optimize Dataproc job stability, performance, and cost-effectiveness. Use Workflow templates to deploy a configured ephemeral cluster that runs a Dataproc job with calculated application-specific properties. Finally, there are many ways that you can continue striving for maximum optimal performance. Please review and consider the guidance laid out in the Google Cloud Blog. For general best practices, check out Dataproc best practices | Google Cloud Blog. For guidance on running in production, check out 7 best practices for running Cloud Dataproc in production | Google Cloud Blog.
Quelle: Google Cloud Platform

Published by