Faísca Técnicas de Otimização

Predicado Pushdown :

Em SQL, sempre que você usar uma consulta que tem tanto de participar e onde a condição, o que acontece é Aderir a primeira acontece em toda a dados e, em seguida, a filtragem acontece com base na condição where. O que acontecerá se o spark se comportar da mesma forma que o SQL faz, para um conjunto de dados muito grande, a junção levaria várias horas de computação para se juntar ao conjunto de dados, uma vez que está acontecendo sobre o conjunto de dados não filtrados, após o que, novamente, leva várias horas para filtrar usando a condição de onde.

predicate pushdown, o nome em si é auto-explicativo, predicado é geralmente uma condição onde retornará verdadeiro ou Falso. Durante a fase de mapa o que spark faz é, ele empurra as condições de predicado diretamente para o banco de dados, filtra os dados no próprio nível de banco de dados usando as condições de predicado, reduzindo assim os dados recuperados do banco de dados e aumenta o desempenho da consulta. Uma vez que a filtragem está acontecendo na própria loja de dados, o questionamento é muito rápido e também uma vez que a filtragem já aconteceu, evita a transferência de dados não filtrados sobre a rede e agora apenas os dados filtrados são armazenados na memória.
podemos usar o método de explicação para ver o plano físico do dataframe se predicate pushdown é usado ou não. Predicados precisam ser rodados para o tipo de dados correspondente, se não, então predicados não funcionam.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= "2019-08-01") & (F.col("date") <= "2019-09-01"))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#237) && (cast(date#237 as string) >= 2019-08-01)) && (cast(date#237 as string) <= 2019-09-01))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a..

No exemplo acima, eu estou tentando filtrar um conjunto de dados com base no quadro de horário, empurrado filtros irá exibir todos os predicados que precisam ser realizadas sobre o conjunto de dados, neste exemplo, pois DateTime não é adequadamente modelado de maior que e menor do que os predicados não são enviadas para o conjunto de dados.

>>> df = spark.read.parquet("file1").filter((F.col("date") >= datetime.strptime("2019-08-01", "%Y-%m-%d").date()) & (F.col("date") <= datetime.strptime("2019-09-01", "%Y-%m-%d").date()))>>> df.explain()
== Physical Plan ==
*(1) Project
+- *(1) Filter ((isnotnull(date#273) && (date#273 >= 18109)) && (date#273 <= 18140))
+- *(1) FileScan parquet Batched: true, Format: Parquet, Location: InMemoryFileIndex, PushedFilters: , ReadSchema: struct<frame_id:string,id:string,date:date,day_of_week:int,hour_of_day:int,impressions:bigint,a...

no exemplo acima, a data é corretamente digitada para o formato DateTime, agora na explicação você pode ver os predicados são empurrados para baixo.

Deixe uma resposta

O seu endereço de email não será publicado.

More: