Techniques d’optimisation Spark

Pushdown de prédicat:

En SQL, chaque fois que vous utilisez une requête qui a à la fois une condition de jointure et une condition where, la jointure se produit d’abord sur l’ensemble des données, puis le filtrage se produit en fonction de la condition where. Que se passera-t-il si spark se comporte de la même manière que SQL, pour un très grand ensemble de données, la jointure prendrait plusieurs heures de calcul pour rejoindre l’ensemble de données car elle se produit sur l’ensemble de données non filtré, après quoi il faut encore plusieurs heures pour filtrer en utilisant la condition where.

Pushdown du prédicat, le nom lui-même est explicite, le prédicat est généralement une condition where qui retournera True ou False. Pendant la phase de mappage, spark pousse les conditions de prédicat directement vers la base de données, filtre les données au niveau de la base de données elle-même à l’aide des conditions de prédicat, réduisant ainsi les données extraites de la base de données et améliorant les performances de la requête. Comme le filtrage se produit au niveau du magasin de données lui-même, l’interrogation est très rapide et, comme le filtrage s’est déjà produit, elle évite de transférer des données non filtrées sur le réseau et maintenant seules les données filtrées sont stockées dans la mémoire.
Nous pouvons utiliser la méthode explain pour voir le plan physique de la trame de données, que le pushdown du prédicat soit utilisé ou non. Les prédicats doivent être convertis en type de données correspondant, sinon les prédicats ne fonctionnent pas.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..

Dans l’exemple ci-dessus, j’essaie de filtrer un ensemble de données en fonction de la période de temps, les filtres poussés afficheront tous les prédicats qui doivent être effectués sur l’ensemble de données, dans cet exemple, car DateTime n’est pas correctement moulé les prédicats supérieurs et inférieurs à ne sont pas poussés vers l’ensemble de données.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= datetime.strptime("2019-08-01", "%Y-%m-%d").date()) & (F.col("date") <= datetime.strptime("2019-09-01", "%Y-%m-%d").date()))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#273) && (date#273 >= 18109)) && (date#273 <= 18140))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<frame_id:string,id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a...

Dans l’exemple ci-dessus, la date est correctement typée au format DateTime, maintenant dans l’explication, vous pouvez voir que les prédicats sont poussés vers le bas.

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.

More: