Spark Optimization Techniques

predikat Pushdown:

i SQL, när du använder en fråga som har både join och where condition, vad som händer är Join först händer över hela data och sedan filtrering sker baserat på where condition. Vad händer om spark beter sig på samma sätt som SQL gör, för en mycket stor dataset, skulle kopplingen ta flera timmars beräkning för att gå med i datasetet eftersom det händer över det ofiltrerade datasetet, varefter det igen tar flera timmar att filtrera med where-tillståndet.

predikat pushdown, själva namnet är självförklarande, predikat är i allmänhet ett where-tillstånd som kommer att returnera sant eller falskt. Under Kartfasen vad spark gör är att det trycker ner predikatförhållandena direkt till databasen, filtrerar data på databasnivån själv med hjälp av predikatförhållandena, vilket minskar data som hämtas från databasen och förbättrar frågans prestanda. Eftersom filtreringen sker i själva datalagret är frågan mycket snabb och även eftersom filtrering redan har hänt undviker det att överföra ofiltrerade data över nätverket och nu lagras bara de filtrerade data i minnet.
vi kan använda metoden förklara för att se den fysiska planen för dataframen om predikat pushdown används eller inte. Predikat måste gjutas till motsvarande datatyp, om inte fungerar predikat inte.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..

i ovanstående exempel försöker jag filtrera en dataset baserad på tidsramen, pressade filter visar alla predikat som måste utföras över datasetet, i det här exemplet eftersom DateTime inte är korrekt gjutet större än och mindre än predikat inte skjuts ner till dataset.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= datetime.strptime("2019-08-01", "%Y-%m-%d").date()) & (F.col("date") <= datetime.strptime("2019-09-01", "%Y-%m-%d").date()))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#273) && (date#273 >= 18109)) && (date#273 <= 18140))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<frame_id:string,id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a...

i exemplet ovan är datumet korrekt typgjutet till DateTime-format, nu i förklaringen kan du se att predikaten trycks ner.

Lämna ett svar

Din e-postadress kommer inte publiceras.

More: