Prädikat-Pushdown:
Wenn Sie in SQL eine Abfrage verwenden, die sowohl die Join- als auch die Where-Bedingung enthält, erfolgt die Verknüpfung zuerst über die gesamten Daten und dann die Filterung basierend auf der Where-Bedingung. Was passiert, wenn sich Spark bei einem sehr großen Dataset genauso verhält wie SQL, würde der Join mehrere Stunden dauern, um dem Dataset beizutreten, da er über das ungefilterte Dataset erfolgt.
Prädikat Pushdown, der Name selbst ist selbsterklärend, Prädikat ist im Allgemeinen eine where-Bedingung, die True oder False zurückgibt. Während der Map-Phase schiebt Spark die Prädikatbedingungen direkt in die Datenbank, filtert die Daten auf Datenbankebene selbst anhand der Prädikatbedingungen, reduziert so die aus der Datenbank abgerufenen Daten und verbessert die Abfrageleistung. Da die Filterung im Datenspeicher selbst erfolgt, ist die Abfrage sehr schnell und da die Filterung bereits stattgefunden hat, wird die Übertragung ungefilterter Daten über das Netzwerk vermieden, und jetzt werden nur noch die gefilterten Daten im Speicher gespeichert.
Wir können die explain-Methode verwenden, um den physischen Plan des Datenrahmens anzuzeigen, ob Prädikat-Pushdown verwendet wird oder nicht. Prädikate müssen in den entsprechenden Datentyp umgewandelt werden, wenn nicht, funktionieren Prädikate nicht.
>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..
Im obigen Beispiel versuche ich, einen Datensatz basierend auf dem Zeitrahmen zu filtern, wobei Filter alle Prädikate anzeigen, die über den Datensatz ausgeführt werden müssen.
>>> df = spark.read.parquet("file1").filter((F.col("date") >= datetime.strptime("2019-08-01", "%Y-%m-%d").date()) & (F.col("date") <= datetime.strptime("2019-09-01", "%Y-%m-%d").date()))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#273) && (date#273 >= 18109)) && (date#273 <= 18140))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<frame_id:string,id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a...
Im obigen Beispiel wird das Datum ordnungsgemäß in das DateTime-Format umgewandelt.