Tecniche di ottimizzazione Spark

Pushdown predicato:

In SQL, ogni volta che si utilizza una query che ha sia la condizione join che where, ciò che accade è che Join avvenga prima attraverso l’intero dato e quindi il filtraggio avviene in base alla condizione where. Cosa succederà se spark si comporta allo stesso modo di SQL, per un set di dati molto grande, il join richiederebbe diverse ore di calcolo per unirsi al set di dati poiché sta accadendo sul set di dati non filtrato, dopo di che ci vogliono ancora diverse ore per filtrare usando la condizione where.

Predicato pushdown, il nome stesso è autoesplicativo, Predicato è generalmente una condizione where che restituirà True o False. Durante la fase della mappa, spark spinge le condizioni del predicato direttamente al database, filtra i dati a livello del database stesso utilizzando le condizioni del predicato, riducendo quindi i dati recuperati dal database e migliorando le prestazioni della query. Poiché il filtraggio avviene nell’archivio dati stesso, l’interrogazione è molto veloce e anche dal momento che il filtraggio è già avvenuto evita il trasferimento di dati non filtrati sulla rete e ora solo i dati filtrati sono memorizzati nella memoria.
Possiamo usare il metodo explain per vedere il piano fisico del dataframe se viene utilizzato o meno il predicato pushdown. I predicati devono essere trasmessi al tipo di dati corrispondente, in caso contrario i predicati non funzionano.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..

Nell’esempio precedente, sto cercando di filtrare un set di dati in base all’intervallo di tempo, i filtri spinti visualizzeranno tutti i predicati che devono essere eseguiti sul set di dati, in questo esempio poiché DateTime non è correttamente castato maggiore di e minore di predicati non vengono spinti verso il set di dati.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= datetime.strptime("2019-08-01", "%Y-%m-%d").date()) & (F.col("date") <= datetime.strptime("2019-09-01", "%Y-%m-%d").date()))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#273) && (date#273 >= 18109)) && (date#273 <= 18140))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<frame_id:string,id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a...

Nell’esempio precedente, la data viene correttamente digitata nel formato DateTime, ora nella spiegazione è possibile vedere i predicati vengono spinti verso il basso.

Lascia un commento

Il tuo indirizzo email non sarà pubblicato.

More: