import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder().appName("Example").getOrCreate()
val business_info = spark.read.option("inferSchema","true").option("header","true").csv("data/business-data.csv")
val customers_info = spark.read.option("inferSchema","true").option("header","true").csv("data/customers-data.csv")
val contact_info = spark.read.option("inferSchema","true").option("header","true").csv("data/contact-data.csv")
val df = business_info.join(customers_info,business_info("id")===customers_info("id"),"inner").drop(customers_info("id"))
val df_final = df.join(contact_info,df("id")===contact_info("id"),"inner").drop(contact_info("id"))
df_final.write.mode("overwrite").save("data/data")
👨‍💻 Modify the Program to Use Kensu-Spark
1. Init Kensu in your code
First, we create a Spark session and add the Kensu collector JAR file to the driver classpath.
Scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder().appName("Example").config("spark.driver.extraClassPath","kensu-spark-collector-1.0.0_spark-3.0.1.jar").getOrCreate()
2. Create a Connection to the Kensu API
ConnectHelper reads the connection properties from conf.ini, and KensuSparkSession(spark).track(...) creates the connection between the Spark job and Kensu so that events are sent through the API.
Scala
implicit val ch = new io.kensu.sparkcollector.sdk.ConnectHelper("conf.ini")
import io.kensu.sparkcollector.KensuSparkCollector.KensuSparkSession
KensuSparkSession(spark).track(ch.properties.get("kensu_ingestion_url").map(_.toString), None)(ch.properties.toList: _*)
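Note that the call above receives the ingestion URL and the remaining properties as plain key/value pairs, so conf.ini is just one way to supply them. Here is a minimal sketch that passes an explicit property map instead; the URL is a hypothetical placeholder (replace it with your own Kensu ingestion endpoint, and add whatever other properties your setup requires):
Scala
// Hedged sketch: an explicit property map instead of ConnectHelper("conf.ini").
// The URL below is a placeholder (assumption), and real setups typically need
// additional properties (e.g. credentials), passed the same way.
val props = Map("kensu_ingestion_url" -> "https://<your-kensu-instance>/api")
KensuSparkSession(spark).track(props.get("kensu_ingestion_url"), None)(props.toList: _*)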
3. Send data observations to Kensu
Then we join the three datasets with join() and save the result as a Parquet file; when the job writes its output, the collector sends the metadata, profiling, and lineage to Kensu:
Scala
val business_info = spark.read.option("inferSchema","true").option("header","true").csv("data/business-data.csv")
val customers_info = spark.read.option("inferSchema","true").option("header","true").csv("data/customers-data.csv")
val contact_info = spark.read.option("inferSchema","true").option("header","true").csv("data/contact-data.csv")
val df = business_info.join(customers_info,business_info("id")===customers_info("id"),"inner").drop(customers_info("id"))
val df_final = df.join(contact_info,df("id")===contact_info("id"),"inner").drop(contact_info("id"))
df_final.write.mode("overwrite").save("data/data")
Complete Code
Here is the complete code:
Scala
import org.apache.spark.sql.SparkSession

// Create the Spark session with the Kensu collector JAR on the driver classpath
val spark = SparkSession
.builder().appName("Example").config("spark.driver.extraClassPath","kensu-spark-collector-1.0.0_spark-3.0.1.jar").getOrCreate()

// Connect to Kensu with the properties from conf.ini and start tracking the session
implicit val ch = new io.kensu.sparkcollector.sdk.ConnectHelper("conf.ini")
import io.kensu.sparkcollector.KensuSparkCollector.KensuSparkSession
KensuSparkSession(spark).track(ch.properties.get("kensu_ingestion_url").map(_.toString), None)(ch.properties.toList: _*)

// Read the three input CSV files
val business_info = spark.read.option("inferSchema","true").option("header","true").csv("data/business-data.csv")
val customers_info = spark.read.option("inferSchema","true").option("header","true").csv("data/customers-data.csv")
val contact_info = spark.read.option("inferSchema","true").option("header","true").csv("data/contact-data.csv")

// Join the datasets on id and write the result as Parquet; the collector sends metadata, profiling and lineage to Kensu
val df = business_info.join(customers_info,business_info("id")===customers_info("id"),"inner").drop(customers_info("id"))
val df_final = df.join(contact_info,df("id")===contact_info("id"),"inner").drop(contact_info("id"))
df_final.write.mode("overwrite").save("data/data")
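As an optional sanity check (plain Spark, nothing Kensu-specific), you can read the output back and inspect the joined result, assuming the default output format (Parquet) has not been changed:
Scala
// Read back the Parquet output written by save("data/data") and peek at it
val check = spark.read.parquet("data/data")
check.printSchema()
check.show(5)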