predikátum Pushdown :
az SQL-ben, amikor olyan lekérdezést használ, amely mind a join, mind a where feltételt tartalmaz, a Csatlakozás először a teljes adaton történik, majd a szűrés a where feltétel alapján történik. Mi fog történni, ha a spark ugyanúgy viselkedik, mint az SQL, egy nagyon hatalmas adatkészlet esetében a Csatlakozás több órányi számítást igényel az adatkészlethez való csatlakozáshoz, mivel ez a szűretlen adatkészlet felett történik, ezután ismét több órát vesz igénybe a where feltétel használatával történő szűrés.
predikátum pushdown, maga a név magától értetődő, predikátum általában egy where feltétel, amely visszatér igaz vagy hamis. A leképezési fázisban a spark a predikátumfeltételeket közvetlenül az adatbázisba tolja, az adatokat maga az adatbázis szintjén szűri a predikátumfeltételek felhasználásával, ezáltal csökkentve az adatbázisból lekért adatokat, és javítva a lekérdezés teljesítményét. Mivel a szűrés az adattárban történik, a lekérdezés nagyon gyors, és mivel a szűrés már megtörtént, elkerüli a szűretlen adatok átvitelét a hálózaton keresztül, és most már csak a szűrt adatok tárolódnak a memóriában.
használhatjuk az explain metódust az adatkeret fizikai tervének megtekintéséhez, függetlenül attól, hogy predikátum pushdown-t használnak-e vagy sem. A predikátumokat a megfelelő adattípusra kell önteni, ha nem, akkor a predikátumok nem működnek.
>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..
a fenti példában egy adatkészletet próbálok szűrni az időkeret alapján, a tolt szűrők megjelenítik az összes predikátumot, amelyet végre kell hajtani az adatkészlet felett, ebben a példában, mivel a DateTime nincs megfelelően öntve, a predikátumoknál nagyobb és kisebb predikátumokat nem nyomják le az adatkészletbe.
>>> df = spark.read.parquet("file1").filter((F.col("date") >= datetime.strptime("2019-08-01", "%Y-%m-%d").date()) & (F.col("date") <= datetime.strptime("2019-09-01", "%Y-%m-%d").date()))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#273) && (date#273 >= 18109)) && (date#273 <= 18140))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<frame_id:string,id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a...
a fenti példában a dátum helyesen van beírva a DateTime formátumba, most a magyarázatban láthatja, hogy a predikátumok le vannak nyomva.