Luftstrøm: Mindre kendte tip, tricks og bedste praksis

Der er visse ting med alle de værktøjer, du bruger, som du ikke kender, selv efter at du har brugt det i lang tid. Og når du først ved det, er du som "Jeg ønsker at jeg vidste dette før", da du allerede havde fortalt din klient, at det ikke kan gøres på nogen bedre måde . Luftstrøm som noget andet værktøj er ikke anderledes, der er nogle skjulte perler, der kan gøre dit liv let og gøre DAG-udvikling sjov.

Du kender måske allerede nogle af dem, og hvis du kender dem alle - ja, du er en PRO da.

(1) DAG med context Manager

Blev du irriteret over dig selv, da du glemte at tilføje dag = dag til din opgave og Airflow-fejl er blevet? Ja, det er let at glemme at tilføje det til hver opgave. Det er også overflødigt at tilføje den samme parameter som vist i følgende eksempel (eksempel_dag.py-fil):

Eksemplet (eksempel_dag.py-fil) ovenfor har bare 2 opgaver, men hvis du har 10 eller mere, bliver redundansen mere tydelig. For at undgå dette kan du bruge Airflow DAGs som konteksthåndterere til automatisk at tildele nye operatører til den DAG som vist i ovenstående eksempel (eksempel_dag_with_context.py) ved hjælp af sætning.

(2) Brug af liste til at indstille opgaveafhængigheder

Når du vil oprette DAG, der ligner den, der er vist på billedet herunder, bliver du nødt til at gentage opgavenavne, når du indstiller opgaveafhængighed.

Som vist i ovennævnte kodestykker, vil brug af vores normale måde at indstille opgaveafhængigheder betyde, at opgave_two og afslutning gentages 3 gange. Dette kan erstattes ved hjælp af pythonlister for at opnå det samme resultat på en mere elegant måde.

(3) Brug standardargumenter for at undgå gentagelse af argumenter

Luftstrøm, der giver mulighed for at videregive en ordbog med parametre, der ville være tilgængelig for alle opgaverne i den DAG.

For eksempel bruger vi BigQuery på DataReply til alle vores DataWareshouse-relaterede DAG'er og i stedet for at videregive parametre som labels, bigquery_conn_id til hver opgave, videresender vi simpelthen den indefault_args ordbog som vist i DAG nedenfor.

Dette er også nyttigt, når du ønsker advarsler om individuelle opgavefejl i stedet for kun DAG-fejl, som jeg allerede har nævnt i mit sidste blogindlæg om Integrering af Slack Alerts i luftstrøm.

(4) "Params" -argumentet

“Params” er en ordbog med parametre på DAG-niveau, der gøres tilgængelige i skabeloner. Disse params kan tilsidesættes på opgaveniveau.

Dette er et yderst nyttigt argument, og jeg har personligt brugt det meget, da det kan fås adgang til i templeret felt med jinja-templering ved hjælp af params.param_name. Et eksempel på brug er som følger:

Det gør det nemt for dig at skrive parameteriseret DAG i stedet for hårdkodende værdier. Som vist i eksemplerne ovenfor kan paramsordbog defineres 3 steder: (1) I DAG-objekt (2) I default_args-ordbog (3) Hver opgave.

(5) Gemning af følsomme data i forbindelser

De fleste brugere er opmærksomme på dette, men jeg har stadig set adgangskoder gemt i almindelig tekst inde i DAG. For godheds skyld - ikke gør det. Du skal skrive dine DAG'er på en sådan måde, at du er sikker nok til at gemme dine DAG'er i et offentligt lager.

Som standard gemmer Airflow adgangskoder til forbindelsen i ren tekst i metadatadatabasen. Crypto-pakken anbefales stærkt under Airflow-installation og kan ganske enkelt udføres ved pip installere apache-airflow [crypto].

Du kan derefter nemt få adgang til det som følger:

fra airflow.hooks.base_hook import BaseHook
slack_token = BaseHook.get_connection ('slack'). adgangskode

(6) Begræns antallet af luftstrømvariabler i din DAG

Luftmængdevariabler gemmes i Metadata-databasen, så ethvert opkald til variabler ville betyde en forbindelse til Metadata-DB. DAG-filer analyseres hvert X. sekund. Brug af et stort antal variabler i din DAG (og værre i default_args) kan betyde, at du muligvis ender med at mætte antallet af tilladte forbindelser til din database.

For at undgå denne situation kan du enten bare bruge en enkelt luftstrømvariabel med JSON-værdi. Da en luftstrømvariabel kan indeholde JSON-værdi, kan du gemme al din DAG-konfiguration inden i en enkelt variabel som vist på billedet herunder:

Som vist i dette skærmbillede kan du enten gemme værdier i separate luftstrømvariabler eller under en enkelt luftstrømvariabel som et JSON-felt

Du kan derefter få adgang til dem som vist nedenfor under Anbefalet måde:

(7) "kontekst" -ordbogen

Brugere glemmer ofte indholdet i kontekstordbogen, når de bruger PythonOperator med en konverterbar funktion.

Konteksten indeholder henvisninger til relaterede objekter til opgaveinstansen og er dokumenteret under makrosafsnittet i API, da de også er tilgængelige for det templerede felt.

{
      'dag': task.dag,
      'ds': ds,
      'next_ds': next_ds,
      'next_ds_nodash': next_ds_nodash,
      'prev_ds': prev_ds,
      'prev_ds_nodash': prev_ds_nodash,
      'ds_nodash': ds_nodash,
      'ts': ts,
      'ts_nodash': ts_nodash,
      'ts_nodash_with_tz': ts_nodash_with_tz,
      'gær_ds': gårsdag_ds,
      'gær_ds_nodash': gårsdag_ds_nodash,
      'morgen_ds': morgen_ds,
      'morgen_ds_nodash': morgen_ds_nodash,
      'END_DATE': ds,
      'end_date': ds,
      'dag_run': dag_run,
      'run_id': run_id,
      'udførelsesdato': self.execution_date,
      'prev_execution_date': prev_execution_date,
      'next_execution_date': next_execution_date,
      'nyeste dato': ds,
      'makroer': makroer,
      'params': params,
      'borde': borde,
      'opgave': opgave,
      'task_instance': self,
      'ti': selv,
      'task_instance_key_str': ti_key_str,
      'konf': konfiguration,
      'test_mode': self.test_mode,
      'var': {
          'værdi': VariableAccessor (),
          'json': VariableJsonAccessor ()
      },
      'inlets': task.inlets,
      'outlets': task.outlets,
}

(8) Generering af dynamiske luftmængdeopgaver

Jeg har besvaret mange spørgsmål om StackOverflow om, hvordan man skaber dynamiske opgaver. Svaret er enkelt, du skal bare generere unik task_id til alle dine opgaver. Nedenfor er 2 eksempler på, hvordan man opnår dette:

(9) Kør “airflow upgradedb” i stedet for “airflow initdb”

Tak til Ash Berlin for dette tip i hans tale i First Apache Airflow London Meetup.

airflow initdb opretter alle standardforbindelser, diagrammer osv., som vi måske ikke bruger og ikke ønsker i vores produktionsdatabase. airflow upgradedb anvender i stedet bare eventuelle manglende migrationer til databasetabellen. (inklusive oprettelse af manglende tabeller osv.) Det er også sikkert at køre hver gang, det sporer hvilke migrationer, der allerede er blevet anvendt (ved hjælp af Alembic-modulet).

Fortæl mig det i kommentarfeltet nedenfor, hvis du ved noget, der ville være værd at tilføje i dette blogindlæg. Happy Airflow’ing :-)