# PySpark agent

## An overview

The PySpark integration relies on two components:

* the Spark JAR, which ensures that Spark jobs are tracked and events are sent to Kensu. The JAR has to be added to the `SparkSession`.
* the Python `kensu-py` library, which includes a module to configure the parameters of the Spark JAR and some other helpers, like circuit breakers.

## Prerequisites and installation

### Install the Kensu Python library

The Python library is a prerequisite for the PySpark agent, so please make sure the `kensu-py` library is installed. To install it, execute this command in your Python environment:

```bash
pip install "kensu>=2.6.9"
```

### Download the Spark agent JAR

Download the JAR matching your Spark version:

* [kensu-spark-collector-1.3.0_spark-3.4.0.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.4.0.jar)
* [kensu-spark-collector-1.3.0_spark-3.3.1.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.3.1.jar)
* [kensu-spark-collector-1.3.0_spark-3.3.2.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.3.2.jar)
* [kensu-spark-collector-1.3.0_spark-3.3.0.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.3.0.jar)
* [kensu-spark-collector-1.3.0_spark-3.2.1.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.2.1.jar)
* [kensu-spark-collector-1.3.0_spark-3.1.1.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.1.1.jar)
* [kensu-spark-collector-1.3.0_spark-3.1.2.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.1.2.jar)
* [kensu-spark-collector-1.3.0_spark-3.0.1.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-3.0.1.jar)
* [kensu-spark-collector-1.3.0_spark-2.4.0.jar](https://public.usnek.com/n/repository/kensu-public/releases/kensu-spark-collector/stable/kensu-spark-collector-1.3.0_spark-2.4.0.jar)

Ideally the JAR should be a full version match, but a major.minor match is often good enough too (i.e. the last version number may differ).

## Configuration

Before creating the configuration file, make sure you have retrieved your Kensu ingestion URL and token.

To include the PySpark agent, you need to:

1. Create a configuration file `conf.ini`.
2. Add the JAR to the Spark session. To do so, include the path of the JAR in the configuration of the Spark session by adding `config("spark.driver.extraClassPath", "<path to jar>")` in the definition of the session.
3. Import the `kensu.pyspark` module and initialize the tracker with `init_kensu_spark(spark)`.

Here is a summary of the code modifications you need to perform to include the PySpark agent:

```python
from pyspark.sql import SparkSession

# Add the path to the jar to the SparkSession
spark = SparkSession.builder.appName("example") \
    .config("spark.driver.extraClassPath", "<kensu-spark-agent>.jar") \
    .getOrCreate()

# Import the init function
from kensu.pyspark import init_kensu_spark

# Init Kensu
init_kensu_spark(
    spark_session=spark,
    project_name="My first Kensu project",
    shutdown_timeout_sec=10 * 60,
    process_name="My first Kensu application",
    # add further parameters here
)

# application code
```
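For illustration, the application code after the initialization is ordinary PySpark; here is a minimal sketch (the paths and the `month`/`assets` columns are hypothetical) showing a read and a write that the agent would track:

```python
# Minimal sketch of application code (hypothetical paths and columns).
# Reads and writes performed after init_kensu_spark(spark) are tracked by the agent.
df = spark.read.option("inferSchema", "true") \
    .csv("dbfs:/FileStore/financial_data_report/2021/dec/AppTech.csv", header=True)

monthly_assets = df.groupBy("month").sum("assets")

# The write triggers the agent to report the lineage and (if enabled) the metrics
monthly_assets.write.mode("overwrite").parquet("dbfs:/application/output/monthly_assets")
```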
The Spark agent can also be included during the `spark-submit` command execution:

```bash
spark-submit --jars <kensu-spark-agent>.jar main.py
```

Here is the `conf.ini` template:

```ini
[kensu]
;Kensu API endpoint
kensu_ingestion_url=<insert the ingestion url here>
;Token for Kensu ingestion API
kensu_ingestion_token=<insert your ingestion token here>
compute_stats=false
compute_input_stats=false
compute_output_stats=false
```

You will find further down this page the list of custom parameters you can add to this configuration file.

If the relative path of the `conf.ini` file is different, please register an environment variable called `KSU_CONF_FILE` whose value is the relative or absolute path of the configuration file.

All the properties mentioned have an environment variable counterpart prefixed with `KSU_`, e.g. the property `project_name` has the `KSU_PROJECT_NAME` environment variable. Furthermore, the properties can also be specified during the Kensu initialization. The order of precedence for the properties is the following:

1. Environment variable
2. The argument passed to the `init_kensu_spark` method
3. Configuration file
4. Default value
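For example, the same properties could be supplied through environment variables before launching the job; a minimal sketch (paths and values are placeholders):

```bash
# Point the agent to a conf.ini stored elsewhere (optional)
export KSU_CONF_FILE=/path/to/conf.ini

# Override a single property; environment variables take precedence over
# init_kensu_spark() arguments and conf.ini values
export KSU_PROJECT_NAME="My first Kensu project"

# ...then launch the job as usual (python or spark-submit)
```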
### Parameters

The basic parameters of the configuration file are the following:

*(table of basic configuration parameters)*

## Metrics configuration

Metrics computation in the Kensu PySpark agent can be controlled by using the parameters described below in the `conf.ini` file. Metrics computation can also be fine-tuned for each logical datasource usage & application in the Kensu web app.

### Fine-tune statistics computation from the Kensu app (remote configuration)

In the Kensu web app, by going to the logical datasource summary page, you may choose which statistics to compute for which columns, or disable them, for each individual datasource access in a given application.

**Remote conf takes priority over `conf.ini`.** By default, the remote configuration is enabled, and if configured in the Kensu app, it overrides the behaviour specified in `conf.ini`, but only for that specific application & datasource.

**How to disable remote conf?** If for any reason you want to use only `conf.ini` and disable usage of the remote configuration (e.g. for testing new global settings), set the `remote_conf_enabled=false` parameter.

**How to debug, if needed?** The actual metrics configuration used for each datasource is printed in the Kensu logs during metrics computation, indicating which configuration source was used for each datasource.

### Metrics config parameters

*(table of metrics configuration parameters)*
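As a sketch combining the flags from the template above with the remote-configuration switch, a `conf.ini` that enables statistics computation globally and ignores the remote configuration could look like this:

```ini
[kensu]
kensu_ingestion_url=<insert the ingestion url here>
kensu_ingestion_token=<insert your ingestion token here>
; compute statistics on both inputs and outputs
compute_stats=true
compute_input_stats=true
compute_output_stats=true
; use only this file, ignore per-datasource remote configuration from the Kensu app
remote_conf_enabled=false
```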
## Circuit breaker

### Enabling the circuit breaker & its semantics

Add the following code to your PySpark application at the places where you want the circuit breaking to happen (a placement sketch follows at the end of this section):

```python
from kensu.utils.remote_circuit_breakers import kill_app_on_circuit_breaker

kill_app_on_circuit_breaker(spark=spark)
```

If not already done, define some rules in the Kensu app and mark some of them as circuit breakers for the given Spark app.

The circuit breaker will kill the Spark application if any of the datasources that were already accessed (i.e. read or written) by Spark has any open tickets due to broken circuit breaker rules in Kensu at the logical datasource level, for any of the previous runs (only if the associated circuit breaker rule is activated for the current Spark app).

If you think that the issue behind a previous circuit breaker was resolved (say, by new code, or because the data was fixed), you still have to mark the ticket in Kensu as resolved; otherwise the old ticket will still trigger the circuit breaker, because circuit breaker checks are global (over the metrics of all previous runs, not just the latest run).

### Customizing the logic for handling failures

If you have a more complex use case, you may also perform your own handling by just checking the circuit breaker status. Don't forget to properly shut down Spark & the Kensu Spark agent if you need to quit the application.

```python
# If you still want to perform some custom Python logic in case
# the circuit breaker failed, you may call the Kensu circuit breaker
# with the sys_exit=False param
from kensu.utils.remote_circuit_breakers import kill_app_on_circuit_breaker

try:
    kill_app_on_circuit_breaker(
        spark=spark,
        stop_spark=False,  # or True, if you want to quit the Spark app immediately
        sys_exit=False,
    )
except AssertionError:
    # handle the circuit breaker
    pass
```

### Configuration

*(table of circuit breaker configuration parameters)*
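For instance, a typical placement is to check the circuit breaker after reading the inputs and before writing the results; a minimal sketch, assuming the Spark session and initialization from the snippets above and hypothetical paths:

```python
from kensu.utils.remote_circuit_breakers import kill_app_on_circuit_breaker

# Read the inputs first so they count as "already accessed" datasources
customers = spark.read.option("inferSchema", "true") \
    .csv("dbfs:/application/input/customers.csv", header=True)

# Kill the app here if any accessed datasource has an open circuit-breaker ticket
kill_app_on_circuit_breaker(spark=spark)

# Only reached when no circuit breaker was triggered
customers.write.mode("overwrite").parquet("dbfs:/application/output/customers")
```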
## Data sources naming convention

Kensu tracks every data source based on its location (i.e. the full path of a file, e.g. `dbfs:/FileStore/financial_data_report/2021/dec/AppTech.csv`), and there are different options to choose how to name the data source. In the following example, only the last folder and the filename are used.

In addition, Kensu also introduces the concept of "logical" data sources. This can be useful to group data sources (that are actually partitions) as part of the same logical data source. As an example of logical data source usage, consider these physical paths:

* `dbfs://application/output/2022/customers`
* `dbfs://application/output/2023/customers`

In this case these 2 data sources can be grouped into `dbfs://application/output/customers`. This can be achieved with the available logical data source naming strategy options.

*(table of data source naming strategy options)*

### Example of naming strategy options

Assuming the original filepath is `dbfs:/FileStore/financial_data_report/2021/dec/AppTech.csv`, these are the results of the different options:

*(table of naming strategy options and their results)*

The data source naming strategy rules allow setting multiple rules (separated by `;`). The rules can be path specific (e.g. `dbfs>>LastFolderAndFile`) or use a regular expression in the following format: `regexp-match:<regular expression>>>replace:<replace expression>`.

Consider an application using the following data sources:

* `spark_catalog.default.financial_data_monthly_assets_demo1`
* `dbfs:/FileStore/financial_data_report/2021/dec/AppTech.csv`

Applying the following rules:

```ini
data_source_naming_strategy_rules=regexp-match:spark_catalog.default.(.*)>>replace:database:$1;dbfs>>LastTwoFoldersAndFile
```

this is the result:

* `database:financial_data_monthly_assets_demo1`
* `2021/dec/AppTech.csv`

because the first rule is a regular expression with a substitution, and the second one is a default path search with a standard strategy option. The first expression that matches is taken into account; if no expression matches, the default strategy is `File`.

The same applies to the logical data source naming strategy rules. If we have two data sources:

* `dbfs://application/output/2022/customers`
* `dbfs://application/output/2023/customers`

then, to group these 2 data sources under `dbfs://application/output/customers`, this configuration should be added:

```ini
logical_data_source_naming_strategy_rules=regexp-match:(.*)/\d+/(.*)>>replace:$1/$2
```
## Advanced parameters

*(table of advanced parameters)*

## Troubleshoot

### When does the PySpark agent send data to Kensu?

The PySpark agent sends metadata in batches to Kensu each time a write operation of a data source finishes successfully, whatever the format, e.g. Spark `write.save()` operations.

P.S. We currently do not report anything when such a write operation did not happen, due to, say, an error in the code or a Spark/cluster error; this is so that the records in Kensu match the reality.

### The agent does not detect the correct type of a data field in order to compute numerical or categorical statistics, what should I do?

The Kensu PySpark agent relies on the schema of the data source in order to define the set of automatic metrics it has to retrieve. To make sure the data type is well inferred, you need to add the `inferSchema` option while reading schema-less data source types like CSV, e.g.:

```python
df = spark.read.option("inferSchema", "true").csv("/path/to/ds")
```

### Metadata is not sent to Kensu, what is the issue?

Please make sure that the API token and URL are correct (we have logs that verify the connection status when the Kensu agent starts). The URL should be written in this format: `https://community-api.kensuapp.com`, without any `/` at the end, which is often added by copy/paste operations.

Please check that the offline mode is not activated.

If you're not using the Kensu Community Edition, check with your provider that the Kensu set-up doesn't require any custom certificate.

If this did not solve your issue, please contact our support (support@kensu.io) for further analysis.

### Logs are not written in the log file, why?

Whether the logs are written in the log file depends on the log level of your Spark session. A lot of issues with the log file can be troubleshot by adding the following line before calling the `init_kensu_spark()` function:

```python
spark.sparkContext.setLogLevel("INFO")
```

## Summary

1. Install the Python and Spark client:

```bash
pip install kensu
```

2. Create a configuration file `conf.ini`:

```ini
[kensu]
;Kensu ingestion endpoint
kensu_ingestion_url=<insert the ingestion url here>
;Token for Kensu ingestion
kensu_ingestion_token=<insert your ingestion token here>
```

3. Initialize the Kensu agent in your Spark job script:

```python
from pyspark.sql import SparkSession

# Add the path to the jar to the SparkSession
spark = SparkSession.builder.appName("example") \
    .config("spark.driver.extraClassPath", "kensu-spark-collector-1.3.0_spark-3.0.1.jar") \
    .getOrCreate()

# Import the init function
from kensu.pyspark import init_kensu_spark

# Initialize the Kensu agent
init_kensu_spark(spark)
```
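To run the job, a minimal sketch assuming the JAR and `conf.ini` sit next to `main.py` (otherwise point the `KSU_CONF_FILE` environment variable at the configuration file):

```bash
# conf.ini is picked up from its default relative path;
# set KSU_CONF_FILE if it lives elsewhere
spark-submit --jars kensu-spark-collector-1.3.0_spark-3.0.1.jar main.py
```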