Custom Python App on EMR Serverless

Environment

  • Python 3.9.9
  • EMR Serverless: 6.13
  • TensorFlow: 2.11

References

  • https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-python-libraries.html
  • https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/jobs-spark.html
  • https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-python.html

I had to jump through a few hoops to get a PySpark application running on EMR Serverless. Below are the steps I followed, along with the final working configuration; at the bottom of this post are a few errors I encountered along the way.

Steps

1. Setup Build Environment

For a packaged application to work, it must be built in an environment very similar to that of EMR Serverless; specifically, Amazon Linux 2. Plenty of mention is made online of using platform=linux/arm64 amazonlinux:2 to achieve this in Docker. I could not get this to work: when attempting to output the image it would just hang forever, I suspect because I'm on OSX. I ended up spinning up an EC2 instance for my build environment, based on the Amazon Linux 2 image.

2. Setup and Run Build Script

Mine was almost identical to the build script in the AWS documentation linked in the references above, just with a few tweaks. Place your project's requirements.txt in your build environment's working directory and run:

sudo yum install -y gcc openssl-devel bzip2-devel libffi-devel tar gzip wget make xz-devel lzma

wget https://www.python.org/ftp/python/3.9.9/Python-3.9.9.tgz && \
    tar xzf Python-3.9.9.tgz && \
    cd Python-3.9.9  && \
    ./configure --enable-optimizations && \
    sudo make altinstall

sudo yum install -y python3-pip

# Create python venv with Python 3.9.9
python3.9 -m venv pyspark_venv_python_3.9.9 --copies

# copy system python3 libraries to venv
cp -r /usr/local/lib/python3.9/* ./pyspark_venv_python_3.9.9/lib/python3.9/

source pyspark_venv_python_3.9.9/bin/activate && \
    pip install venv-pack && \
    pip install -r requirements.txt

# Package the venv into an archive.
# **Note** that you have to supply the --python-prefix option
# to make sure python starts with the path where your
# copied libraries are present. This copies the python
# binary to the "environment" directory.
sudo mkdir -p /home/hadoop/environment
source pyspark_venv_python_3.9.9/bin/activate && \
    venv-pack -f -o pyspark_venv_python_3.9.9.tar.gz --python-prefix /home/hadoop/environment

# You'll need to reference this path/file in your EMR Serverless job config
aws s3 cp pyspark_venv_python_3.9.9.tar.gz s3://<path_to>/<project_artifacts>/

3. Align Python Lib with EMR Requirements

If you’re smart, you started out with EMR-compatible library versions and worked backward from there. If, like me, you were handed a project where that was not the case, you’ll likely have to back off some dependency versions to make them compatible with EMR Serverless.
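To see which versions actually ended up in the packaged venv, a quick check like the following can help. This is a sketch of my own, run with the venv activated; the package names are just the ones that came up in this project:

# Print the installed versions of a few packages of interest in the active venv.
from importlib.metadata import version, PackageNotFoundError

for pkg in ["tensorflow", "urllib3", "protobuf", "fastavro"]:
    try:
        print(f"{pkg}=={version(pkg)}")
    except PackageNotFoundError:
        print(f"{pkg}: not installed")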

4. Zip and Upload Custom Python Modules

  • From the directory containing your application code: zip -r my_custom_modules.zip my_custom_modules/
  • aws s3 cp my_custom_modules.zip s3://<path_to>/<project_artifacts>/my_custom_modules.zip
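With the zip later passed to spark.submit.pyFiles (see the Spark properties in the next step), the entry-point script can import from it like any installed package. A small sketch, with purely hypothetical module and function names:

# Assumed (hypothetical) layout inside my_custom_modules.zip:
#   my_custom_modules/
#       __init__.py
#       transforms.py
# Because the zip was created from the parent directory, the package sits at the
# archive root, so normal imports work in the entry point:
from my_custom_modules.transforms import clean_records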

5. Configure EMR Serverless

  • In EMR Studio, create an application. I allowed it to create and use a default IAM role.
  • Upload your entry point script to S3 and define it under ‘Script location’.
  • Add any script arguments you need to pass (and that your app is prepared to parse).
  • I landed on the following Spark properties to get the job to run (a boto3 submission sketch follows this list):
    --conf spark.archives=s3://<path_to>/<project_artifacts>/pyspark_venv_python_3.9.9.tar.gz#environment
    --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python
    --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
    --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python
    --conf spark.submit.pyFiles=s3://<path_to>/<project_artifacts>/my_custom_modules.zip
    --conf spark.files=s3://<path_to>/<project_artifacts>/some_other_file.yml,s3://<path_to>/<project_artifacts>/second_other_file.yml
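As referenced above, the same job can also be submitted outside the EMR Studio console with boto3. The following is a minimal sketch only; the application ID, execution role ARN, entry point path, and arguments are placeholders rather than values from my actual setup:

import boto3

# EMR Serverless client; region comes from your AWS config/credentials.
client = boto3.client("emr-serverless")

# Same Spark properties as above, joined into a single spark-submit string.
spark_params = " ".join([
    "--conf spark.archives=s3://<path_to>/<project_artifacts>/pyspark_venv_python_3.9.9.tar.gz#environment",
    "--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python",
    "--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python",
    "--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python",
    "--conf spark.submit.pyFiles=s3://<path_to>/<project_artifacts>/my_custom_modules.zip",
    "--conf spark.files=s3://<path_to>/<project_artifacts>/some_other_file.yml,s3://<path_to>/<project_artifacts>/second_other_file.yml",
])

response = client.start_job_run(
    applicationId="<application_id>",          # placeholder
    executionRoleArn="<execution_role_arn>",   # placeholder
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<path_to>/<project_artifacts>/entry_point.py",  # placeholder
            "entryPointArguments": ["--some-arg", "some-value"],                # placeholder
            "sparkSubmitParameters": spark_params,
        }
    },
)
print(response["jobRunId"])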

Errors

These are the errors I encountered along the way, with the resolutions that worked for me.

  • Error:
      Traceback (most recent call last):
        File "/home/hadoop/environment/lib/python3.9/site-packages/fastavro/read.py", line 2, in <module>
          from . import _read
        File "fastavro/_read.pyx", line 11, in init fastavro._read
        File "/home/hadoop/environment/lib/python3.9/lzma.py", line 27, in <module>
          from _lzma import *
      ModuleNotFoundError: No module named '_lzma'
    Resolution: In the build environment: sudo yum install lzma

  • Error: ModuleNotFoundError: No module named '<my custom modules>'
    Resolution: From the base of your application code: zip -r my_custom_modules.zip my_custom_modules/, upload the zip file to your S3 bucket, and add to your job's Spark properties: --conf spark.submit.pyFiles=s3://<path_to>/my_custom_modules.zip

  • Error: ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with...
    Resolution: Downgraded to urllib3==1.26.6

  • Error: ImportError: cannot import name 'builder' from 'google.protobuf.internal' (/home/hadoop/environment/lib/python3.9/site-packages/google/protobuf/internal/__init__.py)
    Resolution: Fetch the latest version of protobuf's builder.py (e.g., from a fully updated installation) into your project's Python packages at Lib/site-packages/google/protobuf/internal.

  • Error: Docker build environment: "copying <n> files…" never finishes.
    Resolution: I abandoned the Dockerized Amazon Linux 2 build environment; I suspect the hang had something to do with my Apple silicon. I ended up spinning up an EC2 instance on AWS running Amazon Linux 2 instead.
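For the protobuf issue, the copy itself is a one-liner; here is a sketch with placeholder paths, which depend entirely on where the two installations live on your machine:

import shutil

# Source: builder.py from a fully updated protobuf installation.
# Destination: the project's packaged site-packages. Both paths are placeholders.
src = "/path/to/updated/site-packages/google/protobuf/internal/builder.py"
dst = "/path/to/project/site-packages/google/protobuf/internal/builder.py"
shutil.copy(src, dst)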


Default Argument Value Does Not Refresh Between Function Calls

Something struck me as unexpected today while working in Python. I had a function to take a datetime object and convert it into epoch milliseconds:

import datetime
import time
import pytz

this_tz = 'US/Eastern'

def get_epch_ms(dttm=datetime.datetime.now(pytz.timezone(this_tz))):
    # Returns milliseconds since epoch for datetime object passed.
    # If no argument is passed, uses *now* as time basis.
    # DOES NOT APPEAR TO REFRESH 'dttm' BETWEEN EXECUTIONS.

    return int(time.mktime(dttm.astimezone(pytz.timezone(this_tz)).timetuple()) * 1000.0 + round(dttm.microsecond / 1000.0))

This function works fine when called as get_epch_ms(): the epoch-millisecond value for *now* is returned. However, on subsequent calls within the same run of the broader application, I noticed that the value of dttm did not update. The expression that populates the default value, dttm=datetime.datetime.now(pytz.timezone(this_tz)), is evaluated only once, when the def statement runs, and that same value is reused on every later call. It took me a bit to track this down; I'm not sure it's something I had ever run up against before.
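A stripped-down illustration of the behavior, separate from the function above:

import datetime
import time

# The default for 'ts' is evaluated once, when the 'def' statement runs,
# not on each call.
def stamped(ts=datetime.datetime.now()):
    return ts

first = stamped()
time.sleep(1)
second = stamped()
print(first == second)  # True: both calls got the same default value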

The fix is simple enough, though it involved a couple of additional lines of code:

import datetime
import time
import pytz

this_tz = 'US/Eastern'

def get_epch_ms(dttm=None):
    # Returns milliseconds since epoch for datetime object passed.
    # If no argument is passed, uses *now* as time basis.
    # Refreshes 'dttm' between calls to this function.

    if dttm is None:
        dttm = datetime.datetime.now(pytz.timezone(this_tz))

    return int(time.mktime(dttm.astimezone(pytz.timezone(this_tz)).timetuple()) * 1000.0 + round(dttm.microsecond / 1000.0))

The updated function properly provides an updated timestamp at each invocation, when called as get_epch_ms().
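A quick usage sketch, assuming the definitions above are in scope; the sleep and the example date are purely for illustration:

import datetime
import time
import pytz

# No argument: each call reflects *now*.
first = get_epch_ms()
time.sleep(1)
second = get_epch_ms()
print(second - first)  # roughly 1000 ms apart

# An explicit, timezone-aware datetime can still be passed.
explicit = pytz.timezone(this_tz).localize(datetime.datetime(2021, 1, 1))
print(get_epch_ms(explicit))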
