StreamSets Data Collector (SDC) supports 69 sources, including relational and no-SQL databases, on-prem and cloud file systems and a handful of messaging applications (documentation). Yet, occasionally, customers ask if Data Collector can integrate with an app or system not explicitly called out in documentation. If there is a Java library providing an API, the answer is YES; StreamSets can use it for integration.
I recently described this option for Delta Standalone. Before that, a customer wanted to source data from Apache Qpid. Apache Qpid is an AMQP broker. In theory, it might have been available with StreamSets out of the box JMS Consumer origin – however, the implementation of the latter is rather old and not compatible with Qpid’s JMS client library.
This blog will walk you through how that JMS Client library can be used by SDC to consume messages from Qpid queues or send messages to Qpid.
Qpid Java JMS Client
Until very recently, SDC supported only Java 8. This is why our work starts with the latest pre-1.0.0 version of JMS Client, as JMS Client versions from 1.0.0 and onward would require Java11. When using SDC 5.3.0 and later releases, the same process can be done just using the latest JMS Client version.
From MVN Repository, I collected qpid-jms-client-0.61.0.jar and related files using the link above and produced a list that you can feed to your favorite download manager.
SDC Configuration
Libraries Installation
In SDC package manager, install the Groovy package.
Upload Qpid VMS client library JAR files onto the SDC engine as external resources. As a brief summary of the above external resources link, here are actions to perform on SDC:
- In $SDC_HOME/externalResources/streamsets-libs-extras directory, create a new directory streamsets-datacollector-groovy_2_4-lib, and inside that new directory, create another new directory lib.
- Copy the required JAR files into streamsets-datacollector-groovy_2_4-lib/lib/ directory.
- Restart SDC.
Another good option is to package external resources into an archive file. That will save you time and effort when upgrading SDCs or installing their multiple instances.
Truststore Configuration
Create Java Key Store (JKS) files for SSL Keystore and SSL Truststore in the SDC filesystem or use some existing ones.
Import the server certificate into Truststore JKS and the client certificate into Keystore JKS. Here’s an example for importing (assumes the certificate is in DER format; you need to know the JKS’ password):
keytool -import -alias your-alias -keystore your-keystore.jks -file certificate.der
The keystore files should be placed in the filesystem location available for SDC.
A quick and relatively easy option is to reuse the $SDC_HOME/etc/keystore.jks file, but be mindful of security implications.
You can find more details on Qpid client runtime configuration in the documentation on the Apache website.
Security Permissions
On your SDC machine, open the file $SDC_HOME/etc/sdc-security.policy in a text editor and add the lines from this list (text in <angle brackets> requires changing).
Pipeline Development
Below you will find instructions to develop the Qpid pipelines. Please note, the code samples below don’t take precautions for securing/hiding access credentials like passwords. While the coding evaluator doesn’t allow the use of credential functions, which secure credentials properly, runtime property functions can obfuscate access details and place them outside of pipelines.
Producer
For Qpid producer, use Groovy evaluator processor. Most likely, it will be the last stage of a pipeline, and no data processing will occur after it, so put a Trash destination after it in the pipeline layout like in the screenshot below:
In the Groovy evaluator configuration, replace the default code with the one provided in my repository. You will need to edit the texts in <angle brackets> according to your Qpid server config (and don’t forget to remove the angle brackets themselves).
The code assumes that data to be sent to the AMQP destination is stored in the incoming records in the field amqpOut (you can change that field’s name in the code). You may want to use ${runtime.conf} EL-function to hide sensitive data, but unfortunately, no credential storage functions can be used here.
Consumer
The consumer pipeline should start from Groovy Scripting origin:
The code at the repository link below places the data received from the AMQP destination in the outgoing records in the field amqpIn (you can change the field’s name in the code).
Incoming messages are read as text only; it’s suggested to do subsequent parsing in the downstream stages.
In the Groovy evaluator configuration, replace the default code with the one provided in my repository. Again, edit the texts in <angle brackets> according to your Qpid server config.
Conclusion
One might ask, ‘Why not just write a Java program that would receive Qpid messages?’. The answer is that StreamSets is not merely a data extraction tool but a complete end-to-end data integration platform with the ability to route data to various destinations, both on-premise and cloud, and to orchestrate and monitor pipelines. When your platform can cover most data integration needs, the one remaining use case can be solved with a Groovy evaluator using the available code stub.
The post Extend StreamSets Integration With Source Systems Using Groovy appeared first on StreamSets.