Monday, June 26, 2023

Improving near real-time streaming system performance

 

 

This is part 2 of my ramblings in this series on trade-offs when deciding between solution options.

 

In one of my client assignments, where I was the lead architect for their data platform, we had a jour fixe session with the client's data strategists, data owners, product owners and project managers. The requirement on the table was to handle streaming data arriving in real time from their trading systems and build real-time ingestion, analytics and reporting, so that traders and the front-office team could see their positions at any given time of the day.

 

(We take a diversion here to set some context)

 

Context setting:

 

The client had a license for the ETL tool Talend, hosted on-premises. Talend Cloud was still maturing. The client had therefore already decided to use Talend for all data integration and transformation requirements, both to have one consistent tooling framework and to get a better cost-benefit ratio out of a product they were already heavily invested in.

 

It's a bit like saying: since I have bought this expensive cough medicine, I should use only this medicine for every health ailment. :)

 

Talend on-premises is a great tool: it can copy and transform millions of rows from almost any data source in a matter of seconds.

 

Azure Data Factory was not that mature in those days (2018) in terms of performance and features.

 

Talend also lets us go beyond the standard built-in transformation options: we can write custom Java code and deploy it as JAR files to perform advanced rule-based transformations.

 

In hindsight, Talend on-premises did not have great DevOps support.

 

Every time we wanted to re-create a Talend job in a higher environment from dev, we had to either rebuild it by hand or manually export and import the job.

 

Also, Talend on-premises was great for batch processing but does not fare well with real-time workloads.

 

For our use-case, there was already a prototype in place for the real-time reporting solution, which took a relatively long route for data to travel from source to destination. The prototype's architecture was as follows:

 



 

First version of the real-time reporting solution, with a relatively long lead time for streaming data to show up in a report.

 

 

Though this architecture served its purpose, there was a lag of about 10 minutes between data leaving the ETRM system and reaching the target tables in aggregated form for reporting.

 

There were too many Azure Functions in the chain, and more often than not there were issues with messages stuck unprocessed in the queue.

 

Also, when the ETRM system sent a burst of many transactions at once, the raw payload could grow to several megabytes, pushing messages past the service's message-size limits and causing processing failures.

 

Lead-in to a later blog:

This particular problem is widespread with cloud messaging services; in a later blog we will look at how to resolve it using the Claim-Check cloud design pattern (Claim-Check pattern - Azure Architecture Center | Microsoft Learn).
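
To give a flavour of the idea already (a minimal sketch with hypothetical names and connection strings, not the implementation we eventually built): the oversized payload is parked in blob storage and only a small reference travels through the messaging service.

import json
import uuid
from azure.storage.blob import BlobClient
from azure.servicebus import ServiceBusClient, ServiceBusMessage

large_payload_json = "<raw ETRM burst, potentially several megabytes of JSON>"

# 1. Park the oversized payload in blob storage (the "claim" is checked in)
blob_name = f"etrm-bursts/{uuid.uuid4()}.json"
blob = BlobClient.from_connection_string(
    conn_str="<storage-connection-string>",
    container_name="claims", blob_name=blob_name)
blob.upload_blob(large_payload_json)

# 2. Send only a lightweight reference through the queue; the consumer
#    reads the blob back using this claim before processing
with ServiceBusClient.from_connection_string("<servicebus-connection-string>") as sb:
    with sb.get_queue_sender(queue_name="trades") as sender:
        sender.send_messages(ServiceBusMessage(json.dumps({"claim": blob_name})))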

 

(Coming back to the previous lane after diversion)

 

In the jour fixe session, we were discussing ways to reduce this 10-minute lag and get to near real time: a few seconds, or about a minute at most.

 

For traders, as you know, every clock tick can mean a million lost or gained. If they see their positions (in an aggregated or detailed view) earlier, they can adjust and plan future transactions based on the profit or loss of past ones. There is much more to it than that, but my domain knowledge here is limited, and I still regret not learning more about this area.

 

I proposed Azure Event Hubs to avoid these multiple hops and to improve streaming throughput and scalability.

 

It was a healthy debate, with several client colleagues challenging Event Hubs and some proposing Kafka on-premises.

 

Talend on-premises was already eating into the time budget due to latency between cloud and on-premises; introducing Kafka on-premises would only increase the lag further.

 

Finally, the meeting ended with an action for me to do a quick PoC using Event Hubs. With the help of a developer in the team (who is passionate about technology, I must say), we created a modified implementation, shown below, which reduced the number of function apps and replaced Service Bus with Event Hubs.

 



Improved version with Azure Event Hubs, leading to less lag before streaming data reaches the destination store and becomes available in reports.

 

With the default scaling level, Event Hubs was struggling to catch up with the incoming stream of real-time messages. With some capacity planning, we increased the partition count and throughput units, which massively improved message arrival and processing speed. Event Hubs also has a built-in retry mechanism to resend failed messages.
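
For illustration only (hypothetical hub name, connection string and payload; our actual producer sat inside the ingestion service), publishing trade events to Event Hubs from Python looks roughly like this:

import json
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    conn_str="<event-hubs-connection-string>",
    eventhub_name="trades")

# Batching keeps individual sends under the size limits and lets the service
# spread events across partitions for parallel consumption
with producer:
    batch = producer.create_batch()
    batch.add(EventData(json.dumps({"tradeId": "T-100", "quantity": 500})))
    producer.send_batch(batch)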

 

Later we migrated from Azure SQL Data Warehouse to the Snowflake cloud warehouse in line with the client's data strategy; that migration effort alone deserves another blog.

 

Takeaway: we discussed choosing between messaging services (Service Bus queues and topics) and PaaS streaming services (Azure Event Hubs) based on the use-case requirements. Again, it is not one size fits all. Each use-case has its own requirements and deserves the services that make it an optimal solution.

 

P.S.

Of course, the devil is in the details. There are many intricacies in the business logic. For example, a trading transaction may have already reached the data warehouse when a cancellation for it arrives later; in that case, the initial and the cancellation transaction must be consolidated and netted out to keep the data consistent and correct. There are many such cases not discussed here to keep the blog in scope.
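
Purely as an illustration (hypothetical column names, nothing like the client's actual data model), netting a trade against its later cancellation could look like this in pandas:

import pandas as pd

# Hypothetical trade events: the later CANCEL row negates the original trade
events = pd.DataFrame([
    {"trade_id": "T-100", "event": "NEW",    "quantity": 500},
    {"trade_id": "T-100", "event": "CANCEL", "quantity": -500},
    {"trade_id": "T-200", "event": "NEW",    "quantity": 300},
])

# Consolidate per trade: cancelled trades net out to zero and drop away
positions = events.groupby("trade_id", as_index=False)["quantity"].sum()
open_positions = positions[positions["quantity"] != 0]
print(open_positions)  # only T-200 remains open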

 

Thursday, June 8, 2023

 

Part 2 : Applying 12 factor app principle to data engineering

 

The 2nd principle is about handling dependencies: The Twelve-Factor App (12factor.net)

 

Explicitly declare and isolate all dependencies

 


Reference : https://developer.ibm.com/developer/default/articles/creating-a-12-factor-application-with-open-liberty/images/images02.png

 This principle states that the source code maintained in source control should contain only code that is relevant and unique to the application.

External dependencies (like Node.js modules, Python libraries, .NET add-ins, etc.) should not reside within source control; instead they should be identified, isolated and made available at runtime when the application / service references them.

Let's take this principle and fit it into our data engineering / data integration scenario.

 Handling dependencies in Databricks 

In your Python-based solution, create a requirements.txt file listing all dependency packages along with their versions, for example:

  • numpy==1.24.3
  • pandas==2.0.2
  • py==3.1.0
  • py4j==0.10.9.7
  • xlrd==1.2.0

to list a few.

These Python libraries are installed on the Databricks cluster from the DevOps pipeline, which downloads them from an external artefact repository. Alternatively, they could be stored in Azure DevOps -> Artifacts and downloaded from there by the pipeline, if internet access needs to be restricted in the pipeline or when working with a self-hosted deployment agent.
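
As a rough sketch of what that pipeline step does under the hood (hypothetical workspace URL, token and cluster ID; a real pipeline would more likely use the Databricks CLI or a ready-made task), the Databricks Libraries API can be called to push the pinned packages onto the cluster:

import requests

# Values like these would normally be injected by the DevOps pipeline
workspace_url = "https://<your-workspace>.azuredatabricks.net"
token = "<databricks-pat-from-key-vault>"
cluster_id = "<target-cluster-id>"

libraries = [{"pypi": {"package": pkg}}
             for pkg in ["numpy==1.24.3", "pandas==2.0.2", "xlrd==1.2.0"]]

resp = requests.post(
    f"{workspace_url}/api/2.0/libraries/install",
    headers={"Authorization": f"Bearer {token}"},
    json={"cluster_id": cluster_id, "libraries": libraries})
resp.raise_for_status()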

 Practical Tip:

If we don't specify explicit package versions, the Azure DevOps pipeline pulls the latest version when it downloads from the respective store. In our project, this led to unexpected issues in the data pipeline. Since these packages are open source and community driven, they keep evolving with new versions, and there is a high chance a particular feature might not work the same way in a newer version. Hence it is always recommended to pin the exact versions you developed and tested with.

The following Microsoft article explains the steps involved in creating a DevOps pipeline to package Python libraries and deploy them to a Databricks cluster, using the databricks-connect CLI tool to securely connect to the cluster from the DevOps agent.

https://learn.microsoft.com/en-us/azure/databricks/dev-tools/ci-cd/ci-cd-azure-devops

In addition to the external libraries listed above, our own Python code needs to be packaged, versioned and deployed to the cluster for the data pipeline to work.

Link to blog on writing python script in notebooks vs writing software engineering approach-based Python classes in Databricks https://bharaniblogs.blogspot.com/2023/06/writing-python-script-in-notebooks-vs.html

Initially in our project we were hard-coding the Python package version every time we did a build and deploy during the development phase. As part of continuous improvement, we introduced dynamic versioning using Versioneer.

 Link to blog on how to create and apply dynamic version to python packages and deploy to Databricks from DevOps

Bharani blogs: Dynamically generate version and apply python package from DevOps 

 


Writing python script in notebooks vs writing software engineering approach-based Python classes in Databricks

 

Software engineering approach to code for Data engineering

Typically, data engineers / developers write blocks of code in Databricks or Jupyter notebooks in the form of code snippets / scripts, as shown in the screenshot below.

 

Usage of ad-hoc code in the form of scripting

This works fine for a single-member team or a one-time activity to get data from source to target, with or without transformations and enrichment.

If the project under consideration is a major programme involving many team members, or if files are ingested repeatedly and pipelines run regularly to cleanse, transform, enrich and harmonize data, it makes more sense to take a software engineering approach to the source code instead of writing ad-hoc, non-reusable scripts.

Create Python classes, ideally one class per Python file (.py), each grouping logically related functions:
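
For example (a simplified, hypothetical class; the real ones carried the project-specific rules), a cleansing step could live in its own file such as cleanser.py:

# cleanser.py - one class, one responsibility, reusable across pipelines
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


class TradeCleanser:
    """Groups logically related cleansing functions for trade data."""

    def __init__(self, required_columns):
        self.required_columns = required_columns

    def drop_incomplete_rows(self, df: DataFrame) -> DataFrame:
        # Discard rows missing any of the mandatory columns
        return df.dropna(subset=self.required_columns)

    def standardise_currency(self, df: DataFrame) -> DataFrame:
        # Normalise the currency code to trimmed upper case
        return df.withColumn("currency", F.upper(F.trim("currency")))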


This brings the following benefits:

 

  • Modularity
  • A consistent and standardized solution framework
  • Source control with GitHub repos, Azure DevOps or any similar supported tool
  • A limited number of libraries and packages to install on clusters
    • Instead of each developer using their own set of libraries, having a common, reused set saves a lot of time in cluster initialization (fewer libraries to install on new executors, less time to get a cluster up and running)

Combined with source control and DevOps, it also enables:

 

  • Code versioning
  • Better collaboration between teams
  • GitOps / DevOps processes
  • Unit-test automation through DevOps
  • Code scanning
  • Automated deployment through DevOps pipelines


 


          

 

Usage of reusable Python classes
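
A notebook then only orchestrates and reuses the packaged classes, roughly like this (assuming the cleanser class above is deployed in a library called, say, trade_pipeline; names are hypothetical):

# Notebook cell: orchestration only, the logic lives in the packaged classes
from trade_pipeline.cleanser import TradeCleanser

raw_df = spark.read.format("delta").load("/mnt/raw/trades")

cleanser = TradeCleanser(required_columns=["trade_id", "quantity"])
clean_df = cleanser.standardise_currency(cleanser.drop_incomplete_rows(raw_df))

clean_df.write.format("delta").mode("append").save("/mnt/curated/trades")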


This page explains the style guide and coding conventions recommended as standard when writing Python code:


PEP 8 – Style Guide for Python Code | peps.python.org


 

 

 

 

 

 

 

 





Dynamically generate version and apply python package from DevOps

 

Dynamic versioning for packages from DevOps


This is a side note / micro-blog in our blog series on applying DevOps to data engineering topics.

 

In our use-case, Python is the programming language used in Databricks.

 

Our source code is written as Python class files and committed to repos in Azure DevOps.

 

To build, package and deploy these classes as a library to the Databricks cluster, DevOps pipelines are used to automate the task.

 

Every time we make changes, the package version needs to be incremented in the build.

 

To make this versioning automatic, Versioneer is one versioning solution among several options.

 

Jacob Tomlinson has done a wonderful job of clearly explaining, step by step, how to use Versioneer in this blog post.

 

https://jacobtomlinson.dev/posts/2020/versioning-and-formatting-your-python-code/

 

With Versioneer configured, custom packages built from the DevOps pipeline get a dynamic version derived from the "git tag <version>" tag string.
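
For reference, the wiring in setup.py is small; a sketch (the package name is hypothetical, the rest follows Versioneer's documented setup):

# setup.py - the package version is derived from the latest git tag via Versioneer
from setuptools import setup, find_packages
import versioneer

setup(
    name="trade_pipeline",                    # hypothetical package name
    version=versioneer.get_version(),         # e.g. "1.2.0" after "git tag 1.2.0"
    cmdclass=versioneer.get_cmdclass(),
    packages=find_packages(),
)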



Screenshot showing artifacts from the DevOps pipeline: a .whl package built and versioned using Versioneer.





 


Thursday, May 25, 2023

Part 1 : Applying DevOps, GitOps principles for data engineering / analytics use-cases


In this series, we will take the 12-factor app principles and apply them to a typical data engineering use-case.

The first principle is Codebase. It states that one codebase (typically one branch) is maintained, which references configuration settings, parameters and secrets and is deployed to multiple environments, as shown in the figure below.


reference : developer.ibm.com

  

The software engineering and DevOps perspective is often overlooked in analytics use-cases. Most data engineers and data scientists give priority to the data itself and to building pipelines that move data from one layer to another for cleansing, validation, transformation and enrichment, while the DevOps aspect is treated as if it applied only to application workloads.

This may stem from the thinking that a pipeline can be replicated from one environment to another with far less hassle than deploying a microservice or a web app, thanks to the many UI-based ETL / ELT tools in the market (Talend, Azure Data Factory, etc.).

It is not only data engineers and scientists; clients, product owners and other stakeholders of a data use-case often do not recognize the value DevOps and GitOps can bring in terms of quality, consistency and deployment automation.

Another contributing factor is the limited availability of DevOps frameworks and tools for data analytics workloads compared to the toolset available for web or service-based workloads.

Even when a team does take DevOps into consideration, in most cases it does not follow a GitOps deployment model.

Having multiple codebases (branches), one per environment (dev, test, prod), with hard-coded environment-specific parameters makes it difficult to merge from the Dev to the Test branch or from Test to Prod. It also violates the first principle of the 12-factor app.

The trunk-based development model is the go-to option for achieving this single-codebase principle. It defines one branch (usually 'main') as the trunk, while other branches (feature/<feature name or developer name>) are short-lived: they are created for a change and deleted after merging to trunk and releasing the feature.

  

Environment specific branching 

 

Trunk-based branching approach

The trunk-based development model is widely used in Kubernetes and similar cloud-native development / deployment setups for microservices and web-app-like workloads. Data-related use-cases can follow the same model to bring in best practices for continuous integration and deployment.


In Azure DevOps, such trunk-based branching can be followed, and for environment-specific substitutions one can leverage variable groups (one per environment) in the Library section, each mapped to a key vault created per environment in its own resource group / subscription, depending on how the environments are segregated.

  


Library section in Azure DevOps with environment specific variable groups pointing to key vault of respective environment

From DevOps pipelines, these variable groups can then be used to substitute environment-specific configuration values or secrets in the deployment stages designated per environment.
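
Where a job needs to resolve a secret at runtime rather than have the pipeline substitute it at deployment time, the same per-environment key vault can also be read directly from code; a minimal sketch (hypothetical vault and secret names):

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Hypothetical per-environment vault; dev, test and prod each have their own
vault = SecretClient(
    vault_url="https://kv-dataplatform-dev.vault.azure.net",
    credential=DefaultAzureCredential())

sql_password = vault.get_secret("sql-admin-password").value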

Database deployment task in DevOps

When it comes to database deployment, one can use the dacpac tasks that are available in Azure DevOps by default. While deploying changes from dev to test or from test to prod (with or without data, based on need), the database username, password and database names can be referenced from key vault secrets mapped to variable groups, and those variables can then be used in the DevOps pipelines.

SQL Database deployment task 


The same goes for other data services like Data Factory, Databricks, Analysis Services, etc.

Data factory deployment task in DevOps

In the case of Data Factory, it is a bit more complicated, as one has to substitute many objects:


  • Global parameters
  • Linked services (connections to other services)
  • Datasets
  • Pipelines
  • etc.

Adftools is one great tool to use for Data Factory DevOps.


Databricks deployment task in DevOps

In the case of Databricks deployment, packaging the code and deploying it as a library on the cluster can be done using Azure DevOps.

Databricks DevOps

In upcoming blogs, we will look at each of these deployments in detail. This blog is just to reiterate DevOps & GitOps practices for data engineering use-cases.

 




Wednesday, May 17, 2023

 

Databricks performance issue and resolution

 

 

Setting the Stage:

 

We are using VNET Injected Databricks on Azure for a data engineering use-case.

The infra team created the Databricks workspaces with VNET configuration in all environments through Terraform code and a DevOps pipeline.

 

Issue:

 

In our DEV environment, the Databricks cluster performs fine.

 

In DEV, it takes about 1 hour to run the end-to-end cleansing, validation and enrichment process for a file with approximately 1,000+ transactions.

 

The same process, with a similar setup in the QA environment, took about four times as long to complete.

 

Troubleshooting:

 

Since the Databricks Terraform module code is the same for all environments, we ruled out any infra-related issues at first.

 

After spending loads of time on troubleshooting, we raised a ticket with Azure support and, in turn, with the Databricks support team.

 

First, the support team looked at the number of Python packages being installed on the cluster and suggested an init script so that all libraries are installed on all executors during cluster initialization, avoiding the time taken to install them on each executor after the cluster start event. The following setting was added to the cluster configuration:

 

databricks.libraries.enableSparkPyPI false 

 

No luck with this suggestion.

 

Next, we shared detailed logs with the Databricks support team from runs of the pipeline in our DEV and QA environments. Nothing significant showed up when we compared the logs across environments.

 

Next, we ran a simple test in the DEV and QA environments: creating a few hundred items on the mounted storage location (Delta Lake-enabled blob storage) from which we read raw data and to which we write delivery data. The QA environment took a few seconds longer than DEV.
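
The comparison was essentially a crude latency probe along these lines (hypothetical mount path and file count; dbutils is available by default in Databricks notebooks):

import time

# Write a batch of tiny files to the mounted lake and time it in each environment
start = time.time()
for i in range(100):
    dbutils.fs.put(f"/mnt/datalake/latency-test/file_{i}.txt", "ping", True)

print(f"Wrote 100 small files in {time.time() - start:.1f} seconds")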

 

The Azure support team investigated the performance issue by running the following commands from a notebook in both environments:


%sh nslookup ourdatalake.dfs.core.windows.net
%sh telnet <ip-address-returned-by-nslookup>



The IP address returned in DEV was reachable through telnet, but QA returned a different IP address. Telnet in DEV took only 0.1 seconds, whereas telnet to the QA IP took 4 seconds.

 

It turned out that the VNET peering in the QA environment pointed to a custom DNS server that was not correct.

 

Though the Databricks Terraform code is the same for both environments, the VNET peering and configuration (IP address to DNS) was different in QA, which caused the latency. After re-doing the VNET peering to the correct DNS server, the issue was resolved.

 

Learnings:

 

- A standard troubleshooting rulebook alone will not always help. At times it requires pragmatic, practical thinking outside the box and questioning the obvious.

 

- Do not take any specific area completely off the table (in our case we ruled out infra issues at an early stage, citing the Terraform-created environments).

 

It took four parties (Databricks support, Azure support, the project team (us) and the client infra team) working together with transparency and a no-blame attitude to eventually resolve the issue. (Good) teamwork works!

 

 





Wednesday, March 15, 2023

  

Custom code or low-code / serverless ?

In this series, we will take the high-level architecture of a data engineering use-case and discuss the trade-offs that arise when choosing a technology / service option to implement a certain piece of functionality.

 Use-case Requirement:

Connect to a shared (non-personal) Outlook email account and look for mails from vendors (distributors of our client's products) containing attachments with the sales transactions made to end customers. Emails arrive on a monthly recurring schedule. The requirement is to download each attachment and store it in a data lake for further downstream processing such as cleansing, validation, transformation and enrichment.

The trade-off here is between writing custom code to connect to the Outlook account and look for specific attachments from a specific email address, or using a serverless PaaS service like Logic Apps, which can perform this functionality out of the box following a low-code model.

Using a PaaS low-code service (in this case, Logic Apps)

Using logic apps

Pros

  • Connections to Outlook and the storage account can be established quickly using built-in Azure authentication, without having to write token-based auth code yourself
  • The low-code framework helps to develop quick prototypes and turn them into production grade soon after
  • Less maintenance, as less code is involved
  • Transparent cost and billing (Azure cost)
  • Protection against common security threats through Azure's built-in security features

 Cons

  • Cost is not a drawback as such, but one needs to understand the pricing model, since each connection and action in the workflow incurs cost. If your requirement involves connecting to multiple services or performing many actions, it is best to do a rough estimate using the pricing calculator (Pricing - Logic Apps | Microsoft Azure)
  • Vendor lock-in
  • Less flexibility for developers when more customization is needed (like scanning attachments)
  • Limitations that come with such services (Known issues and limitations)

Using a compute service in Azure (to host the custom code)
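
A rough sketch of what the custom-code route might look like (hypothetical mailbox, sender and container names; it assumes IMAP access is enabled on the mailbox, whereas in practice Microsoft Graph with OAuth is the more likely choice for Office 365):

import email
import imaplib
from azure.storage.blob import BlobClient

# Connect to the shared mailbox and pick up unread vendor mails
mailbox = imaplib.IMAP4_SSL("outlook.office365.com")
mailbox.login("sales-inbox@example.com", "<password-from-key-vault>")
mailbox.select("INBOX")
_, ids = mailbox.search(None, '(UNSEEN FROM "vendor@example.com")')

for msg_id in ids[0].split():
    _, data = mailbox.fetch(msg_id, "(RFC822)")
    message = email.message_from_bytes(data[0][1])
    for part in message.walk():
        if part.get_content_disposition() == "attachment":
            # Land the attachment in the raw zone of the data lake
            blob = BlobClient.from_connection_string(
                conn_str="<storage-connection-string>",
                container_name="raw", blob_name=part.get_filename())
            blob.upload_blob(part.get_payload(decode=True), overwrite=True)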

 

 

 

Pros

  • No vendor lock-in
  • Although the code itself runs locally free of cost, it needs hosting in a cloud compute service; the cost depends on the service chosen, and can be reduced if that compute is shared with other workloads
  • More control over the functionality

 Cons

  • Higher development effort
  • Maintenance effort to troubleshoot and debug the code
  • Manual effort to protect against DDoS-like attacks

Ask the following questions (of yourself and of the client stakeholders):

 

  1. Is the application / platform already using one of these compute services?

If so, it makes sense to host the custom code there and make use of the existing compute capacity, saving cost by not provisioning an additional service. If Azure Functions is already running on an App Service Environment, we could provision another instance there to host this custom code and leverage the existing capacity.

 

  2. Does the client need this piece of functionality in multiple places, or to connect to multiple email accounts?

If the intention is to reuse it in multiple places and in other apps as well, it makes more sense to go with the custom code approach.

 

  3. Is it a one-off piece of functionality, a small part of a larger application, where it does not make sense to invest a lot of time writing code?

If yes, it makes sense to go with an out-of-the-box serverless service.

 

  4. Is there a chance your application will be migrated to a different cloud platform?

If yes, it makes sense to go with custom code, as we do not want vendor lock-in to a cloud-specific service that would require redesign effort to move to an equivalent service in another cloud.


Summary

 

It all boils down to the client and application requirements; based on those, work out a cost-to-benefit ratio and select the option that fits. There is no one option that fits all purposes when it comes to such trade-off decisions.

An additional note: with either option, the DevOps work for continuous integration, testing and deployment requires more or less similar effort.