Monday, June 26, 2023

Improving near real-time streaming system performance

 

 

This is part 2 of my ramblings in this series on trade-offs when deciding between solution options.

 

In one of my client assignments, where I was the lead architect for their data platform, we had a jour fixe session involving client data strategists, data owners, product owners and project managers. The requirement on the table was to handle streaming data arriving in real time from their trading systems and to build real-time ingestion, analytics and reporting that would help traders and the front-office team get a view of their positions at any point in the day.

 

(We take a diversion here to set some context)

 

Context setting:

 

The client had a license for the ETL tool Talend, hosted on-premises. Talend Cloud was still maturing at the time. The client had therefore already decided to use Talend for all data integration and transformation requirements, both to keep one consistent tooling framework and to get the most out of a product they were already heavily invested in.

 

It’s a bit like saying: since I have bought this expensive cough medicine, I should use it for every health ailment. :)

 

Talend on-premises is a great tool; it can copy and transform millions of rows of data from almost any data source in a few seconds.

 

Azure Data Factory was not that mature in those days (2018) in terms of performance and features.

 

Talend also lets us go beyond the standard built-in transformation options: we can write custom Java code and deploy it as JAR files to perform advanced rule-based transformations.

 

In hindsight, Talend on-premises did not have great DevOps support.

 

Every time we wanted to promote a Talend job from dev to a higher environment, we had to either rebuild it manually or export the job and import it by hand.

 

Also, Talend on-premises was great for batch processing but did not fare well in real-time scenarios.

 

For our use case, there was already a prototype of the real-time reporting solution in place, which involved a relatively long route for data to travel from source to destination. The prototype's architecture was as follows:

 



 

First version of the real-time reporting solution, with a relatively long lead time for streaming data to reflect in a report.

 

 

Though this architecture served its purpose, there was about a 10-minute lag for data to travel from the ETRM system to the aggregated target tables used for reporting.

 

There were too many Azure Functions, and most of the time there were issues with messages not being processed in the queue.

 

Also, raw data sometimes arrived at several megabytes when the ETRM system sent a burst of many transactions at once; the increased message size led to processing failures because of the service's message-size limits.

 

Lead-in to the next blog:

This particular problem is widespread with cloud messaging services; in a later blog we will look at how to resolve it using the Claim-Check cloud design pattern (Claim-Check pattern - Azure Architecture Center | Microsoft Learn).
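
As a quick preview of the idea, here is a minimal claim-check sketch in Python. Everything below is illustrative: the connection strings, the raw-payloads container and the trades queue are hypothetical names, not the client's actual setup.

```python
# Minimal claim-check sketch (illustrative only): the heavy payload is parked in
# blob storage and only a small reference message travels through the queue,
# keeping it well under the service's message-size limit.
import json
import uuid

from azure.storage.blob import BlobServiceClient
from azure.servicebus import ServiceBusClient, ServiceBusMessage

BLOB_CONN_STR = "<storage-connection-string>"       # hypothetical
QUEUE_CONN_STR = "<servicebus-connection-string>"   # hypothetical

def send_with_claim_check(payload: bytes) -> None:
    # 1. Store the large payload in blob storage under a unique name.
    blob_name = f"trades/{uuid.uuid4()}.json"
    blob_service = BlobServiceClient.from_connection_string(BLOB_CONN_STR)
    blob_service.get_blob_client("raw-payloads", blob_name).upload_blob(payload)

    # 2. Send only the claim check (the blob reference) through the queue.
    claim_check = json.dumps({"container": "raw-payloads", "blob": blob_name})
    with ServiceBusClient.from_connection_string(QUEUE_CONN_STR) as sb_client:
        with sb_client.get_queue_sender(queue_name="trades") as sender:
            sender.send_messages(ServiceBusMessage(claim_check))
```

The consumer does the reverse: it reads the reference from the queue, fetches the payload from blob storage, processes it and (optionally) deletes the blob.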

 

(Coming back to the previous lane after diversion)

 

In the jour fixe session, we discussed ways to reduce this 10-minute lag and get to near real-time: a few seconds, or about a minute at most.

 

For traders, as you know, every clock tick is a million lost or gained. If they can see their positions (in an aggregated or detailed view) sooner, it helps them adjust and plan their future transactions based on the profit or loss of past ones. There is much more to it than that, but my domain knowledge here is limited, and I still regret not learning more about this area.

 

I proposed Azure Event Hubs to avoid these multiple hops and to improve streaming throughput and scalability.

 

It was a healthy debate, with several client colleagues challenging Event Hubs and some proposing Kafka on-premises.

 

Talend on-premises was already eating into the time budget due to latency between cloud and on-premises; introducing Kafka on-premises would only have increased the lag further.

 

Finally, the meeting ended with an action for me to do a quick POC using Event Hubs. With the help of a developer in the team (who is passionate about technology, I must say), we created a modified implementation, shown below, that reduced the number of Function Apps and replaced Service Bus with Event Hubs.

 



Improved version with Azure Event Hubs, reducing the lag before streaming data reaches the destination store and becomes available in the report.

 

With the default scaling, Event Hubs was struggling to keep up with the incoming real-time stream. With some capacity planning, we increased the partition count and throughput units, which massively improved message arrival and processing speed. The Event Hubs clients also have a built-in retry mechanism to resend failed messages.
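
To give a feel for the shape of that POC (not the production code), here is a minimal Python sketch using the azure-eventhub SDK. The connection string, hub name and handler logic are placeholders made up for illustration.

```python
# Illustrative Event Hubs sketch: a producer batching trade events and a consumer
# processing them per partition. Connection string and hub name are placeholders.
from azure.eventhub import EventData, EventHubConsumerClient, EventHubProducerClient

CONN_STR = "<eventhub-namespace-connection-string>"  # hypothetical
HUB_NAME = "trades"                                  # hypothetical

def publish(events: list) -> None:
    producer = EventHubProducerClient.from_connection_string(
        CONN_STR, eventhub_name=HUB_NAME)
    with producer:
        batch = producer.create_batch()        # respects the max batch size
        for event in events:
            batch.add(EventData(event))
        producer.send_batch(batch)             # SDK retries transient failures

def on_event(partition_context, event):
    # Placeholder for the real aggregation / enrichment logic.
    print(partition_context.partition_id, event.body_as_str())
    partition_context.update_checkpoint(event)  # track progress per partition

def consume() -> None:
    consumer = EventHubConsumerClient.from_connection_string(
        CONN_STR, consumer_group="$Default", eventhub_name=HUB_NAME)
    with consumer:
        consumer.receive(on_event=on_event, starting_position="-1")  # from start
```

Partition count and throughput units are set on the Event Hubs namespace/hub itself, not in this client code, which is where the capacity planning mentioned above comes in.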

 

Later, we migrated from Azure SQL Data Warehouse to the Snowflake cloud warehouse to align with the client's data strategy; that migration effort alone deserves its own blog.

 

Takeaway: we discussed choosing between messaging services (Service Bus queues and topics) and PaaS streaming services (Azure Event Hubs) based on the requirements of the use case. Again, it is not one-size-fits-all. Each use case has its own requirements and deserves the services that make it an optimal solution.

 

P.S.

Of course, the devil is in the details. There are many intricacies with respect to business logic. For example, a trading transaction may already have reached the data warehouse when a cancellation for it arrives later. In that case, the initial transaction and the cancellation must be consolidated and netted off to maintain consistency and data correctness. There are many such cases that are not discussed here, to keep the blog within scope.
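
Purely to illustrate the kind of consolidation meant here (the schema and netting rule are made up; the real logic was considerably more involved), a small pandas sketch:

```python
# Hypothetical netting sketch: a cancellation arrives as the same trade_id with a
# negated quantity, so summing per trade nets both legs to zero.
import pandas as pd

trades = pd.DataFrame([
    {"trade_id": "T1", "quantity": 100,  "price": 42.0},   # original trade
    {"trade_id": "T1", "quantity": -100, "price": 42.0},   # later cancellation
    {"trade_id": "T2", "quantity": 50,   "price": 41.5},
])

# Consolidate: net the quantity per trade and drop fully cancelled trades.
positions = (trades.groupby("trade_id", as_index=False)
                   .agg(net_quantity=("quantity", "sum")))
positions = positions[positions["net_quantity"] != 0]
print(positions)   # only T2 remains as an open position
```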

 

Thursday, June 8, 2023

 

Part 2: Applying 12-factor app principles to data engineering

 

The 2nd principle is about handling dependencies: The Twelve-Factor App (12factor.net)

 

Explicitly declare and isolate all dependencies

 


Reference : https://developer.ibm.com/developer/default/articles/creating-a-12-factor-application-with-open-liberty/images/images02.png

 This principle states that the source code maintained in source control should contain only code that is relevant and unique to the application.

 External dependencies (such as Node.js modules, Python libraries, .NET add-ins, etc.) should not reside within source control; they should be identified, isolated and made available at runtime when the application / service references them.

 Let's take this principle and fit it into our data engineering / data integration scenario.

 Handling dependencies in Databricks 

 In your Python-based solution, create a requirements.txt file listing all dependency packages along with their versions:

  • numpy==1.24.3
  • pandas==2.0.2
  • py==3.1.0
  • py4j==0.10.9.7
  • xlrd==1.2.0

to list a few.

These Python libraries are installed on the Databricks cluster from the DevOps pipeline, which downloads them from an external artifact repository. Alternatively, we could store them in Azure DevOps -> Artifacts and download them from there in the pipeline, if internet access needs to be restricted in the pipeline or when working with a self-hosted deployment agent.
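
As a hedged sketch of what that installation step amounts to (the workspace URL, token and cluster ID below are placeholders; a real pipeline would pull them from variables or a service connection), the Databricks Libraries REST API can be called like this:

```python
# Illustrative sketch: install the pinned PyPI packages from requirements.txt on a
# Databricks cluster via the Libraries API (POST /api/2.0/libraries/install).
# Workspace URL, token and cluster ID are hypothetical placeholders.
import requests

WORKSPACE_URL = "https://<workspace>.azuredatabricks.net"  # hypothetical
TOKEN = "<databricks-token>"                               # hypothetical
CLUSTER_ID = "<cluster-id>"                                # hypothetical

def install_requirements(requirements_path: str = "requirements.txt") -> None:
    with open(requirements_path) as f:
        pinned = [line.strip() for line in f
                  if line.strip() and not line.startswith("#")]

    payload = {
        "cluster_id": CLUSTER_ID,
        "libraries": [{"pypi": {"package": pkg}} for pkg in pinned],
    }
    response = requests.post(
        f"{WORKSPACE_URL}/api/2.0/libraries/install",
        headers={"Authorization": f"Bearer {TOKEN}"},
        json=payload,
        timeout=30,
    )
    response.raise_for_status()
```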

 Practical Tip:

If we don't pin explicit package versions, the Azure DevOps pipeline fetches the latest version from the respective package store. In our project this led to unexpected issues with the data pipeline. Since these packages are open source and community driven, they keep evolving with new versions, and there is a high chance a particular feature behaves differently (or breaks) in a newer version. Hence it is always recommended to pin the exact versions with which you did your development and testing.

The following Microsoft article explains the steps involved in creating a DevOps pipeline to package Python libraries and deploy them to a Databricks cluster, using the databricks-connect CLI tool to securely connect to the cluster from the DevOps agent.

https://learn.microsoft.com/en-us/azure/databricks/dev-tools/ci-cd/ci-cd-azure-devops

 In addition to the external libraries listed above, our own Python code needs to be packaged, versioned and deployed to the cluster for the data pipeline to work.

Link to the blog on writing Python scripts in notebooks vs writing software engineering approach-based Python classes in Databricks: https://bharaniblogs.blogspot.com/2023/06/writing-python-script-in-notebooks-vs.html

 Initially, in our project, we hard-coded the Python package version every time we did a build and deploy during the development phase. As part of continuous improvement, we introduced dynamic versioning using Versioneer.

 Link to the blog on how to create and apply a dynamic version to Python packages and deploy them to Databricks from DevOps:

Bharani blogs: Dynamically generate version and apply python package from DevOps 

 


Writing python script in notebooks vs writing software engineering approach-based Python classes in Databricks

 

A software engineering approach to code for data engineering

Typically, data engineers and developers write blocks of code in Databricks or Jupyter notebooks in the form of code snippets / scripts, as shown in the screenshot below.

 

Usage of ad-hoc code in the form of scripting

 This works fine for a single-member team or for a one-time activity to get data from source to target, with or without transformations and enrichment.

If the project under consideration is a major program involving many team members, or if there is repeated ingestion of files and running of pipelines to cleanse, transform, enrich and harmonize data, it makes more sense to take a software engineering approach to source code management instead of just writing ad-hoc, non-reusable scripts.

 Create Python classes, ideally one class per Python file (.py), with logically related functions:
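
As a rough illustration of the shape (the file, class and column names below are made up for this example):

```python
# ingestion.py -- one class per file, grouping logically related functions.
# Names and logic are hypothetical, purely to illustrate the structure.
from pyspark.sql import DataFrame, SparkSession


class TradeIngestion:
    """Reusable ingestion logic instead of ad-hoc notebook scripting."""

    def __init__(self, spark: SparkSession, source_path: str):
        self.spark = spark
        self.source_path = source_path

    def read_raw(self) -> DataFrame:
        # Read raw files from the landing zone
        return self.spark.read.format("json").load(self.source_path)

    def cleanse(self, df: DataFrame) -> DataFrame:
        # Drop duplicates and records missing a key column
        return df.dropDuplicates().na.drop(subset=["trade_id"])

    def write_curated(self, df: DataFrame, target_path: str) -> None:
        # Persist curated data for downstream consumption
        df.write.mode("append").format("delta").save(target_path)
```

A notebook (or a scheduled job) then simply imports and reuses this class instead of re-writing the same snippets.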


This brings the following benefits:

 

  • Modularity
  • A consistent and standardized solution framework
  • Works with GitHub repos, Azure DevOps or any other supported source control
  • Limits the number of libraries and packages to install on clusters
    • Instead of each developer using their own set of libraries, agreeing on a common, reusable set saves a lot of time in cluster initialization (the fewer libraries to install on new executors, the less time it takes to get a cluster up and running)

It also brings the following benefits:

 

  • Code versioning
  • Better collaboration between teams
  • GitOps / DevOps processes
  • Unit-testing automation through DevOps
  • Code scanning
  • Automated deployment through DevOps pipelines


 


          

 

Usage of Python classes and reuse of common classes


This page explains the style guide and coding conventions recommended as a standard when writing Python code:


PEP 8 – Style Guide for Python Code | peps.python.org


Dynamically generate version and apply python package from DevOps

 

Dynamic versioning for packages from DevOps


This is a side note / micro-blog within our blog series on applying DevOps to data engineering topics.

 

In our use case, Python is the programming language used in Databricks.

 

Our source code is written as Python class files and committed to repos in Azure DevOps.

 

To build, package and deploy these classes as a library to the Databricks cluster, DevOps pipelines are used to automate the task.

 

Every time we make changes, the built package's version needs to be incremented.

 

To make this versioning process automatic, Versioneer is one versioning solution among several options.

 

Jacob has done a wonderful job of clearly explaining, step by step, how to use Versioneer in this blog:

 

https://jacobtomlinson.dev/posts/2020/versioning-and-formatting-your-python-code/

 

With Versioneer configured, custom packages built from the DevOps pipeline get a dynamic version derived from the "git tag <version>" tag string.
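
For reference, a minimal setup.py wired to Versioneer might look like the sketch below. The package name is hypothetical, and Versioneer additionally needs its [versioneer] section in setup.cfg plus a one-time `versioneer install`, as Jacob's post explains.

```python
# setup.py -- minimal sketch wiring Versioneer into the package build.
# The package name is hypothetical; setup.cfg must also carry a [versioneer]
# section (VCS, style, versionfile_source, tag_prefix) per the Versioneer docs.
import versioneer
from setuptools import find_packages, setup

setup(
    name="dataplatform-pipelines",        # hypothetical package name
    version=versioneer.get_version(),     # derived from the latest git tag
    cmdclass=versioneer.get_cmdclass(),   # hooks the version into sdist/wheel builds
    packages=find_packages(),
)
```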



Screenshot showing artifacts from the DevOps pipeline: a .whl package built and versioned using Versioneer.