ETL timeouts and compression


#1

Is OICR using LZO compression in the ETL pipeline for spark and HDFS? I’m getting heartbeat timeouts in SUMMARIZE steps. I’ve increased the Spark cluster to 6 workers with 26GB each. Doubled timeouts to

spark.executor.heartbeatInterval 20s
spark.network.timeout 240s

and re-running now. Would LZO compression help?

Thanks,
Brian K.


#2

Hi @kibri,

Do you have a stack trace we can look at? My feeling here is that a grouping is too large for a single node’s memory (probably counting the number of mutations a donor has).

How many workers per node? Do you have 26GB for each of those, or for the machine as a whole? Have you verified that the max heap is set correctly using jps -lvm?


#3

Thanks, Bob. I’ve seen different variations of timeouts this week. There’re no errors on the worker side, but the dcc-release client and spark master show timeouts. Here’s the latest in the dcc-release client:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5736.0 failed 0 times, most recent failure: Lost task 0.0 in stage 5736.0 (TID 5018, dcc-spark-worker-2): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)

There is one worker per node. The -Xmx param on the JVM command line is at 26GB. I’ll retry today and check the workers using jps.


#4

Hey @kibri

Just so you have a reference, I was seeing similar things a little while ago on our 10 node test cluster until I upped the memory.

The relevant section from the application.yml for dcc-release I am using now:

spark.executor.memory: "27g"
spark.network.timeout: 1800

#5

Thanks, I’m setting up a 10 node cluster with the memory and timeouts you gave. We’ll know in a few hours how it goes!


#6

Same failure. I had 10 worker nodes and the spark UI showed that each had 27 GB.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5714.0 failed 0 times, most recent failure: Lost task 0.0 in stage 5714.0 (TID 4984, dcc-spark-worker-8): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

I’m checking the worker logs to see if there is any more information there.

Brian K.


#7

jps shows “-Xms8m”, but the command lines for the workers show -Xms27648M for the CoarseGrainedExecutorBackend and -Xms30g for Worker.

ubuntu@dcc-spark-worker-1:~$ jps -lvm
25553 sun.tools.jps.Jps -lvm -Dapplication.home=/usr/lib/jvm/java-8-oracle -Xms8m

root 24380 0.0 2.0 36891920 624756 ? Sl Jan30 1:00 /usr/lib/jvm/java-8-oracle/jre/bin/java -cp /srv/spark-1.6.1-bin-spark-1.6.1-bin-2.5.0-mr1-cdh5.3.1/lib/hadoop-lzo.jar:/srv/spark-1.6.1-bin-spark-1.6.1-bin-2.5.0-mr1-cdh5.3.1/conf/:/srv/spark-1.6.1-bin-spark-1.6.1-bin-2.5.0-mr1-cdh5.3.1/lib/spark-assembly-1.6.1-hadoop2.5.0-mr1-cdh5.3.1.jar:/etc/hadoop/conf -Xms30g -Xmx30g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://10.60.60.55:7077
root 25378 30.0 5.1 34128120 1593696 ? Sl 17:13 0:24 /usr/lib/jvm/java-8-oracle/jre/bin/java -cp /srv/spark-1.6.1-bin-spark-1.6.1-bin-2.5.0-mr1-cdh5.3.1/lib/hadoop-lzo.jar:/srv/spark-1.6.1-bin-spark-1.6.1-bin-2.5.0-mr1-cdh5.3.1/conf/:/srv/spark-1.6.1-bin-spark-1.6.1-bin-2.5.0-mr1-cdh5.3.1/lib/spark-assembly-1.6.1-hadoop2.5.0-mr1-cdh5.3.1.jar:/etc/hadoop/conf -Xms27648M -Xmx27648M -Dspark.driver.port=45111 org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.60.60.55:45111 --executor-id 6 --hostname 10.60.60.215 --cores 6 --app-id app-20170131170959-0005 --worker-url spark://Worker@10.60.60.215:53034
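A note on reading the heap flags above: the -Xms8m that jps reports belongs to the jps tool's own JVM. jps was run as ubuntu while the Spark processes run as root, which is probably why they are not listed. The flags on the ps command lines are the ones that matter. A minimal Python sketch for pulling them out and converting them to GiB follows; the function name heap_flags_gib and the shortened command lines are illustrative, not part of dcc-release or Spark.

```python
import re

# Multipliers for JVM size suffixes (none, k, m, g), in bytes.
_UNITS = {"": 1, "k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def heap_flags_gib(cmdline):
    """Return the -Xms/-Xmx values found in a JVM command line, in GiB."""
    flags = {}
    for name, number, unit in re.findall(r"-(Xm[sx])(\d+)([kKmMgG]?)\b", cmdline):
        flags[name] = int(number) * _UNITS[unit.lower()] / 1024 ** 3
    return flags


# Command lines shortened from the ps output above (classpath trimmed).
executor = ("java -cp /etc/hadoop/conf -Xms27648M -Xmx27648M "
            "org.apache.spark.executor.CoarseGrainedExecutorBackend")
worker = ("java -cp /etc/hadoop/conf -Xms30g -Xmx30g "
          "org.apache.spark.deploy.worker.Worker")

print("executor:", heap_flags_gib(executor))
print("worker:  ", heap_flags_gib(worker))
```

Run against the full ps lines, this gives a quick per-node check that the executor heap matches what the Spark UI reports.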


#8

I tried running 31 projects, but the workflow failed in JOIN with “Remote RPC client disassociated.” That is unexpected, since I ran 70 projects through JOIN successfully before. Do you have any other suggestions on tuning parameters, or on monitoring spark workers?


#9

There is a bit of a data skew in terms of how large certain donors can get. So in theory you can run without most of the projects and still hit those big donors and fail at the JOIN job.
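One way to see whether a given subset of projects contains those large donors is to count records per donor and compare the biggest groups with the median. The sketch below is plain Python rather than pipeline code; the function donor_skew and the donor ids are hypothetical, and in practice the ids would come from whatever donor field the input records carry.

```python
from collections import Counter


def donor_skew(donor_ids, top=3):
    """Return (donor, record_count, count / median_count) for the largest donors."""
    counts = Counter(donor_ids)
    if not counts:
        return []
    ordered = sorted(counts.values())
    median = ordered[len(ordered) // 2]  # upper median for even-sized inputs
    return [(donor, n, n / median) for donor, n in counts.most_common(top)]


# Hypothetical input: one donor id per mutation record.
records = ["DO1"] * 4 + ["DO2"] * 5 + ["DO3"] * 6 + ["DO4"] * 500
for donor, n, ratio in donor_skew(records, top=2):
    print(donor, n, f"{ratio:.0f}x median")
```

A donor whose count is orders of magnitude above the median is a candidate for the single oversized group that one executor has to hold in memory.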

As for why you are seeing these failures, I’m not sure.

When you added the extra nodes to the spark cluster, did you redistribute the data on HDFS?
I believe the command for that is $ hdfs balancer, and you can set the bandwidth for the balancer with $ hdfs dfsadmin -setBalancerBandwidth <bandwidth in bytes per second>.
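Since the bandwidth argument is given in bytes per second, it is easy to be off by a factor of a thousand. A small Python sketch of the conversion follows; the helper name and the 100 MiB/s figure are only examples, not a recommended value for this cluster.

```python
def balancer_bandwidth_bytes(mebibytes_per_second):
    """Convert a per-datanode limit in MiB/s to the bytes/s value the command expects."""
    return int(mebibytes_per_second * 1024 * 1024)


# Example value only: cap balancing traffic at 100 MiB/s per datanode.
limit = balancer_bandwidth_bytes(100)
print(f"hdfs dfsadmin -setBalancerBandwidth {limit}")
```

The limit applies to each datanode, so the total balancing traffic on the network scales with the number of datanodes moving blocks.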


#10

The other possibility is that you are running out of memory: while 27-30GB should be sufficient for an executor, you could be running multiple worker instances and thus multiple executors.

What does your spark-env.sh look like?

Also any information about the configuration of your cluster would be helpful, both physical and logical configurations.
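To make the memory concern concrete, here is a rough Python sketch of the per-node arithmetic. It assumes one executor per worker instance, treats every JVM heap limit as fully used (a worst case, since -Xmx is only an upper bound), and reserves a guessed 2 GB for the OS. The function name and the reserve are assumptions for illustration.

```python
def node_headroom_gb(node_ram_gb, worker_instances, executor_heap_gb,
                     daemon_heap_gb, os_reserve_gb=2.0):
    """RAM left on one node once all JVM heaps and an OS reserve are counted.

    A negative result means the node is overcommitted in the worst case.
    """
    jvm_heaps = worker_instances * (executor_heap_gb + daemon_heap_gb)
    return node_ram_gb - jvm_heaps - os_reserve_gb


# 30 GB node, one worker instance, 27 GB executor, 1 GB worker daemon.
print(node_headroom_gb(30, 1, 27, 1))
# Same node with two worker instances: two executors no longer fit.
print(node_headroom_gb(30, 2, 27, 1))
# Worker daemon started with a 30 GB heap, as on the ps command line earlier in the thread.
print(node_headroom_gb(30, 1, 27, 30))
```

The worker daemon itself normally needs very little heap, so a large daemon setting mostly takes address space away from the executor and the OS page cache.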


#11

Good morning. I balanced HDFS and enabled HDFS caching on our OHSU project, but still got the same failure. We were speculating that our own data is causing the bottleneck after you suggested that it’s the characteristics of donors that increase the work size, so I thought caching our project files might help. Caching the project source files may not make a difference. Maybe I should cache our project files produced by JOIN? Here are the settings in spark-env.sh:

export SPARK_PRINT_LAUNCH_COMMAND=1
export SPARK_DAEMON_MEMORY=30g
export SPARK_WORKER_INSTANCES=1

Here is spark-defaults.conf:

spark.eventLog.enabled true
spark.eventLog.dir /tmp/spark/eventlog
spark.executor.heartbeatInterval 20s
spark.network.timeout 1800s

I used the driver Web UI to look at the environment variables while running, and these two might not be good choices:

spark.default.parallelism: 1
spark.task.maxFailures: 0
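For what it's worth, the Spark configuration docs say spark.task.maxFailures should be at least 1 (the default is 4), and the tuning guide suggests roughly 2-3 tasks per CPU core. These values may be set deliberately by the dcc-release client, so the following Python sketch only flags them for review. The function name and the thresholds are my own illustration, not anything Spark or dcc-release provides.

```python
def review_settings(conf, total_cores):
    """Return human-readable notes about settings that may deserve a second look."""
    notes = []
    if int(conf.get("spark.task.maxFailures", 4)) < 1:
        notes.append("spark.task.maxFailures < 1: a single lost task aborts the job")
    parallelism = conf.get("spark.default.parallelism")
    if parallelism is not None and int(parallelism) < total_cores:
        notes.append(
            f"spark.default.parallelism={parallelism} is below the {total_cores} "
            f"available cores; 2-3 tasks per core would be "
            f"{2 * total_cores}-{3 * total_cores}"
        )
    return notes


# Values seen in the driver Web UI; 10 workers x 6 VCPUs = 60 cores.
observed = {"spark.default.parallelism": "1", "spark.task.maxFailures": "0"}
for note in review_settings(observed, total_cores=60):
    print(note)
```

With maxFailures at 0, the "failed 0 times" wording in the stack traces is what one would expect: the first executor loss ends the job with no retry.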

The configuration I have is 10 spark workers, 1 HDFS namenode, and 2 HDFS datanodes, all as OpenStack VMs. The specs are

Spark Worker: RAM 30GB, 6 VCPU, Disk 125GB
HDFS nodes: RAM 16GB, 8 VCPU, Disk 160GB


#12

Hey @kibri,

Your spark workers should be colocated with your HDFS nodes. Every worker should be on a datanode. Spark uses the knowledge of where your data lives to schedule tasks so that they get the performance benefits of data locality.

What I now think is happening is that your HDFS cluster or network is under intense load from streaming this data from the datanodes to your workers, when generally each worker and datanode should be the same machine/VM. Your timeouts are then likely caused by the namenode, datanodes, or network being unable to keep up.

In the cluster I described that I am using, it’s 10 spark&datanode VMs + 1 NameNode VM with the dcc-release client.


#13

Aha, that would explain it! I’ll co-locate and try again.


#14

I set up the 10 spark worker - HDFS datanode machines and balanced HDFS. The balance command took 2 hours to redistribute the files. I re-ran the workflow, but it failed in the JOIN task on RPC “disassociated” after 1 hour. No error in the logs of the worker that lost RPC. The namenode was on a separate machine, so I will move it to the dcc-release client machine. I didn’t set the balancer bandwidth yet, so I’ll do that for the next run.


#15

Dusan, I was able to get 10 projects all the way through the pipeline after I fixed the configuration of HDFS nodes. I still get RPC errors on larger sets, but the pipeline runs faster when there’s less network traffic, so I think I may just be running up against our network capacity at the failures. I’m going to find out what I can use to measure the network traffic next. Thanks for your help. The last failure I had was due to a misconfiguration of elasticsearch heap, so still finding the bottlenecks.


#16

I’m glad I could help.

One last piece of advice, in case you have not come across it: you don’t want to give all of a server’s memory to Elasticsearch, as Lucene makes heavy use of the file system cache:

https://www.elastic.co/guide/en/elasticsearch/guide/current/heap-sizing.html#_give_less_than_half_your_memory_to_lucene
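As a rough illustration of that guidance, which is to give Elasticsearch about half the machine's RAM and stay under roughly 32 GB of heap, here is a small Python sketch. The function name and the 31 GB cap are assumptions chosen to stay safely below the 32 GB line, not values taken from the guide.

```python
def suggested_es_heap_gb(node_ram_gb, cap_gb=31.0):
    """Half of the node's RAM, capped below ~32 GB; the rest is left for the OS file cache."""
    return min(node_ram_gb / 2, cap_gb)


# Example node sizes only.
for ram in (16, 30, 128):
    print(f"{ram} GB node: about {suggested_es_heap_gb(ram):g} GB heap")
```

If Elasticsearch shares a VM with other services, the half should be taken from the memory that is actually free for it rather than from the machine total.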