Spark Optimization Techniques

Predicate Pushdown:

in SQL, wanneer u een query gebruikt die zowel join als where condition heeft, gebeurt Join eerst over de gehele data en dan filtering gebeurt op basis van where condition. Wat zal er gebeuren als spark zich op dezelfde manier gedraagt als SQL, voor een zeer grote dataset, zou de join enkele uren van berekening vergen om de dataset aan te sluiten aangezien het over de ongefilterde dataset gebeurt, waarna het opnieuw enkele uren duurt om te filteren met behulp van de where-voorwaarde.

predicaat pushdown, de naam zelf spreekt voor zich, predicaat is over het algemeen een waarvoorwaarde die Waar of onwaar zal teruggeven. Tijdens de Kaartfase wat spark doet is, het duwt de predicaat voorwaarden rechtstreeks naar de database, filtert de gegevens op het niveau van de database zelf met behulp van de predicaat voorwaarden, dus het verminderen van de gegevens opgehaald uit de database en verbetert de query prestaties. Aangezien het filteren gebeurt in de gegevensopslag zelf, de querying is zeer snel en ook omdat het filteren is al gebeurd vermijdt het overbrengen van ongefilterde gegevens over het netwerk en nu alleen de gefilterde gegevens worden opgeslagen in het geheugen.
we kunnen de explain methode gebruiken om het fysieke plan van het dataframe te zien of predicaat pushdown wordt gebruikt of niet. Predicaten moeten worden gegoten naar het overeenkomstige gegevenstype, zo niet dan predicaten werken niet.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..

in het bovenstaande voorbeeld probeer ik een dataset te filteren op basis van het tijdsbestek, pushed filters zullen alle predicaten weergeven die over de dataset moeten worden uitgevoerd, in dit voorbeeld omdat DateTime niet correct is gegoten groter-dan en minder dan predicaten worden niet naar beneden gepusht naar de dataset.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= datetime.strptime("2019-08-01", "%Y-%m-%d").date()) & (F.col("date") <= datetime.strptime("2019-09-01", "%Y-%m-%d").date()))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#273) && (date#273 >= 18109)) && (date#273 <= 18140))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<frame_id:string,id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a...

in het bovenstaande voorbeeld wordt de datum correct getypt naar DateTime formaat, nu in de explain kun je zien dat de predicaten naar beneden worden geduwd.

Geef een antwoord

Het e-mailadres wordt niet gepubliceerd.

More: