Apache Spark og Amazon S3 - Gotchas og bedste praksis

S3 er et objektlager og ikke et filsystem, hvorfor de problemer, der opstår som følge af eventuel konsistens, ikke-atomære omdøb skal håndteres i applikationskoden. Katalogserveren i et filsystem er erstattet af en hash-algoritme til filnavnet. Dette er dårligt til liste over ting, katalogoperationer, sletning og omdøbning (kopiering og sletning, da det teknisk ikke er nogen omdøbning i objektbutikker)

Begynd at bruge S3A (URI-skema: s3a: //) - Hadoop 2.7+. S3a er den anbefalede S3-klient til Hadoop 2.7 og senere er S3a mere performant og understøtter større filer (op til 5TB) og har understøttelse af multipart-upload. Alle objekter, der er tilgængelige fra s3n: // URL'er, skal også være tilgængelige fra s3a ved blot at udskifte URL-skemaet. De fleste bugrapporter mod S3N lukkes som WONTFIX

At få Spark 2.0.1 til at arbejde med S3a Til Spark 2.0.1 skal du bruge hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, joda-time-2.9.3.jar på din klassepad; glem ikke at opdatere spark-default.conf med AWS-nøglerne og S3A FileSystemClass

Spark.hadoop.fs.s3a.access.key XXXXXXX
spark.hadoop.fs.s3a.secret.key XXXXXXX
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

Brug bestemt Dataframes som forudbestilling af forespørgsel og presikat-push-down er tilgængelig ud af boksen, og derfor henter mindre data til sidst hurtigere dine forespørgsler

Hvis du læser de samme data flere gange, kan du prøve at bruge .cache eller s3distcp til at overføre filerne til din lokale EMR-klynge for at drage fordel af den bedre fillæsningsydelse af et rigtigt filsystem. Indstillingen groupBy for s3distcp er en fantastisk mulighed for at løse det lille filproblem ved at flette et stort antal små filer.

Hvilket bringer mig til spørgsmålet om at læse et stort antal små filer. Hvis sammenlægning af filer ved hjælp af et værktøj ikke er en mulighed, prøv følgende kode, der effektivt fungerer rundt om den langsomme S3-katalogliste-flaskehals

import com.amazonaws.services.s3._, model._
    import com.amazonaws.auth.BasicAWS-legitimationsoplysninger

    val request = new ListObjectsRequest ()
    request.setBucketName (spand)
    request.setPrefix (prefix)
    request.setMaxKeys (pageLength)
    def s3 = ny AmazonS3Client (nye BasicAWSCredentials (nøgle, hemmelighed))

    val objs = s3.listObjects (anmodning) // Bemærk, at denne metode returnerer trunkerede data, hvis de er længere end "sidelængde" ovenfor. Du skal muligvis tackle det.
    sc.parallelize (objs.getObjectSummaries.map (_. getKey) .toList)
        .flatMap {key => Source.fromInputStream (s3.getObject (spand, nøgle) .getObjectContent: InputStream) .getLines}

Sørg for, at spark.sql.parquet.filterPushdown-indstillingen er sand, og spark.sql.parquet.mergeSchema er falsk (for at undgå, at skemaet fusionerer under skrivning, der virkelig bremser du skriver scenen). Heldigvis har Spark 2.0 den rigtige standard

Har du undret dig over, hvorfor netop det tidspunkt, et job er ved at afsluttes, intet bliver skrevet til logfilerne, og alle gnistoperationer ser ud til at være stoppet, men resultaterne findes endnu ikke i outputmappen til S3 ... hvad foregår der? Nå, hver gang eksekutiverne skriver resultatet af jobbet, skriver hver af dem til en midlertidig mappe uden for hovedkataloget, hvor filerne skulle skrives, og når alle eksekutører er færdig, foretages et omdøb for at få atomisk eksklusivitet. Dette er helt fint i et standardfilsystem som hdfs, hvor omdøb er øjeblikkelig, men i et objektlager som S3 er dette ikke befordrende, da omdøbninger på S3 udføres ved 6MB / s.

Skriv evt. Output af jobbet til EMR hdfs (for at udnytte de næsten øjeblikkelige omdøb og bedre fil IO af lokale hdfs) og tilføj et dstcp-trin for at flytte filerne til S3, for at gemme dig selv alle problemer med at håndtere indersiden af en objektlager, der prøver at være et filsystem. Skrivning til lokale hdfs giver dig også mulighed for at aktivere spekulation til at kontrollere løbende opgaver uden at falde i deadlock-fælderne, der er forbundet med DirectOutputCommiter.

Hvis du skal bruge S3 som outputmappe, skal du sikre dig, at følgende gnistkonfigurationer er indstillet

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation falsk

Bemærk: DirectParquetOutputCommitter fjernes fra Spark 2.0 på grund af risikoen for datatab. Indtil vi har forbedret konsistensen fra S3a, er vi desværre nødt til at arbejde med løsningen. Tingene forbedres med Hadoop 2.8

Undgå keynames i leksikografisk rækkefølge. Man kunne bruge hashing / tilfældige præfikser eller omvendt dato for at komme rundt. Tricket er at navngive dine taster hierarkisk og placere de mest almindelige ting, du filtrerer efter til venstre for din nøgle. Og har aldrig understregninger i spandavne på grund af DNS-problemer.

Aktivering af fs.s3a.fast.upload upload af dele af en enkelt fil til Amazon S3 parallelt

Nå, det var hjernedumpen af ​​problemer i produktionen, som jeg for nylig har løst for at få Spark til at arbejde med S3. Hold øje med mere om dette, da jeg graver dybere ned i det næste indlæg ...