PySpark

PySpark agent: an overview

The PySpark integration relies on two components:

  1. The Spark agent jar, which ensures that the Spark jobs are tracked and that events are sent to Kensu. The jar has to be added to the SparkSession.
  2. The Python kensu-py library, which includes a module to configure the parameters of the Spark jar as well as other helpers such as circuit breakers.

Prerequisites and Installation

Install the Kensu Python library

The kensu-py Python library is a prerequisite for the PySpark agent; please make sure it is installed.

To install the library, execute this command in your Python environment:

Shell
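# A minimal sketch, assuming the kensu-py library is published on PyPI under the name "kensu"
pip install kensu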


Download the Spark agent jar

Download a jar matching your Spark version:

Ideally it should be a full version match, but a major.minor match is often good enough (i.e. the last version number may differ).

Configuration

Before creating the configuration file, make sure you have retrieved your Kensu Ingestion URL and Token.

To include the PySpark agent in your application, you need to:

  1. Create a configuration file conf.ini.
  2. Add the jar to the Spark session. To do so, include the path of the jar in the configuration of the Spark session by adding .config("spark.driver.extraClassPath", "<PATH_TO_JAR>") to the definition of the session.
  3. Import the Kensu PySpark module and initialize the tracker with init_kensu_spark(spark).

Here is a summary of the code modifications you need to perform to include the PySpark agent:

Python
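# A sketch of the modified session creation; the import path below is assumed
# from kensu-py and may differ depending on your version
from pyspark.sql import SparkSession
from kensu.pyspark import init_kensu_spark

spark = (
    SparkSession.builder
    .appName("my_app")  # hypothetical application name
    .config("spark.driver.extraClassPath", "<PATH_TO_JAR>")  # path to the downloaded Kensu Spark agent jar
    .getOrCreate()
)

# Initialize the Kensu tracker on the session (reads conf.ini / KSU_* environment variables)
init_kensu_spark(spark)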


The Spark agent can also be included during the spark-submit command execution:

CMD
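# A sketch, assuming the jar is passed through the same spark.driver.extraClassPath
# property as above and a hypothetical script name my_job.py
spark-submit --conf "spark.driver.extraClassPath=<PATH_TO_JAR>" my_job.py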


Here is the conf.ini template:

conf.ini
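# A sketch of the template using the parameter names documented below; the [kensu]
# section header is an assumption to check against your kensu-py version, and the
# values are placeholders
[kensu]
kensu_ingestion_url=https://community-api.kensuapp.com
kensu_ingestion_token=<YOUR_INGESTION_TOKEN>
project_name=My Project
process_name=my_job.py
environment=Staging
user_name=data_engineer
report_to_file=False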


Below you will find the list of custom parameters you can add to this configuration file.

If the conf.ini file is located at a different relative path, register an environment variable called KSU_CONF_FILE whose value is the relative or absolute path of the configuration file.

All the properties mentioned have an environment variable counterpart with the KSU_ prefix, e.g. the property project_name has the KSU_PROJECT_NAME environment variable. Furthermore, the properties can also be specified during the Kensu initialization (see the example after the precedence list below).

The order of precedence for the properties is the following:

  1. Environment variable
  2. The argument passed to the init_kensu_spark method
  3. Configuration file
  4. Default value
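
For illustration, here is the same property set in the three configurable ways; the keyword-argument form assumes init_kensu_spark accepts the property names as Python keyword arguments, as described above.

Python

# 1. Environment variable (highest precedence), e.g. in the shell:
#      export KSU_PROJECT_NAME="Recommendation Pipeline"

# 2. Argument passed to init_kensu_spark:
init_kensu_spark(spark, project_name="Recommendation Pipeline")

# 3. Entry in conf.ini:
#      project_name=Recommendation Pipeline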

Parameters

The basic parameters of the configuration file are the following:

| Name | Definition | Example | Default Value | Comments |
| --- | --- | --- | --- | --- |
| kensu_ingestion_url | The URL of the Kensu Ingestion API | "https://community-api.kensuapp.com" | None | Remove the final "/" (i.e. "https://community-api.kensuapp.com/" is wrong) |
| kensu_ingestion_token | The token used to authenticate on the Ingestion API | "*******" | None | |
| kensu_api_token | The token used to authenticate on the Expectation API | "*******" | None | |
| project_name | The project in which the application is running, used to group multiple applications in Kensu | "Recommendation Pipeline" | None | |
| process_name | An explicit name for your application, script or notebook | "Data_Preparation.py" | If the script is launched at the command line, the Spark application name | Default for Databricks: notebook path |
| environment | The environment in which the application is running | "Staging" | None | |
| user_name | The user launching the script | "Root" | None | Default for Databricks: user |
| code_location | The location of the code | "user://path-to-file" | None; if GitPython is installed, the git location of the code | Default for Databricks: notebook path |
| report_to_file | Boolean flag that activates offline ingestion, reporting the logs to a file | True | False | |
| offline_file_name | The name of the offline file when report_to_file=True | "logfile.log" | "kensu-offline.log" | |
| compute_stats | Turns on/off the automatic statistics computation for the data accessed by Spark | | True | |
| compute_input_stats | Turns on/off the automatic data statistics computation for the input data sources | | True | |
| compute_output_stats | Turns on/off the automatic data statistics computation for the output data sources | | True | |



Metrics configuration

Metrics computation in the Kensu PySpark agent can be controlled with the parameters of the conf.ini file described below. Metrics computation can also be fine-tuned for each Logical Datasource usage & Application in the Kensu web app.

Fine-tune statistics computation from Kensu app (Remote configuration)

In the Kensu web app, on the Logical Datasource summary page, you may choose which statistics to compute for which columns, or disable them, for each individual datasource access in a given application.

Click here to configure the statistics computation or circuit breaker

Select statistics to compute for a given app & datasource


Remote conf takes priority over conf.ini

By default, the remote configuration is enabled, and if configured in the Kensu app, it overrides the behaviour specified in conf.ini, but only for that specific Application & Datasource.

How to disable remote conf?

If for any reason you want to use only the conf.ini and disable the remote configuration (e.g. when testing new global settings), set the remote_conf_enabled=False parameter.

How to debug, if needed?

The actual metrics configuration used for each datasource is printed in the Kensu logs during metrics computation, indicating which configuration source was used for each datasource.

Metrics config parameters

| Name | Definition | Default Value |
| --- | --- | --- |
| input_stats_for_only_used_columns | If turned on, the data stats computation happens only for the columns (schema fields) that are used in the lineage | True |
| input_stats_compute_quantiles | Boolean that activates the computation of the quantile statistics (25/50/75%) on the input data sources | False |
| input_stats_cache_by_path | Boolean to cache the statistics computed for an input data source if it is used in several lineages | True |
| input_stats_coalesce_enabled | Boolean that limits the number of Spark partitions allocated to the computation of the input stats | True |
| input_stats_coalesce_workers | Number of Spark partitions allocated to the computation of input stats; needs to be activated by input_stats_coalesce_enabled. See the Spark docs on coalesce. The optimal value may depend on the size of your data; however, as this affects only the number of partitions which "collect" the aggregation results, a large number is often not needed | 1 |
| output_stats_compute_quantiles | Boolean that activates the computation of the quantile statistics (25/50/75%) on the output data sources | False |
| output_stats_cache_by_path | Boolean to cache the statistics computed for an output data source if it is used in several lineages | False |
| output_stats_coalesce_enabled | Boolean that limits the number of Spark partitions allocated to the computation of the output stats | False |
| output_stats_coalesce_workers | Number of Spark partitions used for the computation of output stats; needs to be activated by output_stats_coalesce_enabled. See the Spark docs on coalesce. The optimal value may depend on the size of your data; however, as this affects only the number of partitions which "collect" the aggregation results, a large number is often not needed | 100 |
| enable_collector_log_file | Boolean that creates a file containing the Kensu agent debug logs | False |
| collector_log_include_spark_logs | Boolean that also includes the log messages from Apache Spark in the log file; needs to be activated by enable_collector_log_file | False |
| collector_log_level | Sets the log level for the debug log file; needs to be activated by enable_collector_log_file | "INFO" |
| disable_spark_writes | Boolean that sends metadata to Kensu without writing to the data sources (affects only the writes performed via the DataFrame writer, i.e. via df.write) | False |
| execution_timestamp | EPOCH ms timestamp that allows overriding the execution time of the application | current timestamp |
| output_stats_compute_null_rows | Decides whether to compute column.nullrows / column.nrows by default (does not affect explicit configuration in the Kensu UI) | True |
| output_stats_compute_numeric_stats_columns_limit | If there are more numeric columns than this limit, the numeric stats won't be computed (unless explicitly enabled by remote configuration) | 50 |
| output_stats_compute_null_rows_columns_limit | If there are more columns than this limit, the column.nullrows / column.nrows metrics won't be computed (unless explicitly enabled by remote configuration) | 100 |
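
For example, a sketch of a conf.ini fragment combining a few of the parameters above (the values are illustrative, not recommendations):

conf.ini

# illustrative metrics tuning, using parameters from the table above
input_stats_compute_quantiles=True
input_stats_cache_by_path=True
output_stats_coalesce_enabled=True
output_stats_coalesce_workers=100
enable_collector_log_file=True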

Circuit breaker

Enabling circuit breaker & its semantics

Add the following code to your PySpark job at the places where you want the circuit breaking to happen.

Python
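# A sketch only: the helper name below is hypothetical; use the circuit-breaker
# entry point provided by your kensu-py version
from kensu.pyspark import check_circuit_breakers  # hypothetical name

# Kills the Spark application if any datasource accessed so far has an open ticket
# for a circuit-breaker rule activated for this app (see the semantics described below)
check_circuit_breakers(spark)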


If not already done, define some rules in the Kensu app and mark some of them as circuit breakers for the given Spark app.

The circuit breaker will kill the Spark application if any of the datasources already accessed (i.e. read or written) by Spark has an open ticket, at the Logical datasource level, due to a broken circuit-breaker rule from any of the previous runs (and only if the associated circuit-breaker rule is activated for the current Spark app).

If you think the issue behind a previous circuit break was resolved (say by new code, or because the data was fixed), you still have to mark the corresponding ticket in Kensu as resolved; otherwise the old ticket will keep triggering the circuit breaker, because circuit-breaker checks are global (over the metrics of all previous runs, not just the latest run).

Customizing logic for handling failures

If you have a more complex use case, you may also implement your own handling by simply checking the circuit-breaker status. Don't forget to properly shut down Spark and the Kensu Spark agent if you need to quit the application.

Python
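# Again a sketch with hypothetical names, only to illustrate the flow of checking
# the status yourself and shutting things down cleanly
import sys
from kensu.pyspark import get_circuit_breaker_status  # hypothetical name

if get_circuit_breaker_status(spark):  # hypothetical: True when a circuit breaker is broken
    # your own handling, e.g. alerting or partial cleanup, goes here
    spark.stop()  # shut down Spark so the Kensu agent can also complete its shutdown
    sys.exit(1)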


Configuration

| Name | Definition | Default Value |
| --- | --- | --- |
| remote_circuit_breaker_enabled | Controls whether the circuit breaker is activated. Note: the circuit breaker will run only if you add the code described above | True |
| remote_circuit_breaker_precheck_delay_secs | Delay to wait for recently sent metrics to be processed by the Kensu backend | 300 |
| delay_before_waiting_for_kensu_agent_to_complete_secs | A small delay to account for the asynchronous nature of the Spark QueryListener | 5 |

Data sources naming convention

Kensu tracks every data source based on its location (i.e. the full path of a file, such as dbfs:/FileStore/Financial_Data_Report/2021/dec/AppTech.csv), and there are different options for choosing how the data source is named. In the following example, only the last folder and the file name are used:

(Screenshot: data sources displayed in Kensu, named by their last folder and file name)


In addition, Kensu also introduces the concept of "logical" data sources, which can be useful to group data sources (that are actually partitions) under the same logical data source. As an example of logical data source usage, consider these physical paths:

  • dbfs://application/OUTPUT/2022/Customers
  • dbfs://application/OUTPUT/2023/Customers

In this case, these 2 data sources can be grouped into dbfs://application/OUTPUT/Customers. This can be achieved with the available logical_data_source_naming_strategy options:

| Name | Definition | Default Value |
| --- | --- | --- |
| shorten_data_source_names | Boolean indicating whether to shorten the data source name | True |
| data_source_naming_strategy | Used if shorten_data_source_names=True. Defines the strategy for building the short DS names. Available values: File, LastFolderAndFile, LastTwoFoldersAndFile, LastFolder, PathBasedRule | LastFolderAndFile |
| data_source_naming_strategy_rules | Used if data_source_naming_strategy=PathBasedRule. Allows customising the strategy for a given DS path by providing a ;-separated list of rules | |
| logical_data_source_naming_strategy | Defines the strategy for building the logical DS names. Available values: File, LastFolderAndFile, LastTwoFoldersAndFile, LastFolder, PathBasedRule | |
| logical_data_source_naming_strategy_rules | Used if logical_data_source_naming_strategy=PathBasedRule. Allows customising the strategy for a given logical DS path by providing a ;-separated list of rules | |



Example of naming strategy options:

Assuming the original filepath is dbfs:/FileStore/Financial_Data_Report/2021/dec/AppTech.csv, these are the results of the different options:

| Option | Result |
| --- | --- |
| File | AppTech.csv |
| LastFolderAndFile | dec/AppTech.csv |
| LastTwoFoldersAndFile | 2021/dec/AppTech.csv |
| LastFolder | dec |
| PathBasedRule | See the data_source_naming_strategy_rules option |

The data_source_naming_strategy_rules parameter allows you to set multiple rules (separated by ;). A rule can be path specific (e.g. dbfs->>LastFolderAndFile) or use a regular expression in the following format: regexp-match:<regular expression>->>replace:<replace expression>

Consider an application with the following data sources:

  • Spark Catalog: default.financial_data_monthly_assets_demo1
  • dbfs:/FileStore/Financial_Data_Report/2021/dec/AppTech.csv

Applying the following rules:

data_source_naming_strategy_rules=regexp-match:Spark Catalog: default.(.*)->>replace:database:$1;dbfs->>LastTwoFoldersAndFile

This is the result:

  • database:financial_data_monthly_assets_demo1
  • 2021/dec/AppTech.csv

This is because the first rule is a regular expression with a substitution, while the second one is a path match with a standard strategy option. The first rule that matches is taken into account.

If no rule matches, the default strategy is File.

Similarly, for logical_data_source_naming_strategy_rules, if we have two data sources:

  • dbfs://application/OUTPUT/2022/Customers
  • dbfs://application/OUTPUT/2023/Customers

then, to group these 2 data sources under dbfs://application/OUTPUT/Customers, the following configuration should be added:

logical_data_source_naming_strategy_rules=regexp-match:(.*)/\\d*/(.*)->>replace:$1/$2

Advanced parameters

| Name | Definition | Default Value |
| --- | --- | --- |
| shutdown_timeout_sec | The maximum time in seconds to wait for the collector to shut down. This is the time to finish the entities/stats computation and publishing, as well as to deallocate all the used resources | 300 |
| offline_report_dir_path | The directory path where the offline report will be stored | |
| kensu_api_verify_ssl | Turns on/off the certificate validation for the Kensu Ingestion and Services API calls | True |
| enable_entity_compaction | Turns on/off the compaction of the entities, i.e. the replacement of descriptive PKs with compacted stable hashes | True |
| disable_column_lineage | Turns on/off the computation of the column lineage | False |
| missing_column_lineage_strategy | Options: CaseInsensitiveNameOrAll2AllMatcherStrat, ColumnLineageFullyDisabledStat, AllToAllLineageStrat, CaseInsensitiveColumnNameMatcherStrat, CaseSensitiveColumnNameMatcherStrat, OutFieldEndsWithInFieldNameLineageStrat, OutFieldStartsWithInFieldNameLineageStrat | |
| column_lineage_max_bytes_size | If the estimated lineage size is greater than column_lineage_max_bytes_size, the lineage is trimmed | 20000000 |
| schema_max_fields_number | The maximum number of fields/columns to report for each Partition/DS schema | 300 |
| stats_ingestion_batch_size | The maximum number of datasources to report statistics for per ingestion reporting batch (i.e. per API request) | 50 |
| stats_wait_for_ingestion_response | If True, waits (blocks) for the response from the ingestion of data stats. Useful for logging/debugging purposes | True |
| stats_wait_for_ingestion_response_seconds | The maximum time to wait (block) for the ingestion API response when reporting data stats | 300 |

Troubleshoot

  • When does the PySpark agent send data to Kensu?

The PySpark agent sends metadata in batches to Kensu each time a write operation on a data source finishes successfully, whatever the format.

e.g.: spark.write.save() operations

Note: we currently do not report anything for a write operation that did not complete (e.g. due to an error in the code or a Spark/cluster error), so that the records in Kensu match reality.

  • The agent does not detect the correct type of a data field, so it cannot compute the right numerical or categorical statistics. What should I do?

The Kensu PySpark agent relies on the schema of the data source to define the set of automatic metrics it has to retrieve. To make sure the data type is correctly inferred, you need to add the inferschema option while reading schema-less data source types such as CSV.

e.g. df = spark.read.option("inferschema","true").csv("/path/to/ds")

  • Metadata is not sent to Kensu, what is the issue?
  1. Please make sure that the API token and URL are correct (logs verify the connection status when the Kensu agent starts); the URL should be written in this format: https://community-api.kensuapp.com, without any trailing /, which is often added by copy/paste operations.
  2. Please check that the offline mode (report_to_file) is not activated.
  3. If you're not using the Kensu Community Edition, check with your provider that the Kensu set-up doesn't require any custom certificate.
  4. If this did not solve your issue, please contact our support [email protected] for further analysis.
  • Logs are not written in the log file, why?

Whether the logs are written to the log file depends on the log level of your Spark session. A lot of issues with the log file can be resolved by adding the following line before calling the init_kensu_spark() function:

Python
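# A sketch, assuming the standard Spark API for setting the session log level is what is meant here
spark.sparkContext.setLogLevel("INFO")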


Summary

1. Install the Python library and the Spark agent jar
Shell
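# As above: a sketch, assuming the library is published on PyPI as "kensu"
pip install kensu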


2. Create a configuration file

conf.ini
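# A minimal sketch repeating only the connection parameters (section name assumed, values are placeholders)
[kensu]
kensu_ingestion_url=https://community-api.kensuapp.com
kensu_ingestion_token=<YOUR_INGESTION_TOKEN>
project_name=My Project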


3. Initialize the Kensu agent in your Spark job script

Python
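# As in the Configuration section above: a sketch assuming the kensu.pyspark import path
from pyspark.sql import SparkSession
from kensu.pyspark import init_kensu_spark

spark = (
    SparkSession.builder
    .config("spark.driver.extraClassPath", "<PATH_TO_JAR>")
    .getOrCreate()
)
init_kensu_spark(spark)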