PySpark
The PySpark integration relies on two components:
- The Spark jar, which ensures that Spark jobs are tracked and events are sent to Kensu. The jar has to be added to the SparkSession
- The Python kensu-py library, which includes a module to configure the parameters of the Spark jar and some other helpers like circuit breakers
The Python library is a prerequisite for the PySpark agent, so please make sure the kensu-py library is installed.
To install the library, execute this command in your Python environment:
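A minimal install sketch, assuming the library is published on PyPI under the package name kensu:

```bash
pip install kensu
```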
Download a jar to match your Spark version:
Ideally it should be a full version match, but often a major.minor match is good enough too (i.e. the last version number may differ).
Before creating the configuration file, make sure you have retrieved your Kensu Ingestion URL and Token.
To include the PySpark agent in your job, you need to:
- Create a configuration file conf.ini.
- Add the jar to the Spark session. To do so, include the path of the jar in the session configuration by adding .config("spark.driver.extraClassPath", "<PATH_TO_JAR>") to the definition of the session
- Import the Kensu pyspark module and initialize the tracker with init_kensu_spark(spark)
Here is a summary of the code modifications you need to perform to include the PySpark agent:
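A minimal sketch of these modifications, assuming the tracker is exposed as init_kensu_spark in the kensu.pyspark module and that conf.ini sits next to the script:

```python
from pyspark.sql import SparkSession
from kensu.pyspark import init_kensu_spark  # module path assumed; adjust to your kensu-py version

# Add the Kensu Spark jar to the driver classpath when building the session
spark = (
    SparkSession.builder
    .appName("Data_Preparation")
    .config("spark.driver.extraClassPath", "<PATH_TO_JAR>")
    .getOrCreate()
)

# Initialize the Kensu tracker (configured via conf.ini or KSU_* environment variables)
init_kensu_spark(spark)

# ... your regular PySpark code ...

spark.stop()
```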
The Spark agent jar can also be added when executing the spark-submit command:
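For example (a sketch; the actual jar file name depends on the Spark version you downloaded):

```bash
spark-submit \
  --conf spark.driver.extraClassPath=<PATH_TO_JAR> \
  your_job.py
```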
Here is the conf.ini template:
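A minimal template sketch based on the basic parameters listed below; the [kensu] section header is an assumption, so check the section name expected by your kensu-py version:

```ini
[kensu]
; Ingestion endpoint and token (see the parameter table below)
kensu_ingestion_url=https://community-api.kensuapp.com
kensu_ingestion_token=<YOUR_INGESTION_TOKEN>
; Context of the run
project_name=Recommendation Pipeline
process_name=Data_Preparation.py
environment=Staging
user_name=data_engineer
code_location=user://path-to-file
; Reporting and statistics
report_to_file=False
compute_stats=True
```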
You will find the list of custom parameters you can add to this configuration file further down this page.
If the conf.ini file is located at a different relative path, please register an environment variable called KSU_CONF_FILE with the value being the relative or absolute path of the configuration file.
All the properties mentioned have an environment variable counterpart with the KSU_ prefix, e.g. the property project_name has the KSU_PROJECT_NAME environment variable. Furthermore, the properties can also be specified during Kensu initialization (see the sketch after the list below).
The order of precedence for the properties is the following:
- Environment variable
- The argument passed to init_kensu_spark method
- Configuration file
- Default value
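For instance, a sketch of passing properties directly to init_kensu_spark, assuming the keyword arguments match the property names listed below:

```python
# spark is the SparkSession built with the Kensu jar, as shown above
init_kensu_spark(
    spark,
    project_name="Recommendation Pipeline",  # overrides conf.ini, but not KSU_PROJECT_NAME
    environment="Staging",
    compute_input_stats=False,
)
```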
The basic parameters of the configuration file are the following:
Name | Definition | Example | Default Value | Comments |
--- | --- | --- | --- | --- |
kensu_ingestion_url | The URL of the Kensu Ingestion API | "https://community-api.kensuapp.com" | None | Remove the final "/" (i.e. "https://community-api.kensuapp.com/" is wrong) |
kensu_ingestion_token | The token used to authenticate to the Ingestion API | "*******" | None | |
kensu_api_token | The token used to authenticate to the Expectation API | "*******" | None | |
project_name | The project in which the application is running, used to group multiple applications in Kensu | "Recommendation Pipeline" | None | |
process_name | An explicit name for your application, script or notebook | "Data_Preparation.py" | If the script is launched at the command line, it takes the Spark application name | Default for Databricks: notebook path |
environment | The Environment in which the application is running | "Staging" | None | |
user_name | The user launching the script | "Root" | None | Default for Databricks: user |
code_location | The location of the code | "user://path-to-file" | None | Default for Databricks: notebook path |
report_to_file | A boolean flag that activates the offline ingestion, reporting the logs to a file | True | False | |
offline_file_name | The name of the offline file in case report_to_file=True | "logfile.log" | "kensu-offline.log" | |
compute_stats | Turns on/off the automatic computation of statistics about the data accessed by Spark | | True | |
compute_input_stats | Turns on/off the automatic data statistics computation for the input data sources. | | True | |
compute_output_stats | Turns on/off the automatic data statistics computation for the output data sources. | | True | |
Metrics computation in the Kensu PySpark agent can be controlled with the conf.ini parameters described below. Metrics computation can also be fine-tuned for each Logical Datasource usage & Application in the Kensu web app.
In the Kensu web app, on the Logical Datasource summary page, you may choose which statistics to compute for which columns, or disable them, for each individual datasource access in a given application.
Remote conf takes priority over conf.ini
By default, the remote configuration is enabled, and if configured in the Kensu app, it overrides the behaviour specified in conf.ini, but only for that specific Application & Datasource.
How to disable remote conf?
If, for any reason, you want to use only the conf.ini and disable the remote configuration (e.g. for testing new global settings), set the remote_conf_enabled=False parameter.
How to debug, if needed?
The actual metrics configuration used for each datasource is printed in the Kensu logs during metrics computation, indicating which configuration source was applied.
Name | Definition | Default Value |
--- | --- | --- |
input_stats_for_only_used_columns | If turned on, the data stats computation happens only for columns (schema fields) that are used in the lineage | True |
input_stats_compute_quantiles | Boolean that activates the computation of the quantiles statistics on the input data sources (25/50/75%) | False |
input_stats_cache_by_path | Boolean to cache the statistics computed for an input data source if it is used in several lineages | True |
input_stats_coalesce_enabled | Boolean that limits the number of spark partitions allocated to the computation of the input stats | True |
input_stats_coalesce_workers | Number of spark partitions allocated to the computation of input stats. Needs to be activated by input_stats_coalesce_enabled. See the Spark docs on coalesce. The optimal value may depend on the size of your data; however, as this affects only the number of partitions which "collect" the aggregation results, a large number is often not needed here | 1 |
output_stats_compute_quantiles | Boolean that activates the computation of the quantiles statistics on the output data sources (25/50/75%) | False |
output_stats_cache_by_path | Boolean to cache the statistics computed for an output data source if it is used in several lineages | False |
output_stats_coalesce_enabled | Boolean that limits the number of spark partitions allocated to the computation of the output stats | False |
output_stats_coalesce_workers | Number of spark partitions used for the computation of output stats. Needs to be activated by output_stats_coalesce_enabled. See the Spark docs on coalesce. The optimal value may depend on the size of your data; however, as this affects only the number of partitions which "collect" the aggregation results, a large number is often not needed here | 100 |
enable_collector_log_file | Boolean that will create a file containing Kensu agent debug logs | False |
collector_log_include_spark_logs | Boolean that will also include the log messages from Apache Spark in the log file if activated by enable_collector_log_file | False |
collector_log_level | Sets the Log Level for the debug log file. Needs to be activated by enable_collector_log_file | "INFO" |
disable_spark_writes | Boolean that will send metadata to Kensu without writing to the data sources (affects only the writes performed via the DataFrame writer, i.e. df.write) | False |
execution_timestamp | EPOCH ms timestamp that allows overriding the execution time of the application | current timestamp |
output_stats_compute_null_rows | Decides whether to compute column.{nullrows|nrows} by default (does not affect explicit config in the Kensu UI) | True |
output_stats_compute_numeric_stats_columns_limit | If there are more numeric columns than this limit, the numeric stats won't be computed (unless explicitly enabled by remote configuration) | 50 |
output_stats_compute_null_rows_columns_limit | If there are more columns than this limit, the column.{nullrows|nrows} metrics won't be computed (unless explicitly enabled by remote configuration) | 100 |
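For example, a conf.ini fragment (a sketch reusing the parameters above) that enables quantiles on inputs and limits the partitions used to collect output stats could look like this:

```ini
input_stats_compute_quantiles=True
input_stats_coalesce_enabled=True
input_stats_coalesce_workers=1
output_stats_coalesce_enabled=True
output_stats_coalesce_workers=100
```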
Add the following code to your PySpark job at the places where you want the circuit breaking to happen.
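A sketch of what this can look like; note that check_circuit_breaker is a hypothetical helper name used here for illustration only, so refer to the kensu.pyspark module of your kensu-py version for the actual circuit-breaker function:

```python
from pyspark.sql import SparkSession
# Hypothetical import: replace check_circuit_breaker with the helper actually
# exposed by your kensu-py version.
from kensu.pyspark import init_kensu_spark, check_circuit_breaker

spark = (
    SparkSession.builder
    .config("spark.driver.extraClassPath", "<PATH_TO_JAR>")
    .getOrCreate()
)
init_kensu_spark(spark)

df = spark.read.option("inferSchema", "true").csv("/path/to/ds")
# ... transformations and writes ...

# Kills the Spark application if any datasource accessed so far has an open
# circuit-breaker ticket in Kensu (for rules activated for this application).
check_circuit_breaker(spark)
```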
If not already done, define some rules in Kensu app, and mark some of them as circuit-breaker for the given Spark app.
The circuit breaker will kill the Spark application if any of the datasources already accessed (i.e. read or written) by Spark has an open ticket in Kensu, at the Logical datasource level, caused by broken circuit-breaker rules in any of the previous runs (and only if the associated circuit-breaker rule is activated for the current Spark app).
If you think that the issue behind a previous circuit breaker has been resolved (say by new code, or because the data was fixed), you still have to mark the ticket in Kensu as resolved; otherwise the old ticket will keep triggering the circuit breaker, because circuit-breaker checks are global (over the metrics of all previous runs, not just the latest one).
If you have a more complex use case, you may also perform your own handling by simply checking the circuit breaker status. Don't forget to properly shut down Spark and the Kensu Spark agent if you need to quit the application.
Name | Definition | Default Value |
--- | --- | --- |
remote_circuit_breaker_enabled | Controls whether the circuit breaker is activated. Note: the circuit breaker will run only if you add the code as described above | True |
remote_circuit_breaker_precheck_delay_secs | Delay to wait for recently sent metrics to be processed by Kensu backend | 300 |
Kensu tracks every data source based on its location (i.e. the full path of a file, such as dbfs:/FileStore/Financial_Data_Report/2021/dec/AppTech.csv), and there are different options to choose how to name the data source. In the following example only the last folder and filename are used:
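A conf.ini fragment for that behaviour, based on the naming parameters described below:

```ini
shorten_data_source_names=True
data_source_naming_strategy=LastFolderAndFile
```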
In addition, Kensu also introduces the concept of "logical" data sources, which can be useful to group data sources (that are actually partitions) under the same logical data source. As an example of logical data source usage, consider these physical paths:
- dbfs://application/OUTPUT/2022/Customers
- dbfs://application/OUTPUT/2023/Customers
In this case these 2 data sources can be grouped into dbfs://application/OUTPUT/Customers; this can be achieved with the available logical_data_source_naming_strategy options:
Name | Definition | Default Value |
--- | --- | --- |
shorten_data_source_names | Boolean to indicate whether or not to shorten the data source name | True |
data_source_naming_strategy | Used if shorten_data_source_names=True. Defines the strategy for building the short DS names. Available values: File, LastFolderAndFile, LastTwoFoldersAndFile, LastFolder, PathBasedRule (see the table below) | LastFolderAndFile |
data_source_naming_strategy_rules | Used if data_source_naming_strategy=PathBasedRule. Allows customising the strategy for a given DS path by providing a ;-separated list of rules. | |
logical_data_source_naming_strategy | Defines the strategy for building the logical DS names. Available values include PathBasedRule (see the rules option below) | |
logical_data_source_naming_strategy_rules | Used if logical_data_source_naming_strategy=PathBasedRule. Allows customising the strategy for a given logical DS path by providing a ;-separated list of rules. | |
Assuming the original filepath is dbfs:/FileStore/Financial_Data_Report/2021/dec/AppTech.csv, these are the results of the different options:
Option | Result |
--- | --- |
File | AppTech.csv |
LastFolderAndFile | dec/AppTech.csv |
LastTwoFoldersAndFile | 2021/dec/AppTech.csv |
LastFolder | dec |
PathBasedRule | See data_source_naming_strategy_rules option |
The data_source_naming_strategy_rules option allows setting multiple rules (separated by ;). A rule can be path-specific (e.g. dbfs->>LastFolderAndFile) or use a regular expression in the following format: regexp-match:<regular expression>->>replace:<replace expression>
Consider an application with the following data sources:
- Spark Catalog: default.financial_data_monthly_assets_demo1
- dbfs:/FileStore/Financial_Data_Report/2021/dec/AppTech.csv
Applying the following rules:
data_source_naming_strategy_rules=regexp-match:Spark Catalog: default.(.*)->>replace:database:$1;dbfs->>LastTwoFoldersAndFile
This is the result:
- database:financial_data_monthly_assets_demo1
- 2021/dec/AppTech.csv
This is because the first rule is a regular expression with a substitution, while the second one is a plain path match combined with a standard strategy option. The first expression that matches is taken into account.
If no expression matches, the default strategy is File.
Similarly, for logical_data_source_naming_strategy_rules, if we have two data sources:
- dbfs://application/OUTPUT/2022/Customers
- dbfs://application/OUTPUT/2023/Customers
then, to group these 2 data sources under dbfs://application/OUTPUT/Customers, this configuration should be added:
logical_data_source_naming_strategy_rules=regexp-match:(.*)/\\d*/(.*)->>replace:$1/$2
Name | Definition | Default Value |
--- | --- | --- |
shutdown_timeout_sec | The maximum time in seconds to wait for the collector to shut down. This is the time to finish entity/stats computation and publishing, as well as to deallocate all the used resources. | 300 |
offline_report_dir_path | The directory path where the offline report will be stored. | |
kensu_api_verify_ssl | Turns on/off the certificate validation for the Kensu Ingestion and Services API calls. | True |
enable_entity_compaction | Turns on/off the compaction of entities, i.e. the replacement of descriptive PKs with compacted stable hashes. | True |
disable_column_lineage | Turns on/off the computation of the column lineage | False |
missing_column_lineage_strategy | Options: | |
column_lineage_max_bytes_size | If the estimated lineage size is greater than column_lineage_max_bytes_size, the lineage is trimmed | 20000000 |
schema_max_fields_number | The maximum number of fields/columns to report for each Partition/DS schema. | 300 |
stats_ingestion_batch_size | The maximum number of datasources to report the statistics for per ingestion reporting batch (i.e. per API request) | 50 |
stats_wait_for_ingestion_response | If True, waits (blocks) for the response from the ingestion of data stats. Useful for logging/debugging purposes | True |
stats_wait_for_ingestion_response_seconds | The maximum time to wait (block) for the ingestion API response when reporting data stats | 300 |
- When does the PySpark agent send data to Kensu?
The PySpark agent sends metadata in batches to Kensu each time a write operation of a data source is finished successfully, whatever the format.
e.g. df.write.save() operations
P.S. We currently do not report anything when such a write operation did not happen (due to, say, an error in the code or a Spark/cluster error); this is so that the records in Kensu match reality.
- The agent does not detect the correct type of the data field in order to compute numerical or categorical statistics, what should I do?
The Kensu PySpark agent relies on the schema of the data source in order to define the set of automatic metrics it has to retrieve. To make sure the data types are well inferred, you need to add the inferSchema option while reading schema-less data source types like CSV.
e.g. df = spark.read.option("inferSchema", "true").csv("/path/to/ds")
- Metadata is not sent to Kensu, what is the issue?
- Please make sure that the API token and URL are correct (there are logs that verify the connection status when the Kensu agent starts). The URL should be written in this format: https://community-api.kensuapp.com, without any / at the end, which is often added by copy/paste operations.
- Please check that the offline mode is not activated.
- If you're not using the Kensu Community Edition, check with your provider that the Kensu set-up doesn't require any custom certificate.
- If this did not solve your issue, please contact our support [email protected] for further analysis.
- Logs are not written in the log file, why?
Whether the logs are written to the log file depends on the log level of your Spark session. A lot of issues with the log file can be troubleshot by adding the following line before calling the init_kensu_spark() function:
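For example (a sketch; INFO is assumed here, adjust the level to your needs):

```python
spark.sparkContext.setLogLevel("INFO")
```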
1. Install the Python and Spark clients
2. Create a configuration file
3. Initialize the Kensu agent in your Spark job script