# PySpark agent

## An overview

The PySpark integration relies on two components:

* the Spark JAR, which ensures the Spark jobs are tracked and events are sent to Kensu. The JAR has to be added to the SparkSession.
* the Python kensu-py library, which includes a module to configure the parameters of the Spark JAR and some other helpers, like circuit breakers.

## Prerequisites and installation

### Install the Kensu Python lib

The Python library is a prerequisite for the PySpark agent, so please make sure the kensu-py library is installed. To install it, execute this command in your Python environment:

```
pip install kensu>=2.6.9
```

### Download the Spark agent JAR

Download the JAR matching your Spark version:

* [kensu-spark-collector-1.3.0_spark-3.4.0.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.4.0.jar)
* [kensu-spark-collector-1.3.0_spark-3.3.1.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.3.1.jar)
* [kensu-spark-collector-1.3.0_spark-3.3.2.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.3.2.jar)
* [kensu-spark-collector-1.3.0_spark-3.3.0.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.3.0.jar)
* [kensu-spark-collector-1.3.0_spark-3.2.1.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.2.1.jar)
* [kensu-spark-collector-1.3.0_spark-3.1.1.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.1.1.jar)
* [kensu-spark-collector-1.3.0_spark-3.1.2.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.1.2.jar)
* [kensu-spark-collector-1.3.0_spark-3.0.1.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.0.1.jar)
* [kensu-spark-collector-1.3.0_spark-2.4.0.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-2.4.0.jar)

Ideally the Spark version should match fully, but a major.minor match is often good enough too (i.e. the last version number may differ).

## Configuration

Before creating the configuration file, make sure you have retrieved your Kensu ingestion URL and token.

To include the PySpark agent, you need to:

1. Create a configuration file `conf.ini`.
2. Add the JAR to the Spark session. To do so, include the path of the JAR in the configuration of the Spark session by adding `config("spark.driver.extraClassPath", "<path_to_jar>")` in the definition of the session.
3. Import the `kensu.pyspark` module and initialize the tracker with `init_kensu_spark(spark)`.

Here is a summary of the code modifications you need to perform to include the PySpark agent:

```python
from pyspark.sql import SparkSession

# Add the path to the jar to the SparkSession
spark = SparkSession.builder.appName("example") \
    .config("spark.driver.extraClassPath", "<kensu-spark-agent>.jar") \
    .getOrCreate()

# Import the init function
from kensu.pyspark import init_kensu_spark

# Init Kensu
init_kensu_spark(
    spark_session=spark,
    project_name="My first Kensu project",
    shutdown_timeout_sec=10 * 60,
    process_name="My first Kensu application",
    # add further parameters here
)

# Application code
```
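To make the setup concrete, here is a minimal end-to-end sketch with the agent enabled. It is only an illustration: the application name, JAR file name, and input/output paths are placeholders, and the final `spark.stop()` is ordinary Spark practice rather than a Kensu requirement. As noted in the Troubleshoot section below, lineage and (optional) statistics are reported once a write completes successfully.

```python
from pyspark.sql import SparkSession
from kensu.pyspark import init_kensu_spark

# Placeholder JAR name and paths - adapt them to your environment
spark = (SparkSession.builder.appName("daily_report")
         .config("spark.driver.extraClassPath", "kensu-spark-collector-1.3.0_spark-3.4.0.jar")
         .getOrCreate())

init_kensu_spark(
    spark_session=spark,
    project_name="My first Kensu project",
    process_name="daily_report.py",
)

# A regular Spark read/transform/write: the agent reports the lineage and
# (optionally) the data statistics once the write finishes successfully.
customers = (spark.read
             .option("header", "true")
             .option("inferSchema", "true")
             .csv("/data/in/customers.csv"))

(customers.filter(customers["active"] == "true")
    .write.mode("overwrite").parquet("/data/out/active_customers"))

# Stopping the session leaves the agent time to publish its remaining events
# (bounded by shutdown_timeout_sec).
spark.stop()
```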
The Spark agent can also be included during the spark-submit command execution:

```
spark-submit --jars <kensu-spark-agent>.jar main.py
```

Here is the `conf.ini` template:

```ini
[kensu]
;kensu api endpoint
kensu_ingestion_url=<insert the ingestion url here>
;token for kensu ingestion api
kensu_ingestion_token=<insert your ingestion token here>
compute_stats=false
compute_input_stats=false
compute_output_stats=false
```

You will find further down this page the list of custom parameters you can add to this configuration file.

If the relative path of the `conf.ini` file is different, please register an environment variable called `KSU_CONF_FILE` whose value is the relative or absolute path of the configuration file.

All the properties mentioned have an environment variable counterpart with the `KSU_` prefix, e.g. the property `project_name` has the `KSU_PROJECT_NAME` environment variable. Furthermore, the properties can also be specified during the Kensu initialization. The order of precedence for the properties is the following:

1. environment variable
2. the argument passed to the `init_kensu_spark` method
3. configuration file
4. default value

## Parameters

The basic parameters of the configuration file are the following:

| Name | Definition | Example | Default value | Comments |
|---|---|---|---|---|
| `kensu_ingestion_url` | The URL of the Kensu ingestion API | "https://community-api.kensuapp.com" | None | Remove the final "/" (i.e. "https://community-api.kensuapp.com/" is wrong) |
| `kensu_ingestion_token` | The token used to authenticate on the ingestion API | | None | |
| `kensu_api_token` | The token used to authenticate on the expectation API | | None | |
| `project_name` | The project in which the application is running, used to group multiple applications in Kensu | "Recommendation pipeline" | None | |
| `process_name` | An explicit name for your application, script or notebook | "data_preparation.py" | If the script is launched at the command line, it takes the Spark application name | Default for Databricks: notebook path |
| `environment` | The environment in which the application is running | "Staging" | None | |
| `user_name` | The user launching the script | "root" | None | Default for Databricks: user |
| `code_location` | The location of the code | "user://path_to_file" | None | If GitPython is installed, the Git location of the code; default for Databricks: notebook path |
| `report_to_file` | A boolean flag that activates the offline ingestion, reporting the logs to a file | True | False | |
| `offline_file_name` | The name of the offline file when `report_to_file=True` | "logfile.log" | "kensu-offline.log" | |
| `compute_stats` | Turns on/off the automatic statistics computation for the data accessed by Spark | | True | |
| `compute_input_stats` | Turns on/off the automatic data statistics computation for the input data sources | | True | |
| `compute_output_stats` | Turns on/off the automatic data statistics computation for the output data sources | | True | |
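As an illustration of the precedence rules above, the sketch below sets the same property at two levels. The values are placeholders, and it assumes (as in the examples above) that a property keeps its name when passed as a keyword argument to `init_kensu_spark` and takes a `KSU_` prefix as an environment variable.

```python
import os
from pyspark.sql import SparkSession
from kensu.pyspark import init_kensu_spark

# Highest precedence: the KSU_-prefixed environment variable
# (normally exported in the shell; set here in code only for illustration).
os.environ["KSU_PROJECT_NAME"] = "Recommendation pipeline"

spark = (SparkSession.builder.appName("precedence_example")
         .config("spark.driver.extraClassPath", "<kensu-spark-agent>.jar")
         .getOrCreate())

init_kensu_spark(
    spark_session=spark,
    # Second in precedence: this argument is overridden because
    # KSU_PROJECT_NAME is set above.
    project_name="Fallback project name",
)

# Properties set neither via the environment nor as arguments fall back to
# conf.ini, and finally to their documented default values.
```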
## Metrics configuration

Metrics computation in the Kensu PySpark agent can be controlled by using these parameters in the `conf.ini` file, as described below. Metrics computation can also be fine-tuned for each logical datasource usage & application in the Kensu web app.

### Fine-tune statistics computation from the Kensu app (remote configuration)

In the Kensu web app, by going to the logical datasource summary page, you may choose which statistics to compute for which columns, or disable them, for each individual datasource access in a given application.

**Remote conf takes priority over conf.ini.** By default, the remote configuration is enabled, and if configured in the Kensu app, it overrides the behaviour specified in `conf.ini`, but only for that specific application & datasource.

#### How to disable remote conf?

If, for any reason, you want to use only the `conf.ini` and disable usage of the remote configuration (e.g. for testing new global settings), set the `remote_conf_enabled=false` parameter.

#### How to debug, if needed?

The actual metrics configuration used for each datasource is printed in the Kensu logs during metrics computation, indicating which configuration source was used for each datasource.

### Metrics config parameters

| Name | Definition | Default value |
|---|---|---|
| `input_stats_for_only_used_columns` | If turned on, the data stats computation happens only for columns (schema fields) that are used in the lineage | True |
| `input_stats_compute_quantiles` | Boolean that activates the computation of the quantiles statistics on the input data sources (25/50/75%) | False |
| `input_stats_cache_by_path` | Boolean to cache the statistics computed for an input data source if it is used in several lineages | True |
| `input_stats_coalesce_enabled` | Boolean that limits the number of Spark partitions allocated to the computation of the input stats | True |
| `input_stats_coalesce_workers` | Number of Spark partitions allocated to the computation of input stats. Needs to be activated by `input_stats_coalesce_enabled`; see the Spark docs on coalesce. The optimal value may depend on the size of your data; however, as this affects only the number of partitions which "collect" the aggregation results, a large number is often not needed here | 1 |
| `output_stats_compute_quantiles` | Boolean that activates the computation of the quantiles statistics on the output data sources (25/50/75%) | False |
| `output_stats_cache_by_path` | Boolean to cache the statistics computed for an output data source if it is used in several lineages | False |
| `output_stats_coalesce_enabled` | Boolean that limits the number of Spark partitions allocated to the computation of the output stats | False |
| `output_stats_coalesce_workers` | Number of Spark partitions used for the computation of output stats. Needs to be activated by `output_stats_coalesce_enabled`; see the Spark docs on coalesce. The optimal value may depend on the size of your data; however, as this affects only the number of partitions which "collect" the aggregation results, a large number is often not needed here | 100 |
| `enable_collector_log_file` | Boolean that will create a file containing the Kensu agent debug logs | False |
| `collector_log_include_spark_logs` | Boolean that will also include the log messages from Apache Spark in the log file; needs to be activated by `enable_collector_log_file` | False |
| `collector_log_level` | Sets the log level for the debug log file; needs to be activated by `enable_collector_log_file` | "INFO" |
| `disable_spark_writes` | Boolean that will send metadata to Kensu without writing to the data sources (affects only the writes performed via the DataFrame writer, i.e. via `df.write`) | False |
| `execution_timestamp` | Epoch-ms timestamp that allows overriding the execution time of the application | Current timestamp |
| `output_stats_compute_null_rows` | Decides whether to compute the column {nullrows\|nrows} metrics by default (does not affect explicit config in the Kensu UI) | True |
| `output_stats_compute_numeric_stats_columns_limit` | If there are more numeric columns than this limit, the numeric stats won't be computed (unless explicitly enabled by remote configuration) | 50 |
| `output_stats_compute_null_rows_columns_limit` | If there are more columns than this limit, the column {nullrows\|nrows} metrics won't be computed (unless explicitly enabled by remote configuration) | 100 |
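For example, a `conf.ini` fragment along the following lines (illustrative values only, combining parameters from the tables above) keeps input statistics cheap while still computing output statistics:

```ini
[kensu]
kensu_ingestion_url=<insert the ingestion url here>
kensu_ingestion_token=<insert your ingestion token here>
; compute input statistics only for columns used in the lineage,
; without quantiles, collecting the aggregates on a single partition
compute_input_stats=true
input_stats_for_only_used_columns=true
input_stats_compute_quantiles=false
input_stats_coalesce_enabled=true
input_stats_coalesce_workers=1
; compute output statistics, but skip quantiles and cap the column counts
compute_output_stats=true
output_stats_compute_quantiles=false
output_stats_compute_numeric_stats_columns_limit=50
output_stats_compute_null_rows_columns_limit=100
```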
## Circuit breaker

### Enabling circuit breaker & its semantics

Add the following code to your PySpark job at the places where you want the circuit breaking to happen:

```python
from kensu.utils.remote_circuit_breakers import kill_app_on_circuit_breaker

kill_app_on_circuit_breaker(spark=spark)
```

If not already done, define some rules in the Kensu app, and mark some of them as circuit breakers for the given Spark app.

The circuit breaker will kill the Spark application if any of the datasources that were already accessed (i.e. read or written) by Spark has any open tickets due to broken circuit breaker rules in Kensu at the logical datasource level for any of the previous runs (only if the associated circuit breaker rule is activated for the current Spark app).

If you think that the issue behind a previous circuit breaker was resolved (say, by new code, or because the data was fixed), you still have to mark the ticket in Kensu as resolved; otherwise the old ticket will still trigger the circuit breaker, because circuit breaker checks are global (over the metrics of all previous runs, not just the latest run).

### Customizing logic for handling failures

If you have a more complex use case, you may also perform your own handling by just checking the circuit breaker status. Don't forget to properly shut down Spark & the Kensu Spark agent if you need to quit the application.

```python
# If you still want to perform some custom Python logic in case
# the circuit breaker failed, you may call the Kensu circuit breaker
# with the sys_exit=False param
from kensu.utils.remote_circuit_breakers import kill_app_on_circuit_breaker

try:
    kill_app_on_circuit_breaker(
        spark=spark,
        stop_spark=False,  # or True, if you want to quit the Spark app immediately
        sys_exit=False,
    )
except AssertionError:
    # handle the circuit breaker
    pass
```

### Configuration

| Name | Definition | Default value |
|---|---|---|
| `remote_circuit_breaker_enabled` | Controls whether the circuit breaker is activated. P.S. the circuit breaker will run only if you add the code as described above | True |
| `remote_circuit_breaker_precheck_delay_secs` | Delay to wait for recently sent metrics to be processed by the Kensu backend | 300 |
| `delay_before_waiting_for_kensu_agent_to_complete_secs` | A small delay to account for the asynchronous nature of the Spark QueryListener | 5 |
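To make the placement concrete, here is a minimal sketch, with assumed paths and names, of a job that reads its inputs, runs the circuit breaker check, and only then writes its output. Placing the check between the reads and the write is one possible choice (the check only considers datasources already accessed at the point where it is called), not a requirement.

```python
from pyspark.sql import SparkSession
from kensu.pyspark import init_kensu_spark
from kensu.utils.remote_circuit_breakers import kill_app_on_circuit_breaker

# Placeholder JAR name and paths - adapt them to your environment.
spark = (SparkSession.builder.appName("guarded_job")
         .config("spark.driver.extraClassPath", "<kensu-spark-agent>.jar")
         .getOrCreate())
init_kensu_spark(spark_session=spark, project_name="My first Kensu project")

# Read the inputs first, so they count as "already accessed" datasources.
orders = spark.read.parquet("/data/in/orders")

# If any already-accessed datasource has an open circuit-breaker ticket in Kensu
# (and the rule is activated for this app), this call kills the application
# before the potentially expensive write below happens.
kill_app_on_circuit_breaker(spark=spark)

(orders.groupBy("customer_id").count()
    .write.mode("overwrite").parquet("/data/out/orders_per_customer"))

spark.stop()
```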
## Data sources naming convention

Kensu tracks every data source based on its location (i.e. the full path of a file, e.g. `dbfs:/FileStore/financial_data_report/2021/dec/AppTech.csv`), and there are different options to choose how to name the data source. In the following example, only the last folder and filename are used.

In addition, Kensu also introduces the concept of "logical" data sources. This can be useful to group data sources (that are actually partitions) so that they are part of the same logical data source. As an example of logical data source usage, consider having these physical paths:

* `dbfs://application/output/2022/customers`
* `dbfs://application/output/2023/customers`

In this case these 2 data sources can be grouped into `dbfs://application/output/customers`; this can be achieved with the available logical data source naming strategy options.

| Name | Definition | Default value |
|---|---|---|
| `shorten_data_source_names` | Boolean to indicate whether or not to shorten the data source name | True |
| `data_source_naming_strategy` | Used if `shorten_data_source_names=True`. Defines the strategy for building the short DS names. Available values: `File`, `LastFolderAndFile`, `LastTwoFoldersAndFile`, `LastFolder`, `PathBasedRule` | `LastFolderAndFile` |
| `data_source_naming_strategy_rules` | Used if `data_source_naming_strategy=PathBasedRule`. Allows customising the strategy for a given DS path by providing a `;`-separated list of rules | |
| `logical_data_source_naming_strategy` | Defines the strategy for building the logical DS names. Available values: `File`, `LastFolderAndFile`, `LastTwoFoldersAndFile`, `LastFolder`, `PathBasedRule` | |
| `logical_data_source_naming_strategy_rules` | Used if `logical_data_source_naming_strategy=PathBasedRule`. Allows customising the strategy for a given logical DS path by providing a `;`-separated list of rules | |

### Example of naming strategy options

Assuming the original filepath is `dbfs:/FileStore/financial_data_report/2021/dec/AppTech.csv`, these are the results of the different options:

| Option | Result |
|---|---|
| `File` | AppTech.csv |
| `LastFolderAndFile` | dec/AppTech.csv |
| `LastTwoFoldersAndFile` | 2021/dec/AppTech.csv |
| `LastFolder` | dec |
| `PathBasedRule` | See the `data_source_naming_strategy_rules` option |

The `data_source_naming_strategy_rules` option allows setting multiple rules (separated by `;`). The rules can be path specific (i.e. `dbfs >>LastFolderAndFile`) or use a regular expression in the following format: `regexp_match:<regular expression> >>replace:<replace expression>`.

Consider having the following data sources in one application:

* `spark_catalog.default.financial_data_monthly_assets_demo1`
* `dbfs:/FileStore/financial_data_report/2021/dec/AppTech.csv`

Applying the following rules:

```
data_source_naming_strategy_rules=regexp_match:spark_catalog.default.(.*) >>replace:database.$1;dbfs >>LastTwoFoldersAndFile
```

this is the result:

* `database.financial_data_monthly_assets_demo1`
* `2021/dec/AppTech.csv`

because the first rule is a regular expression with a substitution and the second one is a default path search with a standard strategy option. The first expression that matches is taken into account; if no expression matches, the default strategy is `File`.

Similarly for `logical_data_source_naming_strategy_rules`: if we have two data sources

* `dbfs://application/output/2022/customers`
* `dbfs://application/output/2023/customers`

then, to group these 2 data sources under `dbfs://application/output/customers`, this configuration should be added:

```
logical_data_source_naming_strategy_rules=regexp_match:(.*)/\d.*/(.*) >>replace:$1/$2
```
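If it helps to reason about the regular-expression rules, here is a small Python sketch that mimics what the rule above expresses. This is only an illustration of the pattern matching: the agent applies the rule itself, and `$1/$2` in the rule syntax corresponds to `\1/\2` in Python's `re` syntax.

```python
import re

# The rule: regexp_match:(.*)/\d.*/(.*) >>replace:$1/$2
# captures everything before and after the numeric (year) folder.
pattern = r"(.*)/\d.*/(.*)"
replacement = r"\1/\2"  # $1/$2 in the rule syntax

paths = [
    "dbfs://application/output/2022/customers",
    "dbfs://application/output/2023/customers",
]
for p in paths:
    print(re.sub(pattern, replacement, p))
# Both paths map to the same logical name:
#   dbfs://application/output/customers
#   dbfs://application/output/customers
```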
## Advanced parameters

| Name | Definition | Default value |
|---|---|---|
| `shutdown_timeout_sec` | The maximum time in seconds to wait for the collector to shut down. This is the time to finish the entities/stats computation and publishing, as well as to deallocate all the used resources | 300 |
| `offline_report_dir_path` | The directory path to the place where the offline report will be stored | |
| `kensu_api_verify_ssl` | Turns on/off the certificate validation for the Kensu ingestion and services API calls | True |
| `enable_entity_compaction` | Turns on/off the compaction of the entities, i.e. the replacement of descriptive PKs by compacted stable hashes | True |
| `disable_column_lineage` | Turns on/off the computation of the column lineage | False |
| `missing_column_lineage_strategy` | Options: `CaseInsensitiveNameOrAll2AllMatcherStrat`, `ColumnLineageFullyDisabledStat`, `AllToAllLineageStrat`, `CaseInsensitiveColumnNameMatcherStrat`, `CaseSensitiveColumnNameMatcherStrat`, `OutFieldEndsWithInFieldNameLineageStrat`, `OutFieldStartsWithInFieldNameLineageStrat` | |
| `column_lineage_max_bytes_size` | If the estimated lineage size is greater than `column_lineage_max_bytes_size`, the lineage is trimmed | 20000000 |
| `schema_max_fields_number` | The maximum number of fields/columns to report for each partition/DS schema | 300 |
| `stats_ingestion_batch_size` | The maximum number of datasources to report the statistics for per ingestion reporting batch (i.e. per API request) | 50 |
| `stats_wait_for_ingestion_response` | If `True`, waits (blocks) for the response from the ingestion of data stats. Useful for logging/debugging purposes | True |
| `stats_wait_for_ingestion_response_seconds` | The maximum time to wait (block) for the ingestion API response when reporting data stats | 300 |

## Troubleshoot

### When does the PySpark agent send data to Kensu?

The PySpark agent sends metadata in batches to Kensu each time a write operation of a data source finishes successfully, whatever the format, e.g. Spark `write.save()` operations. P.S. we currently do not report anything for a write operation that did not complete (due to, say, an error in the code or a Spark/cluster error), so that the records in Kensu match reality.

### The agent does not detect the correct type of the data field in order to compute numerical or categorical statistics, what should I do?

The Kensu PySpark agent relies on the schema of the data source in order to define the set of automatic metrics it has to retrieve. To make sure the data type is well inferred, you need to add the `inferSchema` option while reading schema-less data source types like CSV, e.g. `df = spark.read.option("inferSchema", "true").csv("/path/to/ds")`.

### Metadata is not sent to Kensu, what is the issue?

* Please make sure that the API token and URL are correct (we have logs that verify the connection status when the Kensu agent starts). The URL should be written in this format: `https://community-api.kensuapp.com`, without any `/` at the end, which is often added by a copy/paste operation.
* Please check that the offline mode is not activated.
* If you're not using the Kensu Community Edition, check with your provider that the Kensu setup doesn't require any custom certificate.

If this did not solve your issue, please contact our support (support@kensu.io) for further analysis.

### Logs are not written in the log file, why?

Whether the logs are written in the log file depends on the log level of your Spark session. A lot of issues with the log file can be troubleshot by adding the following line before calling the `init_kensu_spark()` function:

```python
spark.sparkContext.setLogLevel("INFO")
```

## Summary

1. Install the Python and Spark client:

```
pip install kensu
```

2. Create a configuration file `conf.ini`:

```ini
[kensu]
;kensu ingestion endpoint
kensu_ingestion_url=<insert the ingestion url here>
;token for kensu ingestion
kensu_ingestion_token=<insert your ingestion token here>
```

3. Initialize the Kensu agent in your Spark job script:

```python
from pyspark.sql import SparkSession

# Add the path to the jar to the SparkSession
spark = SparkSession.builder.appName("example") \
    .config("spark.driver.extraClassPath", "kensu-spark-collector-1.3.0_spark-3.0.1.jar") \
    .getOrCreate()

# Import the init function
from kensu.pyspark import init_kensu_spark

# Initialize the Kensu agent
init_kensu_spark(spark)
```