Empuje de predicados :
En SQL, cada vez que se utiliza una consulta que tiene condición de unión y de dónde, lo que sucede es que la unión se produce primero en todos los datos y, a continuación, se filtra en función de la condición de dónde. Qué sucederá si spark se comporta de la misma manera que SQL, para un conjunto de datos muy grande, la combinación llevaría varias horas de cómputo para unirse al conjunto de datos, ya que está sucediendo en el conjunto de datos sin filtrar, después de lo cual, de nuevo, se tarda varias horas en filtrar usando la condición where.
Empuje de predicado, el nombre en sí mismo se explica por sí mismo, el predicado generalmente es una condición where que devolverá Verdadero o Falso. Durante la fase de mapa, Spark empuja las condiciones de predicado directamente a la base de datos, filtra los datos a nivel de la base de datos utilizando las condiciones de predicado, lo que reduce los datos recuperados de la base de datos y mejora el rendimiento de la consulta. Dado que el filtrado se realiza en el propio almacén de datos, la consulta es muy rápida y, además, dado que el filtrado ya se ha realizado, evita la transferencia de datos sin filtrar a través de la red y ahora solo los datos filtrados se almacenan en la memoria.
Podemos usar el método explain para ver el plano físico del dataframe si se usa o no la pulsación del predicado. Los predicados deben fundirse en el tipo de datos correspondiente, de lo contrario, los predicados no funcionan.
>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..
En el ejemplo anterior, estoy tratando de filtrar un conjunto de datos en función del marco de tiempo, los filtros insertados mostrarán todos los predicados que deben realizarse sobre el conjunto de datos, en este ejemplo, dado que la fecha y hora no se moldea correctamente, los predicados mayores y menores que no se empujan hacia abajo al conjunto de datos.
>>> df = spark.read.parquet("file1").filter((F.col("date") >= datetime.strptime("2019-08-01", "%Y-%m-%d").date()) & (F.col("date") <= datetime.strptime("2019-09-01", "%Y-%m-%d").date()))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#273) && (date#273 >= 18109)) && (date#273 <= 18140))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<frame_id:string,id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a...
En el ejemplo anterior, la fecha se escribe correctamente en formato de fecha y hora, ahora en la explicación puede ver que los predicados se empujan hacia abajo.