techniki optymalizacji Spark

predykat Pushdown :

w SQL, za każdym razem, gdy używasz zapytania, które ma warunek join i where, dzieje się to Join najpierw w całych danych, a następnie filtrowanie odbywa się na podstawie warunku where. Co się stanie, jeśli spark zachowuje się tak samo jak SQL, dla bardzo dużego zbioru danych połączenie zajęłoby kilka godzin obliczeń, aby dołączyć do zbioru danych, ponieważ dzieje się to w niefiltrowanym zbiorze danych, po czym ponownie filtrowanie za pomocą warunku where zajmuje kilka godzin.

predykat pushdown, sama nazwa jest oczywista, predykat jest zazwyczaj warunkiem where, który zwróci True lub False. Podczas fazy Map spark wypycha warunki predykatu bezpośrednio do bazy danych, filtruje dane na samym poziomie bazy danych za pomocą warunków predykatu, zmniejszając w ten sposób dane pobierane z bazy danych i zwiększając wydajność zapytań. Ponieważ filtrowanie odbywa się w samym magazynie danych, zapytania są bardzo szybkie, a ponieważ filtrowanie już się stało, unika się przesyłania niefiltrowanych danych przez Sieć i teraz tylko przefiltrowane dane są przechowywane w pamięci.
możemy użyć metody explain, aby zobaczyć fizyczny plan ramki danych, czy predykat pushdown jest używany, czy nie. Predykaty muszą być rzucane do odpowiedniego typu danych, jeśli nie, to predykaty nie działają.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..

w powyższym przykładzie próbuję filtrować zestaw danych na podstawie przedziału czasowego, filtry wypchnięte wyświetlą wszystkie predykaty, które muszą być wykonane w zestawie danych, w tym przykładzie, ponieważ DateTime nie jest prawidłowo rzutowany predykaty większe-niż i mniejsze niż nie są pchane w dół do zestawu danych.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= datetime.strptime("2019-08-01", "%Y-%m-%d").date()) & (F.col("date") <= datetime.strptime("2019-09-01", "%Y-%m-%d").date()))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#273) && (date#273 >= 18109)) && (date#273 <= 18140))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<frame_id:string,id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a...

w powyższym przykładzie Data jest poprawnie wpisywana do formatu DateTime, teraz w wyjaśnieniu można zobaczyć predykaty są przesuwane w dół.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany.

More: