Spark最適化テクニック

Predicate Pushdown:

SQLでは、joinとwhere条件の両方を持つクエリを使用するたびに、Joinが最初にデータ全体で発生し、where条件に基づいてフィル SparkがSQLと同じように動作するとどうなりますか、非常に巨大なデータセットの場合、フィルターされていないデータセットで発生するため、結合はデータセットを結合するために数時間の計算が必要になり、その後もwhere条件を使用してフィルタリングするのに数時間かかります。

述語プッシュダウン、名前自体は自明であり、述語は一般的にTrueまたはFalseを返すwhere条件です。 マップフェーズでは、sparkは述語条件を直接データベースにプッシュダウンし、述語条件を使用してデータベースレベルでデータ自体をフィルタリングするため、デー フィルタリングはデータストア自体で行われているため、クエリは非常に高速であり、フィルタリングがすでに行われているため、フィルタリングされていないデータをネットワーク経由で転送することはなくなり、フィルタリングされたデータのみがメモリに格納されます。
explainメソッドを使用して、述語pushdownが使用されているかどうかをdataframeの物理計画を確認できます。 述語は対応するデータ型にキャストする必要がありますが、そうでない場合は述語は機能しません。

>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..

上記の例では、時間枠に基づいてデータセットをフィルタリングしようとしています。datetimeが適切にキャストされていないため、この例では、datasetで実行する必要があるすべての述語が表示されます。greater-than述語とlesser than述語はdatasetにプッシュダウンされません。上記の例では、日付はDateTime形式に適切に型キャストされています。explainでは、述語がプッシュダウンされていることがわかりました。

コメントを残す

メールアドレスが公開されることはありません。

More: