Se você faz parte do mundo de dados é muito provável que já tenha, pelo menos, ouvido falar no Apache Airflow. Essa ferramenta vem ganhando cada vez mais popularidade e neste post vamos ver o motivo de ela estar tão na moda.
Quando falamos em processos de ETL básicos, muitas vezes pensamos que não precisamos de uma ferramenta tão robusta para gerenciarmos nossos processos. Muitas vezes um simples scheduler consegue dar conta do recado e realizar todas as tarefas de um pipeline tranquilamente. No entanto, quando começam a surgir dependências entre as tarefas e, principalmente, quando é difícil de se determinar o tempo de execução de tarefas que possuem tarefas subsequentes dependentes de seus términos para iniciarem, vemos que existe a necessidade de utilizarmos ferramentas mais complexas e desenvolvidas, justamente para este tipo de situação.
O Airflow é uma ferramenta que nos possibilita realizar a criação, agendamento e monitoramento de pipeline de forma programática. Esta é, inclusive, a definição do Airflow em sua documentação. É uma definição bastante simples, somente com a qual não conseguimos entender a quantidade de possibilidades que o Airflow pode nos oferecer.
Um resumo sobre o Airflow: é uma ferramenta Open-Source que foi criada no Airbnb em 2014 e desde 2019 fez parte da Apache Software Foundation. Ele se baseia em 4 princípios diretos:
Além disso, um detalhe importante do Airflow é que ele é totalmente escrito na linguagem Python, que é uma das linguagens mais populares atualmente na área de dados.
Para começar a entender a ferramenta vamos passar pelos termos principais que vão sempre acompanhar o estudo e a utilização do Airflow.
As DAGs são o ponto chave para o bom funcionamento do Airflow e um dos porquês de ele ser tão adotado. DAG ou Directed Acyclic Graph — que significa grafo acíclico dirigido — de forma bem resumida, é uma ferramenta matemática composta de nós e arestas que conectam estes nós. O fato de ser direcionado implica que estas arestas são “setas” unidirecionais e, por ser acíclico, cria a restrição de não entrarmos em nenhum caminho que leve a um loop, ou seja, seguindo o sentido das arestas eventualmente iremos parar em um ou mais nós onde não há mais caminho a ser percorrido.
O Airflow se baseia em DAGs para desenhar um pipeline de dados, seja qual for a sua função. Em uma dada DAG, cada uma das tarefas ou tasks do Airflow é um nó do grafo e as arestas representam a ordem de execução desejada dessas tarefas, conforme exemplo abaixo:
No exemplo acima, vemos que a única task que não depende de nenhuma outra para iniciar é a print_date. Com isso, ela deve ser a primeira a ser executada. Vemos, também, que as tasks sleep e templated dependem da task print_date para serem executadas, e, por isso, apenas começam quando a task print_date tiver sido completada com sucesso.
As DAGs devem ser escritas em Python e as dependências entre as tasks são totalmente definidas no próprio código.
A DAG Run é a realização de uma DAG. Enquanto a DAG engloba o conjunto das tasks e suas dependências, ela é atemporal e não implica que algo tenha sido executado ou não. Por outro lado, uma DAG Run é uma execução propriamente dita de uma DAG, incluindo horários, tempos de execução de cada uma das tasks, status de cada uma delas, etc.
Já falamos bastante nas tasks ao longo do post, porque é algo bastante claro. Dando um pouco mais de detalhe, uma task deve implementar alguma lógica a ser executada, que pode ser a ingestão de um dado através de uma API, a transformação de uma tabela ou até mesmo o treinamento de um modelo de machine learning. Além disso, quanto mais pudermos isolar estas atividades em diferentes tasks e desenhar uma DAG de forma coerente com a dependência entre elas, o processo ficará mais enxuto e será mais fácil de detectarmos e corrigirmos erros, pois teremos isolado cada uma das atividades e, consequentemente, as possíveis fontes de erro.
A Task Instance está para uma Task assim como a DAG Run está para a DAG. Dessa maneira, uma task instance nada mais é que a realização de uma task, onde podemos ter acesso aos logs cada vez que aquela task for executada, os tempos de execução, o status da task, se foi concluída ou se houve algum erro, etc.
XCom (comunicação cruzada) é uma forma de troca de mensagens curtas entre diferentes tasks. Apesar do Airflow poder ser integrado a diferentes ferramentas de Big Data, por exemplo o Apache Spark, o tratamento de dados grandes deve permanecer dentro de cada Task de forma que somente pequenas mensagens, basicamente metadados, sejam trocados entre as Tasks por meio de XCom. Por exemplo, o retorno de uma função executada em uma task pode servir de parâmetro para uma execução de tasks subsequentes, e com isso, as XComs tornam-se bastante úteis.
O executor é o mecanismo pelo qual uma Task é realizada. Ele é definido no momento da instalação do Airflow, de forma que, uma vez definido, será o mesmo em todas as DAGs de uma mesma instalação do Airflow. Basicamente, o executor trata do mecanismo de execução de cada código, ou seja, qual infraestrutura o código deverá ser executado, por exemplo, utilizando as capacidades do Kubernetes, através do KubernetesExecutor, ou simplesmente uma execução em máquina local, utilizando o LocalExecutor.
O operator é o template de uma Task, determina qual a ferramenta usada para executar determinado código. Como exemplo, temos o PythonOperator, que é o template para a execução de scripts escritos em Python. O Airflow permite a utilização de uma gama de diferentes operators para as mais diversas situações. Por exemplo, se precisarmos realizar uma tarefa bem específica onde há a necessidade da montagem de um ambiente complexo, podemos utilizar o DockerOperator, com o qual é possível realizar a execução de um código inclusive em linguagem diferente do Python, contido em determinada imagem que é passada como parâmetro ao operator.
Agora, vamos falar um pouco sobre os diferentes serviços que integram o Airflow para que ele consiga fazer funcionar as diferentes DAGs com robustez.
Conforme podemos ver na imagem acima, temos de mais próximo ao usuário o Webserver, que é responsável por fornecer uma interface com a qual podemos interagir com o Airflow para controlar e monitorar nossas DAGs. Por sinal, a interface do Airflow dispensa comentários por ser ao mesmo tempo enxuta e conter diversas informações muito úteis para a execução e monitoramento das DAGs. Com certeza é um dos motivos que fez o Airflow ganhar tanta popularidade.
Além do Webserver, temos um banco de dados que é responsável por guardar metadados. E que dados seriam estes? Todos aqueles relacionados a cada task instance de cada DAG Run. Essencialmente, lá ficam armazenadas informações como: os momentos de execução, o status de cada task instance, os tempos de execução de cada task instance, os agendamentos de cada DAG, etc. Só não ficam armazenados no banco de dados os próprios códigos das DAGs e nem os logs de cada task instance.
Outra peça bastante fundamental nesse quebra-cabeça é o scheduler. Ele é responsável por garantir que cada uma das tasks dentro de cada uma das DAGs seja executada no momento correto. Ele controla, portanto, os agendamentos e garante a ordem de execuções de tasks que dependem de outras tasks. Está em contato direto com o Executor para disparar uma solicitação para que seja executada cada uma das tasks. O executor, por sua vez, será o responsável por fazer a coisa acontecer.
Não podemos deixar de falar do DAG Directory, que é o local onde serão armazenados os códigos em Python que compõem cada uma das DAGs. Há algumas opções bastante interessantes para o armazenamento dos códigos. Por exemplo, podemos colocá-los na própria imagem do Airflow, de forma que quando feito o deploy, os códigos estejam locais à instalação. Uma forma mais interessante é a utilização da ferramenta git-sync, que é basicamente um “container sidecar” que é colocado nos containers do Webserver e do Scheduler, que permite realizar a sincronização com periodicidade pré-definida com o repositório onde se encontram os códigos com as DAGs. Dessa maneira, com uma atualização simples do repositório, de tempos em tempos o Airflow irá buscar por atualizações das DAGs e realizar o pull desses códigos para que estejam presentes em uma pasta local ao Airflow que irá conter todos as DAGs.
Na figura acima, há também a presença dos workers, que vão depender do tipo do Executor escolhido. Por exemplo, no caso da utilização do CeleryExecutor, utiliza-se a ferramenta Celery para distribuir diferentes tasks entre os workers disponíveis. No caso do KubernetesExecutor, o Kubernetes cria um POD para cada Task Instance, situação em que podemos entender os Workers como sendo basicamente os PODs em que cada uma das Tasks será executada.
Existem casos práticos em que determinada task deve ser executada somente após o disparo de algum evento, por exemplo, a chegada de um arquivo em um storage. Como muitas vezes não é possível precisar o momento de chegada do arquivo, o uso de sensors torna-se bastante interessante. Os sensors são um tipo especial de operator e se baseiam tanto em triggers internos do Airflow, como o término de um Task em outra DAG, ou externos, como o exemplo do storage. No caso do sensor que monitora a chegada de um arquivo, periodicamente ele vai fazer a busca por este arquivo no local especificado e, somente quando detectado, a task será iniciada.
Os Hooks são interfaces de alto nível para a utilização de serviços externos ao Airflow e são muitas vezes as peças fundamentais dos operators. Podem ser utilizados, por exemplo, para realizar a conexão com diversos bancos de dados.
Inicialmente o Airflow era uma peça única composta de diversos pacotes que muitas vezes nem eram utilizados. Na versão 2.0, esses pacotes foram separados e mantidos pelos providers. Cada um dos providers tem uma interface disponível para o Airflow com seu pacote. Com isso, a instalação principal fica mais leve, além de termos acesso mais rápido às atualizações de cada um dos providers e nos beneficiarmos de forma mais rápida do avanço das ferramentas disponíveis em cada um deles, dado que agora o desenvolvimento é mais modular.
Vamos mostrar os principais trechos que compõem a sintaxe da criação de uma DAG. Apesar de já existir uma nova sintaxe com o uso de decorator, vamos mostrar a sintaxe que se utiliza desde a primeira versão e que se tornou bastante popular e ainda é compatível.
Podemos ver acima que utilizamos o gerenciador de contexto with para instanciar uma DAG, que é uma classe importada do Airflow e dentro dela definimos as tasks através dos operators que queremos utilizar:
No caso do uso do PythonOperator, um dos parâmetros que passamos é a própria função (código da função não mostrado acima) que deverá ser executada naquela task.
Por fim, uma das coisas mais interessantes relacionadas à sintaxe do Airflow, que é a definição da ordem das tasks.
Inserimos este trecho ainda dentro do gerenciador de contexto e seu significado é bastante intuitivo. A sintaxe acima significa que a task t1 deve ser executada anteriormente à task t3. Além disso, a task t1 deve ser anterior à task t2, que por sua vez deve ser anterior à task t4. Os colchetes indicam que as tasks t5 e t6 podem ser executadas em paralelo e vemos que elas dependem da task t4. Por fim, quando concluídas ambas as tasks t5 e t6 a task t7 pode ser executada. O resultado deste trecho de código é a seguinte DAG:
As cores nas bordas de cada Task Instance acima evidenciam o status da Task Instance para dada DAG Run, conforme a legenda abaixo:
No caso acima, vemos que todas as Tasks foram concluídas com sucesso, com exceção da Task send_email_API_down, que foi pulada, devido à condição encontrada dentro da task try_connection_to_API, que com a utilização do operator BranchPythonOperator, define um caminho a ser seguido dependendo da verificação de uma condição.
Conhecemos um pouco sobre o Airflow, seus conceitos e principais terminologias desta ferramenta incrível. Ficou curioso pra testar? A documentação do Airflow é bastante clara, inclusive na seção de instalação há uma sugestão e os passos para realizarmos a instalação com Docker. Seguindo estes passos, em poucos minutos você terá o Airflow funcionando localmente e poderá testar diferentes pipelines para seus modelos!
Obrigado pela leitura e mantenha-se informado em nosso Blog!
airflow.apache.org