
Configure the PySpark agent


Modify the conf.ini file

Modify ./kensu-spark-example/conf.ini: set kensu_ingestion_token to your Ingestion Token and kensu_ingestion_url to your ingestion URL. You can get both values from Getting credentials.

ini
|
[kensu]
kensu_ingestion_url=https://community-api.kensuapp.com

;token for Kensu Ingestion API
kensu_ingestion_token=

environment=Prod
project_name=Test Spark
process_name=First Pyspark Program

user_name=guest
logical_data_source_naming_strategy=File
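
Before launching the job, you may want to sanity-check that the token was actually filled in. Here is a minimal sketch using Python's standard configparser; it is our own addition, not part of the official setup, and it assumes the file path used above:

Python
|
import configparser

# Minimal sanity check (our own addition, not part of the official setup):
# confirm that the ingestion URL and token have been filled in.
config = configparser.ConfigParser()
config.read("./kensu-spark-example/conf.ini")

kensu = config["kensu"]
assert kensu.get("kensu_ingestion_url"), "kensu_ingestion_url is not set"
assert kensu.get("kensu_ingestion_token"), "kensu_ingestion_token is not set"
print("conf.ini looks complete")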


Code before modification

This is what the code looks like before we add Kensu-Spark. It creates a regular Spark DataFrame by joining three CSV files on their id column and saves the result as a Parquet file.

Python
|
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Example")\
    .getOrCreate()


business_info = spark.read.option("inferSchema","true").option("header","true").csv("data/business-data.csv")
customers_info = spark.read.option("inferSchema","true").option("header","true").csv("data/customers-data.csv")
contact_info = spark.read.option("inferSchema","true").option("header","true").csv("data/contact-data.csv")


df = business_info.join(customers_info,['id']).join(contact_info,['id'])

df.write.mode("overwrite").save("data/data")



👨‍💻 Modify the program to use Kensu-Spark

To include Kensu in your program, follow these steps:

1️⃣ Modify the libraries imports to import Kensu modules

Note that we still import the regular Spark SQL libraries. In addition, we import init_kensu_spark from kensu.pyspark.

Python
|
from pyspark.sql import SparkSession
from kensu.pyspark import init_kensu_spark
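
If this import fails, the library is probably missing from the Python environment that runs the Spark driver. A quick check, assuming the package is published on PyPI under the name kensu (see the previous page, Install the library):

Python
|
import importlib.metadata

# Quick environment check (our own addition): confirm the kensu package is
# installed and print its version. The PyPI package name "kensu" is assumed.
print(importlib.metadata.version("kensu"))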


2️⃣ Init Kensu in your code

Here we initialize Kensu. We add the path of the Kensu Spark collector jar, the same jar we pass to spark-submit, to the driver classpath of the SparkSession, then call init_kensu_spark on the session.

Python
|

#Add the path to the .jar to the SparkSession
spark = SparkSession.builder.appName("Example")\
    .config("spark.driver.extraClassPath", "kensu-spark-collector-1.0.0_spark-3.0.1.jar")\
    .getOrCreate()

#Init Kensu

init_kensu_spark(spark)
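
The jar file name is tied to a specific collector and Spark version (here 3.0.1). If the same script runs in several environments, one option is to take the jar path from an environment variable. A small sketch; the KENSU_COLLECTOR_JAR variable name is our own convention, not part of Kensu:

Python
|
import os
from pyspark.sql import SparkSession
from kensu.pyspark import init_kensu_spark

# Hypothetical convenience (our own addition): read the collector jar path
# from an environment variable, falling back to the file name in this guide.
jar_path = os.environ.get(
    "KENSU_COLLECTOR_JAR", "kensu-spark-collector-1.0.0_spark-3.0.1.jar")

spark = SparkSession.builder.appName("Example")\
    .config("spark.driver.extraClassPath", jar_path)\
    .getOrCreate()

init_kensu_spark(spark)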




3️⃣ Send Metadata to Kensu

As with regular Spark, the DataFrames in the code are ordinary Spark DataFrames. The code uses regular Spark functions to read the .csv files and join them.

The program saves the resulting DataFrame as a Parquet file with df.write.mode("overwrite").save(). When this write action runs, the Kensu collector sends metadata to Kensu, such as the data sources, schemas, and observability metrics.

Python
|
business_info = spark.read.option("inferSchema","true").option("header","true").csv("data/business-data.csv")
customers_info = spark.read.option("inferSchema","true").option("header","true").csv("data/customers-data.csv")
contact_info = spark.read.option("inferSchema","true").option("header","true").csv("data/contact-data.csv")


df = business_info.join(customers_info,['id']).join(contact_info,['id'])

df.write.mode("overwrite").save("data/data")




The complete code

Here is the complete code after the modifications.

Python
|
from pyspark.sql import SparkSession
from kensu.pyspark import init_kensu_spark


#Add the path to the .jar to the SparkSession
spark = SparkSession.builder.appName("Example")\
    .config("spark.driver.extraClassPath", "kensu-spark-collector-1.0.0_spark-3.0.1.jar")\
    .getOrCreate()

#Init Kensu

init_kensu_spark(spark)


#Read the three input CSV files
business_info = spark.read.option("inferSchema","true").option("header","true").csv("data/business-data.csv")
customers_info = spark.read.option("inferSchema","true").option("header","true").csv("data/customers-data.csv")
contact_info = spark.read.option("inferSchema","true").option("header","true").csv("data/contact-data.csv")


#Join the DataFrames on the id column
df = business_info.join(customers_info,['id']).join(contact_info,['id'])

#Write the result as Parquet; this write triggers the Kensu metadata collection
df.write.mode("overwrite").save("data/data")




Updated 21 Feb 2023