Configure the Agent


💻 Include Kensu in your program

Modify conf.ini File

Set the kensu_ingestion_token value in the conf.ini file (the ingestion URL is already filled in):

ini
|
[kensu]
kensu_ingestion_url=https://community-api.kensuapp.com
kensu_ingestion_token=

environment=workshop
project_name=dodd

kensu_api_verify_ssl=False
logical_datasource_name_strategy=File
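
Before launching the job, you may want to confirm the token was actually filled in. Below is a minimal, hypothetical pre-flight check (not part of the Kensu agent) that reads conf.ini with java.util.Properties; it assumes the simple key=value layout shown above, where the [kensu] header line is parsed as a harmless empty property:

Scala
|
import java.io.FileInputStream
import java.util.Properties

// Hypothetical pre-flight check: fail fast if the ingestion token is still empty.
val props = new Properties()
val in = new FileInputStream("conf.ini")
try props.load(in) finally in.close()

val token = Option(props.getProperty("kensu_ingestion_token")).map(_.trim).getOrElse("")
require(token.nonEmpty, "kensu_ingestion_token is not set in conf.ini")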


🕵️‍♀️ Explore the program

Here is the code before we add Kensu:

Scala
|
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Example")
  .getOrCreate()

// Read the three source CSV files, inferring schemas from the headers
val business_info = spark.read.option("inferSchema","true").option("header","true").csv("data/business-data.csv")
val customers_info = spark.read.option("inferSchema","true").option("header","true").csv("data/customers-data.csv")
val contact_info = spark.read.option("inferSchema","true").option("header","true").csv("data/contact-data.csv")

// Join the three datasets on their shared id column, dropping the duplicated keys
val df = business_info.join(customers_info,business_info("id") === customers_info("id"), "inner").drop(customers_info("id"))
val df_final = df.join(contact_info,df("id") === contact_info("id"), "inner").drop(contact_info("id"))

// Save the joined result (Parquet, Spark's default format)
df_final.write.mode("overwrite").save("data/data")
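
To see what the pipeline produces before instrumenting it, the usual Spark inspection calls apply; for example:

Scala
|
// Inspect the joined result (plain Spark, nothing Kensu-specific)
df_final.printSchema()   // columns contributed by the three CSV files
df_final.show(5)         // first five joined rows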


👨‍💻 Modify the Program to use Kensu-Spark

1. Initialize Kensu in your code

First, we create a Spark session, adding the Kensu collector JAR to the driver classpath.

Scala
|
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Example")
  .config("spark.driver.extraClassPath", "kensu-spark-collector-1.0.0_spark-3.0.1.jar")
  .getOrCreate()
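
The collector JAR path above is relative, so it must be reachable from where the application starts. A small, hypothetical guard (plain Java NIO, not part of the Kensu API) can fail fast when the file is missing:

Scala
|
import java.nio.file.{Files, Paths}

// Hypothetical guard: the path must match the one passed to spark.driver.extraClassPath
val collectorJar = "kensu-spark-collector-1.0.0_spark-3.0.1.jar"
require(Files.exists(Paths.get(collectorJar)), s"Kensu collector JAR not found: $collectorJar")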


2. Create a Connection to the Kensu API

ConnectHelper loads the properties from conf.ini, and KensuSparkSession wraps the Spark session to create a connection between the Spark job and Kensu, so events can be sent through the API.

Scala
|
implicit val ch = new io.kensu.sparkcollector.sdk.ConnectHelper("conf.ini")
import io.kensu.sparkcollector.KensuSparkCollector.KensuSparkSession
KensuSparkSession(spark).track(ch.properties.get("kensu_ingestion_url").map(_.toString), None)(ch.properties.toList:_*)
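
track receives the ingestion URL as an Option, so a missing key in conf.ini still compiles and runs. If you prefer the job to fail loudly instead, a check placed before the track call could look like this (assuming ch.properties behaves as in the snippet above):

Scala
|
// Hypothetical pre-check, placed before the track(...) call
require(ch.properties.get("kensu_ingestion_url").isDefined,
  "kensu_ingestion_url is missing from conf.ini")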


3. Send data observations to Kensu

The business logic is unchanged: we read the three CSV files and join them on id, exactly as before. When the result is saved as a Parquet file, the tracked Spark session sends the metadata, profiling, and lineage observations to Kensu:

Scala
|
val business_info = spark.read.option("inferSchema","true").option("header","true").csv("data/business-data.csv")
val customers_info = spark.read.option("inferSchema","true").option("header","true").csv("data/customers-data.csv")
val contact_info = spark.read.option("inferSchema","true").option("header","true").csv("data/contact-data.csv")

val df = business_info.join(customers_info,business_info("id") === customers_info("id"), "inner").drop(customers_info("id"))
val df_final = df.join(contact_info,df("id") === contact_info("id"), "inner").drop(contact_info("id"))

df_final.write.mode("overwrite").save("data/data")
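
After the job completes, you can confirm the output on disk by reading it back with plain Spark (save uses Parquet by default):

Scala
|
// Plain Spark check of the output written above
val written = spark.read.parquet("data/data")
written.show(5)
println(s"Rows written: ${written.count()}")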


Complete Code

Here is the complete code:

Scala
|
import org.apache.spark.sql.SparkSession

// 1. Spark session with the Kensu collector JAR on the driver classpath
val spark = SparkSession
  .builder()
  .appName("Example")
  .config("spark.driver.extraClassPath", "kensu-spark-collector-1.0.0_spark-3.0.1.jar")
  .getOrCreate()

// 2. Load conf.ini and start tracking the session
implicit val ch = new io.kensu.sparkcollector.sdk.ConnectHelper("conf.ini")
import io.kensu.sparkcollector.KensuSparkCollector.KensuSparkSession
KensuSparkSession(spark).track(ch.properties.get("kensu_ingestion_url").map(_.toString), None)(ch.properties.toList:_*)

// 3. Business logic: read, join, and write (observed by the tracked session)
val business_info = spark.read.option("inferSchema","true").option("header","true").csv("data/business-data.csv")
val customers_info = spark.read.option("inferSchema","true").option("header","true").csv("data/customers-data.csv")
val contact_info = spark.read.option("inferSchema","true").option("header","true").csv("data/contact-data.csv")

val df = business_info.join(customers_info,business_info("id") === customers_info("id"), "inner").drop(customers_info("id"))
val df_final = df.join(contact_info,df("id") === contact_info("id"), "inner").drop(contact_info("id"))

df_final.write.mode("overwrite").save("data/data")



