Understanding the Constraints of AWS Glue Job Bookmarks

Chapter 1: Introduction to AWS Glue

AWS Glue serves as a serverless solution primarily designed for executing extract, transform, and load (ETL) processes. It allows data practitioners to utilize its capabilities without needing in-depth knowledge of cluster architecture. At its core, AWS Glue leverages Apache Spark, accessible via the Python PySpark API or Scala. It facilitates the movement and cataloging of large datasets, whether through Glue crawlers or code-based approaches, enabling the addition of new data partitions, tables, and databases. Glue jobs can source data from platforms like Amazon RDS and Amazon S3 while targeting others, such as Amazon Redshift. By utilizing Glue, data experts can create comprehensive workflows to orchestrate data-driven pipelines, employing infrastructure as code (IaC) tools like Terraform and AWS CloudFormation, along with seamless integration through AWS CodeBuild and AWS CodePipeline.

Chapter 1.1: Glue Job Bookmarks Explained

Data practitioners can opt for either full or incremental data loading methods. Incremental data loading is particularly beneficial for large and continuously growing datasets, often seen in transactional databases as businesses expand. Conversely, full data loading is suitable for smaller datasets that are easy to synchronize. Job bookmarks can be configured to monitor which data has already been processed during ETL for incremental loads. Moreover, developers can implement code-based solutions to track changes in job source records. Fortunately, setting up bookmarks through Glue's job configuration is straightforward.

Section 1.1.1: Code Configuration for Job Bookmarks

To configure bookmarks at the code level, call “job.init()” at the beginning of the script and “job.commit()” at the end, and pass a “transformation_ctx” parameter to each stage of the job. Optionally, “jobBookmarkKeys” and “jobBookmarkKeysSortOrder” can be supplied when invoking the source to control which columns Glue tracks.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Importing libraries
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Params configuration
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Data source: bookmark keys and their sort order are set through additional_options
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="database",
    table_name="relatedqueries_csv",
    transformation_ctx="datasource0",
    additional_options={"jobBookmarkKeys": ["col0"],
                        "jobBookmarkKeysSortOrder": "asc"})

# Data entries map
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[("col0", "long", "id", "long"),
              ("col1", "string", "number", "string"),
              ("col2", "string", "name", "string")],
    transformation_ctx="applymapping1")

# Data target
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame=applymapping1,
    connection_type="s3",
    connection_options={"path": "s3://input_path"},
    format="json",
    transformation_ctx="datasink2")

job.commit()

Section 1.2: Configuring Job Properties

Bookmarks can be activated by adjusting job properties in the AWS Glue console, as depicted in the accompanying image.

Configuring Job Properties in AWS Glue Console
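
Bookmarks can also be enabled programmatically. Below is a minimal boto3 sketch (the job name, role, and script location are hypothetical) that sets the “--job-bookmark-option” special parameter, which accepts the values “job-bookmark-enable”, “job-bookmark-disable”, and “job-bookmark-pause”.

import boto3

glue = boto3.client('glue')

# Hypothetical job definition; UpdateJob replaces the whole definition,
# so Role and Command must be restated along with the new argument.
glue.update_job(
    JobName='my-etl-job',
    JobUpdate={
        'Role': 'arn:aws:iam::123456789012:role/my-glue-role',
        'Command': {'Name': 'glueetl',
                    'ScriptLocation': 's3://my-bucket/scripts/my_job.py'},
        'DefaultArguments': {'--job-bookmark-option': 'job-bookmark-enable'},
    })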

Chapter 2: Challenges with AWS Glue Job Bookmarks

In the first video, you will learn how to use job bookmarks in AWS Glue jobs, along with best practices for implementing them effectively.

The second video explains what job bookmarks are in AWS Glue, detailing their purpose and advantages.

Bookmark Limitations

While AWS Glue offers robust functionalities, it is not without limitations. From my practical experience, I have encountered challenges when working with AWS Glue job bookmarks, and I aim to share my insights to help others avoid similar frustrations.

Using JDBC Connections

As previously mentioned, Glue jobs can source data from various repositories, with Amazon RDS and Amazon S3 being common choices. Bookmarks work well when ingesting files or objects stored in S3 buckets: Glue tracks object modification timestamps, so both new and updated objects are picked up on the next run. However, when a job sources data from an RDS-hosted transactional database over JDBC, bookmarks only track new values of the bookmark keys, typically a sequentially increasing column such as the primary key. Records that were modified after the last job run will therefore not be captured again, even though they were ingested initially.
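
To make this concrete, here is a minimal sketch of such a bookmarked read from a cataloged JDBC table (the catalog database, table, and key names are hypothetical):

# Bookmarked read from a JDBC table registered in the Glue Data Catalog.
# "postgres_db" and "projects" are hypothetical catalog entries.
projects = glueContext.create_dynamic_frame.from_catalog(
    database="postgres_db",
    table_name="projects",
    transformation_ctx="projects_source",  # bookmark state is keyed by this
    additional_options={"jobBookmarkKeys": ["id"],
                        "jobBookmarkKeysSortOrder": "asc"})

# On later runs, only rows with an "id" above the highest value already seen
# are returned; rows whose other columns changed since then are skipped.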

Data Source and Record State Analysis

Addressing this issue is straightforward, considering that AWS Glue operates on Spark. For instance, if a job reads from the “projects” table in an RDS database, which includes a timestamp field like “updated_at” that updates with each record modification, this field can serve as our incremental data load bookmark. We can create a Python function, like the one below, to check the last job run timestamp stored in a JSON file in the S3 bucket "my-ancillary-bucket" and formulate a SQL statement that filters records based on the bookmark.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Importing libraries
import boto3
import datetime
import json
import botocore

def checkingLastJobRunning(FileKey, jobBookmarkKeys):
    '''
    Function to check the last run date for the Glue job

    Args:
      - FileKey (string): Key for the object in S3 containing the last run timestamp
      - jobBookmarkKeys (string): Name of the job bookmark key column

    Output:
      - FilterStatement (string): SQL filter statement
    '''
    # Current run timestamp
    now_timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")
    to_json = {'previous_timestamp': now_timestamp}

    # Create an S3 resource
    s3 = boto3.resource('s3')

    # Checking the last run timestamp for the job
    try:
        content_object = s3.Object('my-ancillary-bucket', FileKey)
        file_content = content_object.get()['Body'].read().decode('utf-8')
        json_content = json.loads(file_content)
        previous_timestamp = json_content['previous_timestamp']
        symbol = '>'
    except botocore.exceptions.ClientError:
        # First run: no previous timestamp exists, so load everything up to now
        previous_timestamp = now_timestamp
        symbol = '<='

    # Save the current run timestamp for the next execution
    s3.Bucket('my-ancillary-bucket').put_object(
        Key=FileKey,
        Body=json.dumps(to_json).encode('utf-8'))

    # Filter statement
    FilterStatement = (f"WHERE {jobBookmarkKeys}::timestamp {symbol} "
                       f"'{previous_timestamp}'::timestamp")
    return FilterStatement
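
For example, assuming a prior run left a timestamp in “my-previous-job-run.json”, a call like the following returns a filter clause ready to embed in the source query (the timestamp shown is illustrative):

filter_statement = checkingLastJobRunning('my-previous-job-run.json',
                                          'updated_at')
print(filter_statement)
# WHERE updated_at::timestamp > '2023-01-15 06:30:00.000000'::timestamp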

Now, the above function can be utilized to formulate the SQL query that filters records from the "projects" table. By leveraging a JDBC connection and a Spark session (using a PostgreSQL driver in this example), a Spark DataFrame can be created containing entries made or altered since the last job execution.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Importing libraries
import sys
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

# Add any additional Spark configuration you need here
conf = SparkConf()

# Initial configuration
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext.getOrCreate(conf=conf))
sparkSession = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Checking filter statement
filter_statement = checkingLastJobRunning('my-previous-job-run.json',
                                          'updated_at')

# RDS table to query
tablename = "projects"

# Data source: DB_HOST, DB_PORT, DB_NAME, DB_USER and DB_PASSWORD are
# placeholders for your connection details
datasource0 = sparkSession.read.format("jdbc") \
    .options(
        url=f'jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}',
        dbtable=f"(SELECT * FROM {tablename} {filter_statement}) as subquery",
        user=DB_USER,
        password=DB_PASSWORD,
        driver='org.postgresql.Driver') \
    .load()
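
The DB_* placeholders must be defined before the read. One option, sketched below with hypothetical argument names, is to pass them as job parameters resolved with getResolvedOptions; for the password, a Glue connection or AWS Secrets Manager is the safer choice.

# Hypothetical: resolve connection details passed as job arguments
# (e.g., --DB_HOST, --DB_PORT, ... set in the job configuration)
conn_args = getResolvedOptions(
    sys.argv, ['DB_HOST', 'DB_PORT', 'DB_NAME', 'DB_USER', 'DB_PASSWORD'])

DB_HOST = conn_args['DB_HOST']
DB_PORT = conn_args['DB_PORT']
DB_NAME = conn_args['DB_NAME']
DB_USER = conn_args['DB_USER']
DB_PASSWORD = conn_args['DB_PASSWORD']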

Maximum Size Constraints for Bookmarks

One notable limitation with Glue bookmarks is the maximum state size, which is capped at 400 KB. If you encounter an error message such as:

com.amazonaws.services.gluejobexecutor.model.InvalidInputException: Entity size has exceeded the maximum allowed size

in your Amazon CloudWatch logs or in event notifications delivered through Amazon SNS topics, consider trimming jobs that have many stages by omitting the “transformation_ctx” parameter from intermediate transformations and keeping it only on the source and target. Remember that a job bookmark is composed of the states of the job's components (sources, transformations, and targets), so every “transformation_ctx” contributes to the stored state. Additionally, when using the S3 connection type, adjusting the “maxFilesInBand” parameter can help minimize the bookmark state size. This parameter sets the maximum number of files whose paths are retained in the state for the last “maxBand” seconds; if the limit is exceeded, the excess files are skipped and processed in the subsequent job run. By default, “maxFilesInBand” is set to 1000, allowing the bookmark state to store up to 1000 S3 file paths; it can be lowered, even to 0, to resolve the issue.
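
As a sketch, reusing the catalog source from the first example, “maxFilesInBand” is passed through “additional_options” (setting it to 0 follows the workaround described above):

# S3-backed catalog source with a trimmed bookmark state:
# "maxFilesInBand" caps how many file paths are kept in the state.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="database",
    table_name="relatedqueries_csv",
    transformation_ctx="datasource0",
    additional_options={"maxFilesInBand": 0})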

Conclusion

In this article, we explored the drawbacks associated with AWS Glue job bookmarks. We discussed why bookmarks may fail when transferring data from JDBC-connected sources and examined the constraints related to the maximum size of job run states, along with potential solutions. I hope you found this post enlightening, and feel free to reach out with any questions.

If you enjoyed this content, consider following the Globalwork tech team on Medium, where we share actionable insights and tips from our experience in developing digital products.

Stay updated with my posts on Medium for more thought-provoking material. Don’t forget to clap for this publication and share it with your colleagues!
