top of page

Data Streaming by using Azure Databricks and Confluent - kafka

How do you process IoT data, change data capture (CDC) data, or streaming data from sensors, applications, and sources in real time? Apache Kafka® and Apache Spark® are widely adopted technologies in the industry, but they require specific skills and expertise to run. Leveraging Confluent Cloud and Azure Databricks as fully managed services in Microsoft Azure, you can implement new real-time data pipelines with less effort and without the need to upgrade your datacenter (or set up a new one).


Below is a common architectural pattern used for streaming data:



Prerequisites:

Configure the CDC in Azure SQL Server


Step 1: Enable Database for CDC template

Open a query editor and select the correct database to run the below command to enable the CDC.

EXEC sys.sp_cdc_enable_db  
GO
Step 2: Create a table or choose an existing table

Create a new table called Users as an example for the demo.

CREATE TABLE Users
(
    ID int NOT NULL PRIMARY KEY,
    FirstName varchar(30),
    LastName varchar(30),
    Email varchar(50)
)
Step 3: Enable Table for CDC

Next, we need to enable the CDC for specific table. According to the demo, the CDC is needed to enable for Users table.

EXEC sys.sp_cdc_enable_table  
@source_schema = N'dbo',  
@source_name   = N'Users',  
@role_name     = N'admin',  
@supports_net_changes = 1  
GO 
Step 4: Insert the data into the table

Insert the data into the table to test the CDC.

INSERT INTO Users Values (1, 'Jorge', 'Ramos', 'ramos@yahoo.com')
INSERT INTO Users Values (2, 'Andrew', 'Strudwick', 'andrew@yahoo.com')
INSERT INTO USERS Values (3, 'Thomas', 'Tuchel', 'tuchel@gmail.com')


Prepare the Confluent Cloud environment


Step 1: Create a Kafka cluster

Sign in to the Azure portal and search for Confluent Cloud.




If you already have a Confluent organisation set up in Azure, you can use it, otherwise select Apache Kafka® on Confluent Cloud™ under the “Marketplace” section.




Choose your desired Subscription and Resource Group to host the Confluent organisation, complete the mandatory fields, and then click Review + create. On the review page, click Create.

Wait for the deployment to complete and then click Go to resource. On the overview page, click the Confluent SSO link on the right.



Once you are redirected to Confluent Cloud, click the Create cluster button. Select the cluster type Basic and click Begin Configuration.



Select an Azure region, then click Continue.



Specify a cluster name and click Launch cluster.



Create API keys for your Kafka cluster

Next, click the API access link on the left menu and click Create key.



Select Create an API key associated with your account and then select Next.



Copy the key and secret to a local file, and check I have saved my API key and secret and am ready to continue. You will need the key and secret later for the Datagen Source connector as well as in the Azure Databricks code.



Create a topic

Return to your cluster. Click the Topics link in the left menu and click Create topic.



Type “Clickstreams” as the topic name and select Create with defaults.



Step 2: Enable Schema Registry

To enable Schema Registry, go back to your environment page. Click the Schemas tab and choose an Azure region. Select Azure, choose a region, and click Enable Schema Registry.



Create Schema Registry API access key

Click on the Settings tab. Open the Schema Registry API access section, and click on the Create key button (or Add Key button if you already have some keys created).



Copy the key and secret, and make sure to write them down. Add a description for the key. Check the I have saved my API keys checkbox. Click Continue.



Step 3: Configure the Confluent Cloud Microsoft SQL Server CDC Source Connector

Click on the Connectors link on the left menu. Select the Microsoft SQL Server CDC Source Connector (you can also use the search box). Fill in the setup connections.



You can now inspect the messages flowing in. Click the Topics link in the left menu. Select the clickstreams topic. Click on the Messages tab.


Process the Data with Azure Databricks

Step 1: Prepare the Azure Databricks environment


If you do not already have an Azure Databricks environment, you will need to spin one up:

  1. Log in to the Azure Portal and search for “Azure Databricks”

  2. When the page comes up, click the +Add button

  3. Choose the subscription that you want the instance to be in (if it isn’t already filled in)

  4. Choose the Resource Group that you want the instance to be in, or click Create new and give it a name

  5. Enter a name for the Azure Databricks workspace

  6. Select the region that you want the workspace to be created in

  7. Select your pricing tier—you can select Trial to get 14 days of free Azure Databricks units

  8. On the networking page, you can choose whether or not to deploy in your own VNet (deploying in your own VNet is recommended for instances that you intend to use long term; however, if you are spinning it up just to follow this blog example. Keep the default “No” selected and Azure Databricks will spin up a VNet)

  9. Enter in any required tags

  10. Click Review + create; if everything comes back as valid, then click the Create button to spin up the workspace

When the Azure Databricks instance finishes deploying, you can navigate to it in the Azure Portal and click Launch Workspace. Alternatively, if you already have the URL for an Azure Databricks workspace, you can go to the URL directly in your browser.

Spin up and configure an Azure Databricks cluster

Once you’re logged in to the Azure Databricks workspace, you will need a running cluster. If you do not already have a cluster that you would like to use for this example, you can spin one up by following these steps:

  • Click the Clusters icon in the left sidebar

  • Click +Create Cluster

  • Give the cluster a name

  • Keep all the rest of the defaults and click Create Cluster


Create a new notebook

Once your cluster is spun up, you can create a new notebook and attach it to your cluster. There are multiple ways to do this:

  • Click on the Azure Databricks symbol in the left menu to get to the initial screen in the Azure Databricks workspace. The “New Notebook” link is listed under “Common Tasks.”

  • Or, you can click on the Home icon, which will take you to your home directory. Next, right-click under your account name and select Create > Notebook.

Once you’ve clicked on either Create Notebook or Create > Notebook, the following screen appears:

Give your notebook a name, pick your default language (select Python to follow the example below), and then select the cluster that you just spun up. From there, click Create.


Step 2: Gather keys, secrets, and paths

Using the following information, connect to the topic that you created in Confluent Cloud from Azure Databricks:

  • The Kafka API key for the cluster that you generated earlier

  • The Kafka API secret for the cluster that you generated earlier

  • The name of the Kafka topic

Step 3: Setup the Spark ReadStream

Now the Spark ReadStream from Kafka needs to be set up and the data manipulated. Both of these operations are combined into one statement.

import pyspark.sql.functions as fn
from pyspark.sql.types import StringType


clickstreamTestDf = (
  spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", host)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret))
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("subscribe", 'shamenserver.database.windows.net.dbo.Users')
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .withColumn('key', fn.col("key").cast(StringType()))
  .withColumn('value', fn.col("value").cast(StringType()))
  .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key', 'value') #,'valueSchemaId','fixedValue'
)
Step 4: Setup the Spark WriteStream

After reading the Kafka messages, we need to write streaming data.

query = (
  clickstreamTestDf
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only)
    .queryName("raw")        # raw = name of the in-memory table
    .outputMode("append")    # append = add new events#
    .trigger(processingTime='10 seconds')
    .start()
)

Display the streaming data

Conclusion

This blog post has guided you through first steps in using Databricks and Confluent Cloud together on Azure. Now you are ready to build your own data pipelines and get the value out of your data leveraging whatever service best suits the specific task at hand. With Confluent Cloud, Databricks, and all the Azure services at your disposal, the possibilities are wide open.


Comments


Subscribe Here.

Thanks for subscribing!

+94766088392

Colombo, Sri Lanka.

  • LinkedIn
  • Facebook
  • Instagram
bottom of page