Automatizando ETL com AWS Glue, AWS DocumentDB e S3

Alex Pereira Maranhão
Author
September 12, 2024
3
min de leitura

Como arquiteto de software, estou constantemente buscando maneiras de otimizar a performance e reduzir custos das soluções que gerencio. Recentemente, me deparei com a necessidade de expurgar um grande dataset de dados armazenados no Amazon DocumentDB (~1TB). A ideia era transferir esses dados para o AWS S3 em formato Apache Parquet, uma vez que esse formato não só é altamente eficiente em termos de armazenamento, mas também oferece um excelente desempenho para consultas analíticas.

Neste artigo, vamos explorar como configurar e automatizar um pipeline de ETL (extrair, transformar e carregar) usando AWS Glue e Amazon DocumentDB, gravando os dados transformados no Amazon S3 em formato Parquet.

Compartilho aqui a solução que desenvolvi para enfrentar esse desafio.

Contexto

O Amazon DocumentDB é um serviço de banco de dados NoSQL gerenciado compatível com o MongoDB, ele é uma boa escolha para aplicações que precisam de um banco de dados gerenciado com alta disponibilidade e escalabilidade. No entanto, à medida que os dados crescem, os custos de armazenamento podem se tornar um fator crítico. Para mitigar isso, decidi mover os dados históricos para o Amazon S3 em formato Parquet. O Parquet é um formato de armazenamento colunar, ideal para grandes volumes de dados, pois permite compressão eficiente e consultas rápidas.

O AWS Glue é um serviço gerenciado de ETL que facilita a preparação e carga de dados. Utilizaremos o formato de arquivo Parquet, que oferece diversas vantagens em termos de performance e eficiência de armazenamento.

Configuração do Ambiente e Leitura dos Dados do DocumentDB

Configuração do AWS Glue:

  • Criação do Job no AWS Glue: Para começar, é necessário criar um job no AWS Glue, configurando as permissões necessárias no IAM para permitir a leitura dos dados no DocumentDB e a escrita no S3.

Leitura dos Dados do Amazon DocumentDB:

  • Conexão com o DocumentDB: Configuramos a conexão com o Amazon DocumentDB utilizando o URI de conexão.
  • Leitura dos Dados: Utilizamos o PySpark para ler os dados do DocumentDB, aplicando um pipeline para filtrar os documentos pela data desejada.

Transformação e Escrita dos Dados no S3

Transformação dos Dados:

  • Adição de Colunas de Particionamento: Para facilitar a organização e consulta dos dados, adicionamos colunas de ano, mês e dia aos dados lidos.
  • Uso do PySpark: Utilizamos funções do PySpark para manipular e transformar os dados conforme necessário.

Escrita dos Dados no S3 em Formato Parquet:

  • Configuração do Caminho de Destino: Definimos o caminho de destino no Amazon S3 onde os dados transformados serão armazenados.
  • Utilização do Formato Parquet: O formato Parquet é escolhido devido à sua eficiência em termos de espaço e velocidade de leitura. Este formato colunar permite uma compactação eficiente e uma leitura seletiva das colunas necessárias, tornando-o ideal para análises de grandes datasets.
  • Particionamento dos Dados: Particionamos os dados por várias chaves (client_id, stuff_type, ano, mês, dia) para facilitar consultas futuras e melhorar a performance.

Código fonte

Esse script leva em consideração que existe no banco de dados collections de vários clientes (clients) e essas collections guardam determinados tipos de documentos (stuff_types). Assim, ele faz o processo para várias collections do banco.

pythonCopiar código
import sys
import logging
import pyspark
import json
from pyspark.sql.functions import col, year, month, dayofmonth, lit
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
from datetime import datetime, timedelta, timezone

logger = logging.getLogger("etl_docdb_to_s3")
logger.setLevel(logging.INFO)

# Gets job parameters
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'BUCKET_NAME']
)

def validate_arguments(required_params):
    # logic to validate required parameters

validate_arguments(args)

job_name = args['JOB_NAME']
bucket_name = args['BUCKET_NAME']
# you can retrieve other parameters here

def get_collection_name(client_id, collection_type):
    # retrieves the collection name

def define_query_dates():
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    gte = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0, tzinfo=timezone.utc)
    lte = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59, tzinfo=timezone.utc)
    return gte.isoformat(), lte.isoformat()

def write_data(df):
    s3_output_path = f"s3://{bucket_name}/{output_prefix}"
    logger.info(f"Writing data to S3 path at {s3_output_path}")
    (df.write
     .partitionBy("client_id", "stuff_type", "year", "month", "day")
     .format("parquet")
     .mode("append")
     .option("path", s3_output_path)
     .save())
    logger.info(f"Data written to S3 path at {s3_output_path}")

def read_data(spark, collection_name, stuff_type, client_id):
    yesterday_start, yesterday_end = define_query_dates()
    pipeline = [
        {
            "$match": {
                "timestamp": {
                    "$gte": {"$date": yesterday_start},
                    "$lte": {"$date": yesterday_end}
                }
            }
        }
    ]
    df = (spark.read.format("mongodb")
          .option("aggregation.pipeline", json.dumps(pipeline))
          .option("database", database_name)
          .option("connection.uri", mongo_uri)
          .option("collection", collection_name)
          .option("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner")
          .load())
    if df.isEmpty():
        logger.info(f"No data found in collection {collection_name}")
        return None
    df = df.withColumn("year", year(col("timestamp")))
    df = df.withColumn("month", month(col("timestamp")))
    df = df.withColumn("day", dayofmonth(col("timestamp")))
    df = df.withColumn("client_id", lit(client_id))
    df = df.withColumn("stuff_type", lit(stuff_type))
    return df

def main():
    sc = pyspark.SparkContext()
    glue_context = GlueContext(sc)
    spark = glue_context.spark_session
    job = Job(glue_context)
    job.init(job_name, args)
    for client_id in clients:
        logger.debug(f"Processing for client_id [{client_id}]")
        for stuff_type in stuff_types:
            logger.debug(f"Processing for stuff_type [{stuff_type}]")
            collection_name = get_collection_name(client_id, stuff_type)
            logger.info(f"Processing for collection {collection_name}")
            df = read_data(spark, collection_name, stuff_type, client_id)
            if df is None:
                logger.info(f"No data found in collection {collection_name}")
            else:
                write_data(df)
    job.commit()

if __name__ == "__main__":
    main()

Conclusão

Neste artigo, exploramos como configurar e automatizar um pipeline de ETL utilizando AWS Glue e Amazon DocumentDB, gravando os dados transformados no Amazon S3 em formato Parquet. O formato Parquet foi escolhido devido às suas vantagens em termos de eficiência de espaço e performance de leitura, sendo ideal para análises de grandes volumes de dados.

A automação desse processo garante que as informações estejam sempre atualizadas e prontas para análise, ajudando as organizações a tomar decisões baseadas em dados de forma mais rápida e precisa. A aplicação dessas técnicas pode ser um diferencial significativo na gestão e análise de grandes volumes de dados, tornando-se uma habilidade valiosa para profissionais da área.

Se você gostou deste artigo e quer continuar aprendendo sobre automação de processos de ETL, tecnologias de banco de dados, boas práticas em engenharia de dados, e arquitetura de software, não deixe de me seguir no LinkedIn. Vamos nos conectar e compartilhar conhecimento!

Inicie sua jornada conosco

Estamos prontos para guiar o seu negócio rumo ao futuro, com a solução certa para você se beneficiar do potencial das APIs e integrações modernas.

Sua história de sucesso começa aqui

Conte com nosso apoio para levar as melhores integrações para o seu negócio, com soluções e equipes profissionais que são referência no mercado.