The inspiration for this post is the growing need, across many recent projects, to stream data out of a SQL Server RDBMS in real time. There are multiple ways to achieve this.
The scenario at hand requires data from an application database hosted in SQL Server to be streamed in real time so that it can be used for purposes such as synchronizing multiple systems, operational reporting and so on.
Apache Kafka is a popular streaming platform that fits this type of requirement. The SQL Server data will be streamed through a topic created in Apache Kafka.
To illustrate the above scenario, the following setup is used.
For ease of setting up the environment, Docker containers are used.
A Docker container can be started from an image that already carries all the necessary configuration and environment setup for the software to work.
The official Docker image for SQL Server can be found at the link below
https://hub.docker.com/_/microsoft-mssql-server
For this illustration the SQL Server 2017 Docker image is used.
The steps to create the container are explained clearly in the above link. Since this is a SQL Server instance running on Linux, the connection can be checked using the command line tool sqlcmd.
One thing to be careful about while creating the container with the docker run statement is the port configured for the SQL Server instance.
The port is set using the -p switch and takes the form host_port:container_port, so 1433:1433 means port 1433 of the host machine is mapped to port 1433 of the container, which is the standard SQL Server port. This is fine for illustration purposes, but in a real deployment it makes sense to map a non-standard host port to the standard SQL Server port inside the container, to reduce exposure to attacks that target the default port.
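For reference, a minimal docker run command along the lines of the official documentation is sketched below; the SA password is a placeholder, and the image name shown is the base SQL Server 2017 image, which would be replaced by the custom image built from the example discussed later.

# accept the EULA, set the SA password, map host port 1433 to container port 1433 and run detached
docker run -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=<YourStrong@Passw0rd>" \
   -p 1433:1433 -d mcr.microsoft.com/mssql/server:2017-latest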
In order to check the status of the instance, the container first needs to be identified using the docker ps command.
From the above screenshot it can be seen that the container for this illustration has been named flamboyant_varahamihira.
This name is auto-generated and not constant, so it has to be checked each time a container is created.
The password for the SA account can be checked from within the Docker container using the below command

docker exec -it <container_id|container_name> bash
This will open a bash terminal inside the container.
Then run

echo $SA_PASSWORD

and the SA password will be displayed.
Use it to log in to the instance and ensure it is up and running.
The password is then used in the below command to check the status of the SQL Server instance
/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P <your_password>
And assuming the connection was successful, the sqlcmd prompt will appear as shown below
Type any SQL command at the prompt and add GO to get it executed.
As an example, version information can be retrieved with a simple query such as the one below, executed at the sqlcmd prompt.
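SELECT @@VERSION
GO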
The image given in the link also includes an initialization script (entrypoint.sh) which creates a sample database named DemoData and a table Products within it.
The Dockerfile that builds the Docker image sets this shell script as the entry point, so the commands written inside it are executed when the container starts.
The contents of the shell script can be checked by using the below command
cat /usr/src/app/entrypoint.sh
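The entry point script in the linked example is essentially a small wrapper that launches the import script alongside the SQL Server process; a sketch of it looks like this (paths as in the example):

# start the database setup/import script in the background and run SQL Server in the foreground
/usr/src/app/import-data.sh & /opt/mssql/bin/sqlservr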
The import-data.sh file contains the script that creates the sample database, a table called Products inside it, and populates the table with some sample data. Based on the requirement, more steps can be added to this script.
The script looks like below
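A sketch of the script, based on the steps described next (the paths, csv file name and SA_PASSWORD variable are assumptions taken from the linked example), is:

# wait for the SQL Server instance to come up
sleep 90s

# run the setup script to create the DemoData database and the Products table
/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P $SA_PASSWORD -d master -i setup.sql

# bulk import the sample data into the Products table from the csv file
/opt/mssql-tools/bin/bcp DemoData.dbo.Products in "/usr/src/app/Products.csv" -c -t',' -S localhost -U sa -P $SA_PASSWORD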
The first command waits for the SQL Server service to come up.
The next command uses sqlcmd to log in to the instance and execute the setup.sql script, which creates the sample database.
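A minimal setup.sql for this purpose would be along these lines (the column types are assumed from the sample data):

CREATE DATABASE DemoData;
GO
USE DemoData;
GO
CREATE TABLE Products (ID INT, ProductName NVARCHAR(MAX));
GO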
Finally, it runs the bcp utility to populate the Products table with sample data from the csv file included in the image.
Now that the instance is up with the database, table and sample data in place, the next step is to connect to the instance using a client tool like SQL Server Management Studio (SSMS).
For this purpose a Windows based laptop with SSMS installed was used.
Launch SSMS and give the IP address or name of the server to connect to. Ensure TCP/IP is selected as the protocol.
Once connected, the object explorer will show the demo database with the list of objects created in it.
The above screenshot shows the objects created along with the data populated by the shell script executed from the container.
The next step is to set up the Kafka server which will be used to stream the SQL Server data.
Kafka can stream data in real time from heterogeneous sources like MySQL, SQL Server and so on. Kafka organizes the streamed changes into topics, typically one per source object. This data can then be used to populate any destination system or be consumed by visualization tools.
For the sake of this illustration, the Docker containers for Kafka and Kafka Connect from the below project are used
https://github.com/confluentinc/demo-scene/tree/master/no-more-silos
Once the containers are up and running, their status can be checked using the below command

docker ps

and the result below will be displayed.
Kafka is used for creating the topics for live streaming of the RDBMS data. Kafka Connect provides the connector extensions required to connect to the sources from which data needs to be streamed, as well as to the destinations where the data needs to be stored.
The flow diagram for the entire setup looks like this.

In this illustration the Kafka Connect server makes use of Debezium based connectors to connect to the source databases. Changes within each SQL Server source table will be published as a topic inside Kafka.
There are two general ways of capturing data changes from RDBMS systems: query based CDC and log based CDC.
A JDBC based connector is available for SQL Server to capture query based CDC. For log based CDC, Debezium has released a set of connectors for various RDBMSs, including SQL Server. For this illustration the Debezium based connector is used, to avoid any direct impact on the transactional tables.
Debezium makes use of SQL Server's CDC feature to pick up the changes happening at the source. For this reason, CDC has to be enabled at the database and table level for the Debezium connector to work.
The document below gives detailed information on how CDC can be enabled at the database and table level in SQL Server
https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-ver15
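For the DemoData database and the Products table used in this illustration, enabling CDC comes down to the following statements (the gating role is left as NULL here for simplicity):

USE DemoData;
GO
-- enable CDC at the database level
EXEC sys.sp_cdc_enable_db;
GO
-- enable CDC for the Products table; this creates the capture instance (CT table)
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'Products',
    @role_name     = NULL;
GO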
The CDC feature requires the SQL Server Agent service to be up and running within the SQL Server instance.
On Linux based servers, the approach described below can be used to enable the SQL Server Agent service within the instance.
https://docs.microsoft.com/en-us/sql/linux/sql-server-linux-setup-sql-agent?view=sql-server-ver15
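On a Linux installation (SQL Server 2017 CU4 or later) the agent is enabled through mssql-conf followed by a service restart; for a Docker container the same effect can be achieved by passing the MSSQL_AGENT_ENABLED environment variable when the container is created:

# on a Linux server installation
sudo /opt/mssql/bin/mssql-conf set sqlagent.enabled true
sudo systemctl restart mssql-server

# or, for a Docker container, enable the agent at creation time via an environment variable
docker run -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=<YourStrong@Passw0rd>" -e "MSSQL_AGENT_ENABLED=true" \
   -p 1433:1433 -d <image>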
Now that the setup is done, the next stage is to get data streamed from the SQL Server instance based on the DML changes happening on it.
Since Debezium relies on CDC data from SQL Server, the connector has to be set up before any data is populated in the table. So, assuming the instructions in the first part of this article were followed, the table has to be cleared and the data populated again after CDC has been enabled and the Debezium connector has been set up, so that the capture instance (CT table) picks up the changes and the connector streams them through the created Kafka topic.
The steps below outline the setup that has to be done prior to capturing the data changes.
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/sqlserver-product-test/config -d '{
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "database.hostname": "<IP Address>",
  "database.port": "1433",
  "database.user": "sa",
  "database.password": "<Password>",
  "database.dbname": "DemoData",
  "database.server.name": "sstest",
  "table.whitelist": "dbo.Products",
  "database.history.kafka.bootstrap.servers": "kafka:29092",
  "database.history.kafka.topic": "sshist_Products",
  "transforms": "unwrap,route",
  "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
  "transforms.route.replacement": "sqlserv_$3"
}'
Once created, the connector status can be checked using the below command
curl -s "http://localhost:8083/connectors" | jq '.[]' | \
  xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status" | \
  jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")' | \
  column -s : -t | sed 's/\"//g' | sort
The result would be displayed as per the below screenshot, which indicates that the connector as well as its associated task is running fine.

With the connector running, the Products table can now be repopulated with the sample data so that the changes flow through CDC into the Kafka topic.
USE [DemoData]
GO
INSERT [dbo].[Products] ([ID], [ProductName])
VALUES (1, N'Car'),
       (2, N'Truck'),
       (3, N'Motorcycle'),
       (4, N'Bicycle'),
       (5, N'Horse'),
       (6, N'Boat'),
       (7, N'Plane'),
       (8, N'Scooter'),
       (10, N'Test Product 25 Jan')
Check the associated topic in the Kafka server and ensure it contains the DML changes applied so far.
This can be done using the below command
docker exec -it no-more-silos_kafka_1 kafka-console-consumer --bootstrap-server kafka:29092 --topic sqlserv_Products --from-beginning
The result will be displayed as below.

This indicates that a Kafka topic has been created based on the data from the specified table and that it keeps transmitting the DML operations happening on the table as a continuous stream, by means of the CDC feature enabled in SQL Server.
To illustrate the continuous, close to real time streaming of data, some DML statements shall be applied to the source table whilst monitoring the stream.
A data modification (UPDATE) is applied first to see how the stream captures it.
UPDATE [dbo].[Products]
SET ProductName = 'Hovercraft'
WHERE ID = 10
Recheck the topic and notice that the new DML change has been picked up by the stream.

Similarly, rows can be deleted to see how the topic picks up the change.
DELETE FROM [dbo].[Products] WHERE ID = 4;
DELETE FROM [dbo].[Products] WHERE ID = 5;
This time the result can be checked from another component, the KSQL server, which is part of the same project. KSQL helps to build streams and tables over Kafka topics and integrates with Kafka to ingest topics as objects within it.
The topic can be reviewed using the PRINT command in KSQL
SET 'auto.offset.reset' = 'earliest';
PRINT 'sqlserv_Products' FROM BEGINNING;
The first command ensures the consumer always starts displaying from the start of the stream.
The result would be displayed as below.

The result in this case is in JSON format and shows the columns and their values as key-value pairs. A boolean key __deleted at the end indicates whether the entry corresponds to a deletion. For each modification the stream includes a new JSON entry, by virtue of SQL Server's CDC feature.
The format used for serializing the values is AVRO, which includes the schema definition as well as the actual values. The first two fields, ROWTIME and ROWKEY, are system generated and identify each message within the stream corresponding to each row affected by the DML changes.
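Purely as an illustration of the shape described above (the timestamp and key are placeholders, not actual output), a message for the updated row would look roughly like this when printed from KSQL:

{"ROWTIME": <timestamp>, "ROWKEY": "<message key>", "ID": 10, "ProductName": "Hovercraft", "__deleted": "false"}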