Automatizando ETL com AWS Glue, AWS DocumentDB e S3
Como arquiteto de software, estou constantemente buscando maneiras de otimizar a performance e reduzir custos das soluções que gerencio. Recentemente, me deparei com a necessidade de expurgar um grande dataset de dados armazenados no Amazon DocumentDB (~1TB). A ideia era transferir esses dados para o AWS S3 em formato Apache Parquet, uma vez que esse formato não só é altamente eficiente em termos de armazenamento, mas também oferece um excelente desempenho para consultas analíticas.
Neste artigo, vamos explorar como configurar e automatizar um pipeline de ETL (extrair, transformar e carregar) usando AWS Glue e Amazon DocumentDB, gravando os dados transformados no Amazon S3 em formato Parquet.
Compartilho aqui a solução que desenvolvi para enfrentar esse desafio.
Contexto
O Amazon DocumentDB é um serviço de banco de dados NoSQL gerenciado compatível com o MongoDB, ele é uma boa escolha para aplicações que precisam de um banco de dados gerenciado com alta disponibilidade e escalabilidade. No entanto, à medida que os dados crescem, os custos de armazenamento podem se tornar um fator crítico. Para mitigar isso, decidi mover os dados históricos para o Amazon S3 em formato Parquet. O Parquet é um formato de armazenamento colunar, ideal para grandes volumes de dados, pois permite compressão eficiente e consultas rápidas.
O AWS Glue é um serviço gerenciado de ETL que facilita a preparação e carga de dados. Utilizaremos o formato de arquivo Parquet, que oferece diversas vantagens em termos de performance e eficiência de armazenamento.
Configuração do Ambiente e Leitura dos Dados do DocumentDB
Configuração do AWS Glue:
- Criação do Job no AWS Glue: Para começar, é necessário criar um job no AWS Glue, configurando as permissões necessárias no IAM para permitir a leitura dos dados no DocumentDB e a escrita no S3.
Leitura dos Dados do Amazon DocumentDB:
- Conexão com o DocumentDB: Configuramos a conexão com o Amazon DocumentDB utilizando o URI de conexão.
- Leitura dos Dados: Utilizamos o PySpark para ler os dados do DocumentDB, aplicando um pipeline para filtrar os documentos pela data desejada.
Transformação e Escrita dos Dados no S3
Transformação dos Dados:
- Adição de Colunas de Particionamento: Para facilitar a organização e consulta dos dados, adicionamos colunas de ano, mês e dia aos dados lidos.
- Uso do PySpark: Utilizamos funções do PySpark para manipular e transformar os dados conforme necessário.
Escrita dos Dados no S3 em Formato Parquet:
- Configuração do Caminho de Destino: Definimos o caminho de destino no Amazon S3 onde os dados transformados serão armazenados.
- Utilização do Formato Parquet: O formato Parquet é escolhido devido à sua eficiência em termos de espaço e velocidade de leitura. Este formato colunar permite uma compactação eficiente e uma leitura seletiva das colunas necessárias, tornando-o ideal para análises de grandes datasets.
- Particionamento dos Dados: Particionamos os dados por várias chaves (client_id, stuff_type, ano, mês, dia) para facilitar consultas futuras e melhorar a performance.
Código fonte
Esse script leva em consideração que existe no banco de dados collections de vários clientes (clients) e essas collections guardam determinados tipos de documentos (stuff_types). Assim, ele faz o processo para várias collections do banco.
pythonCopiar código
import sys
import logging
import pyspark
import json
from pyspark.sql.functions import col, year, month, dayofmonth, lit
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
from datetime import datetime, timedelta, timezone
logger = logging.getLogger("etl_docdb_to_s3")
logger.setLevel(logging.INFO)
# Gets job parameters
args = getResolvedOptions(
sys.argv,
['JOB_NAME', 'BUCKET_NAME']
)
def validate_arguments(required_params):
# logic to validate required parameters
validate_arguments(args)
job_name = args['JOB_NAME']
bucket_name = args['BUCKET_NAME']
# you can retrieve other parameters here
def get_collection_name(client_id, collection_type):
# retrieves the collection name
def define_query_dates():
yesterday = datetime.now(timezone.utc) - timedelta(days=1)
gte = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0, tzinfo=timezone.utc)
lte = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59, tzinfo=timezone.utc)
return gte.isoformat(), lte.isoformat()
def write_data(df):
s3_output_path = f"s3://{bucket_name}/{output_prefix}"
logger.info(f"Writing data to S3 path at {s3_output_path}")
(df.write
.partitionBy("client_id", "stuff_type", "year", "month", "day")
.format("parquet")
.mode("append")
.option("path", s3_output_path)
.save())
logger.info(f"Data written to S3 path at {s3_output_path}")
def read_data(spark, collection_name, stuff_type, client_id):
yesterday_start, yesterday_end = define_query_dates()
pipeline = [
{
"$match": {
"timestamp": {
"$gte": {"$date": yesterday_start},
"$lte": {"$date": yesterday_end}
}
}
}
]
df = (spark.read.format("mongodb")
.option("aggregation.pipeline", json.dumps(pipeline))
.option("database", database_name)
.option("connection.uri", mongo_uri)
.option("collection", collection_name)
.option("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner")
.load())
if df.isEmpty():
logger.info(f"No data found in collection {collection_name}")
return None
df = df.withColumn("year", year(col("timestamp")))
df = df.withColumn("month", month(col("timestamp")))
df = df.withColumn("day", dayofmonth(col("timestamp")))
df = df.withColumn("client_id", lit(client_id))
df = df.withColumn("stuff_type", lit(stuff_type))
return df
def main():
sc = pyspark.SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(job_name, args)
for client_id in clients:
logger.debug(f"Processing for client_id [{client_id}]")
for stuff_type in stuff_types:
logger.debug(f"Processing for stuff_type [{stuff_type}]")
collection_name = get_collection_name(client_id, stuff_type)
logger.info(f"Processing for collection {collection_name}")
df = read_data(spark, collection_name, stuff_type, client_id)
if df is None:
logger.info(f"No data found in collection {collection_name}")
else:
write_data(df)
job.commit()
if __name__ == "__main__":
main()
Conclusão
Neste artigo, exploramos como configurar e automatizar um pipeline de ETL utilizando AWS Glue e Amazon DocumentDB, gravando os dados transformados no Amazon S3 em formato Parquet. O formato Parquet foi escolhido devido às suas vantagens em termos de eficiência de espaço e performance de leitura, sendo ideal para análises de grandes volumes de dados.
A automação desse processo garante que as informações estejam sempre atualizadas e prontas para análise, ajudando as organizações a tomar decisões baseadas em dados de forma mais rápida e precisa. A aplicação dessas técnicas pode ser um diferencial significativo na gestão e análise de grandes volumes de dados, tornando-se uma habilidade valiosa para profissionais da área.
Se você gostou deste artigo e quer continuar aprendendo sobre automação de processos de ETL, tecnologias de banco de dados, boas práticas em engenharia de dados, e arquitetura de software, não deixe de me seguir no LinkedIn. Vamos nos conectar e compartilhar conhecimento!
Inicie sua jornada conosco
Estamos prontos para guiar o seu negócio rumo ao futuro, com a solução certa para você se beneficiar do potencial das APIs e integrações modernas.
Conteúdos relacionados
Confira os conteúdos produzidos pela nossa equipe
Sua história de sucesso começa aqui
Conte com nosso apoio para levar as melhores integrações para o seu negócio, com soluções e equipes profissionais que são referência no mercado.