How Grasshopper uses BigQuery and Cloud Dataflow for their real-time financial data app

For the past year, we’ve been working with Grasshopper, a proprietary trading firm based in Singapore, to scale its custom-built data processing platform. Grasshopper’s technology powers its use of time-series data in financial markets, providing information to evaluate liquidity or liquidity risks in those markets. The firm uses Google Cloud Platform (GCP) extensively to store, process and analyze large volumes of real-time market and trading data globally. It specifically uses BigQuery and Cloud Dataflow to make near real-time trading decisions and develop trading strategies. Solace PubSub+ event brokers move events dynamically and in real time, to and from BigQuery via Cloud Dataflow.

Grasshopper’s primary goal in adopting GCP was to improve and scale its quantitative research capabilities by creating a research infrastructure accessible to team members that doesn’t limit their work. The firm needed reliable, accurate and stable data processing pipelines.

Grasshopper’s small team has to use technology strategically. Grasshopper designed Ahab, an Apache Beam-based Java application, as a high-performance data processing pipeline that calculates the value of the order book (a dynamic list of buy and sell orders that dictates stock pricing) from real-time market data received from multiple stock exchanges. The order book is the foundation for many trading and analytics applications. Ahab then ingests the end result to BigQuery, GCP’s fully managed data processing and analysis tool, utilizing the tool’s availability and scalability.

Before Ahab, Grasshopper relied on a solution developed in-house on Cassandra. The operational burden to maintain this technology stack, add new features and scale fast enough to meet future needs was extremely difficult to manage, particularly without a dedicated team to handle both the required infrastructure and applications. Grasshopper shifted to GCP to design and build Ahab, allowing the firm’s data engineering team to focus on data science rather than infrastructure maintenance.

Grasshopper’s quantitative analysts (known as quants) rely on the ability to process huge amounts of data in a precise and accurate way, to interact with rapidly changing financial markets. This allows their trading research platforms to scale according to market conditions. For example, when markets are more volatile and produce more market data, the analysts have to be able to adjust accordingly. This solution allows the team to know that the data is reliable, correct and scalable, without having to worry about capacity or performance limitations.

Grasshopper’s team built Ahab to meet several goals:

To transport data from its on-premises environment to GCP;
To persist the vast volumes of market data received into BigQuery in queryable form so that Grasshopper’s traders can analyze and transform the data based on their needs;
To confirm the feasibility of using Apache Beam to calculate the order books in real time and store them on a daily basis from financial market data feeds; and
Ultimately, to allow traders to improve their probability of success when the strategies are deployed.

Here’s an overview of how Ahab works to gather and process data:

Gathering source data from financial markets

Grasshopper uses data sourced from its co-located environment, where the stock exchange allocates rack space so financial companies can connect to that platform in the most efficient, lowest-latency way. Grasshopper has been using Solace PubSub+ appliances at the co-located data centers for low-latency messaging in the trading platform. Solace PubSub+ software running in GCP natively bridges the on-premises Solace environment and the GCP project. Market data and other events are bridged from on-premises Solace appliances to Solace PubSub+ event brokers running in GCP as a lossless, compressed stream. Some event brokers are on-premises, while others are on GCP, creating a hybrid cloud event mesh and allowing orders published to the on-prem brokers to flow easily to the Solace cloud brokers. The SolaceIO connector consumes this market data stream and makes it available for further processing with Apache Beam running in Cloud Dataflow. The processed data can be ingested into Cloud Bigtable, BigQuery and other destinations in a pub/sub manner.

The financial market data arrives into Grasshopper’s environment in the form of deltas, which represent updates to an existing order book. Data needs to be processed in sequence for any given symbol, which requires using some advanced functions in Apache Beam, including keyed state and timers.

This financial data comes with specific financial industry processing requirements:

Data updates must be processed in the correct order, and processing for each instrument is independent.
If there is a gap in the deltas, a request must be sent to the data provider to provide a snapshot, which represents the whole state of an order book.
Deltas must meanwhile be queued while Ahab waits for the snapshot response.
Ahab also needs to produce its own regular snapshots so that clients can query the state of the order book at any time in the middle of the day without applying all deltas since the start-of-day snapshot. To do this, it needs to keep track of the book state.
The data provider needs to be monitored so that connections to it and subscriptions to real-time streams of market data can be refreshed if necessary.

How Grasshopper’s financial data pipeline works

There are two important components to processing this financial data: reading data from the source and using stateful data processing.

Reading data from the source

Grasshopper uses SolaceIO, an Apache Beam read connector, as the messaging bus for market data publication and subscriptions. The team at Grasshopper uses Solace topics streaming into Solace queues to ensure that no deltas are lost and no gaps occur. Queues (and topics) provide guaranteed delivery semantics, such that if message consumers are slow or down, Solace stores the messages until the consumers are online, and delivers the messages to the available consumers. The SolaceIO code uses checkpoint marks to ensure that messages in the queues transition from Solace to Beam in a guaranteed-delivery and fault-tolerant manner, with Solace waiting for message acknowledgements before deleting them. Replay is also supported when required.

Using stateful data processing

Grasshopper uses stateful processing to keep an order book’s state for the purpose of snapshot production. The firm uses timely processing to schedule snapshot production and to also check data provider availability. One of this project’s more challenging areas was retaining the state of the market financial instrument across time window boundaries.

The keyed state API in Apache Beam let Grasshopper’s team store information for an instrument within a fixed window, also known as a keyed-window state. However, data for the instruments needs to propagate forward to the next window. Our Cloud Dataflow team worked closely with Grasshopper’s team to implement a workflow in which the fixed time windows flow into a global window and the timer API is used to ensure the Grasshopper team retained the necessary data order.

The data processed with Apache Beam is written to BigQuery in day-partitioned tables, which can then be used for further analysis.

(Figures: sample data; sample user acceptance testing data.)

For deployment and scheduling, Grasshopper uses Docker and Google Cloud Repositories to build, deploy and run the Cloud Dataflow jobs, using Nomad for job scheduling. The firm is also in the process of experimenting with Beam templates as an alternate deployment method.

Tips for building fast data processing pipelines

Throughout the process of building the Ahab application, Grasshopper’s team learned some valuable lessons. Here are some tips:

Increase throughput by dividing up topic subscriptions across more queues and more SolaceIO sources. Dataflow will also spin up more resources with its auto-scaling feature when needed.
Use available tools to track the order book status. Since keyed state in Beam aligns to the “group by” key (in this case, the instrument) and window of time, Grasshopper uses the global window to keep track of the order book state. Employ panes to trigger incremental processing of deltas; use a bag state plus sorting to deal with out-of-order arrival of deltas within panes when in the global window.
Rely on templates to separate pipeline graph construction and execution, simplifying deployment. Apache Beam’s ValueProvider isn’t an applicative functor; Grasshopper had to write its own implementation to combine multiple ValueProviders.
Include all the data you need up front, as display data is set at pipeline construction time and cannot be deferred.
Co-locate the Cloud Dataflow job coordinator and those teams working on the data pipeline in order to improve throughput, since watermark processing is centralized. Watermarks provide the foundation for enabling streaming systems to emit timely, correct results when processing data.
Generate BigQuery schemas automatically with protocol buffers.
Use class variables with get-or-init to share connections across Apache Beam PTransform bundles and even across different instances in the same Java VM. Interesting annotations include DoFn.Setup and DoFn.StartBundle.
Note that fusion, an optimization technique, can lead sources to fuse with downstream transforms, delaying checkpoint mark finalization. Try GroupByKey to prevent fusion.
Use a counting source to trigger some pipeline setup work.

These tools have helped Grasshopper researchers think big about their data possibilities. “Grasshopper researchers’ imaginations aren’t limited by what the infrastructure can do. We’ve resolved their data reliability, data accuracy and stability issues by using GCP,” says Tan T. Kiang, Grasshopper’s CTO. “Internal users can now all work with this data with the knowledge that it’s reliable and correct.”

To learn more, check out this example of how to spin up a GCP environment with SolaceIO and Cloud Dataflow. Learn more about using GCP for data analytics.

Thanks to additional contributions from: Guo Liang, Nan Li and Zhan Feng.
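
As a recap of the delta-processing requirements described earlier (in-order processing per instrument, a snapshot request when a gap appears, and queueing of deltas until the snapshot arrives), here is a minimal sketch in plain Python. It is not Ahab’s code and does not use Apache Beam. The class and function names, the sequence-number field, and the simplified “set quantity at a price level” delta model are all assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class InstrumentBook:
    """Per-instrument state (hypothetical structure, for illustration only)."""
    last_seq: int = 0                             # sequence number of the last applied update
    levels: dict = field(default_factory=dict)    # price -> quantity (assumed delta model)
    awaiting_snapshot: bool = False               # True while a snapshot request is outstanding
    pending: list = field(default_factory=list)   # deltas queued while waiting for the snapshot


def _apply(book, seq, price, qty):
    # A quantity of zero removes the price level; anything else overwrites it.
    if qty == 0:
        book.levels.pop(price, None)
    else:
        book.levels[price] = qty
    book.last_seq = seq


def apply_delta(book, seq, price, qty, request_snapshot):
    """Apply one delta in sequence; on a gap, queue it and request a snapshot."""
    if book.awaiting_snapshot:
        book.pending.append((seq, price, qty))    # queue until the snapshot arrives
        return
    if seq <= book.last_seq:
        return                                    # stale or duplicate delta, ignore
    if seq != book.last_seq + 1:
        book.awaiting_snapshot = True             # gap detected
        book.pending.append((seq, price, qty))
        request_snapshot()                        # hypothetical callback to the data provider
        return
    _apply(book, seq, price, qty)


def apply_snapshot(book, seq, levels, request_snapshot):
    """Replace the book with a full snapshot, then replay newer queued deltas in order."""
    book.levels = dict(levels)
    book.last_seq = seq
    book.awaiting_snapshot = False
    queued, book.pending = sorted(book.pending, key=lambda d: d[0]), []
    for d_seq, price, qty in queued:
        if d_seq > book.last_seq:
            apply_delta(book, d_seq, price, qty, request_snapshot)
```

Each instrument would hold its own `InstrumentBook`, which mirrors the requirement that processing for each instrument is independent. In a Beam pipeline, the same bookkeeping would live in keyed state rather than in an in-memory object.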
Source: Google Cloud Platform

Microsoft 365 boosts usage analytics with Azure Cosmos DB

This post is part of a 2-part series about how organizations are using Azure Cosmos DB to meet real-world needs, and the difference it’s making for them. In this first post we explore the challenges that led the Microsoft 365 usage analytics team to take action, the architecture of the new solution, and migration of the production workload. In part 2, we’ll examine additional implementation details and the outcomes resulting from the team’s efforts.

The challenge: Understanding the behavior of more than 150 million active users

Office 365 is a flagship service within the Microsoft 365 Enterprise solution, with millions of commercial customers and more than 150 million active commercial users each month. Office 365 provides extensive reporting for administrators within each company on how the service is being used, including license assignment, product-level usage, user-level activity, site activity, group activity, storage consumption, and more. The Microsoft 365 usage analytics team incrementally adds new reports to cover more Office 365 services.

Previous architecture

The telemetry data needed to generate such reports was collected in a system called usage analytics, which until recently ran on the community version of MongoDB. The image below shows the data flow, with an importer web service used to write log streams collected in Azure Blob storage to MongoDB. An OData web service exposes APIs to extract the stored data, both for reporting within the Microsoft 365 admin center and for access through Microsoft Graph. Every day, as part of a full daily refresh, several billion rows of data were added to the system.

Each of the primary geographies served by Office 365 has an independent usage analytics repository, all employing a similar architecture. In each geography, data was stored on two MongoDB clusters, with each cluster consisting of up to 50 virtual machines (VMs) hosted in Azure Virtual Machines and running MongoDB. The two clusters in each geography functioned in a primary/backup configuration. Data was written separately to both clusters and under normal operation, all reads were performed on the primary cluster.

Each cluster was designed for a write-heavy workload. To speed writes, data was sharded across individual cluster nodes using a random globally unique identifier (GUID) as the MongoDB shard key. Every day for a few hours, new data from Azure Blob storage was written using a multithreaded importer. Each thread wrote batches of 2,000 records at a time to all cluster nodes and waited for all records to finish before starting on the next batch of 2,000.

Problems and pains

This architecture presented several problems for the Microsoft 365 usage analytics team, ranging from excessive administrative effort and costs to limited performance, reliability, availability, and scalability. Some specific pains included:

Poor performance. Reads were inefficient and reports sometimes timed out because the use of a random GUID as the shard key required querying all nodes. In addition, during the few hours each day when new data was imported, writes and reads hit the primary cluster at the same time, so performance was poor. To make matters worse, if anything failed during a batch write, which often happened due to internal database errors, all 2,000 records had to be written again.
Full-time administration. Maintenance of the MongoDB clusters was manual and time-consuming, requiring staff to dedicate time to managing the clusters. This put an unnecessary resource constraint on the team, which would rather use its bandwidth to bring new reports to market. In addition, bugs in MongoDB 3.2 required all servers to be restarted weekly. Renewing the security certificates on each cluster node within the virtual network had to be done annually and required an additional two weeks of effort per cluster. During such routine administrative tasks, if an operation failed on one cluster node, the entire cluster was down until the issue was resolved.
High costs. Significant costs were incurred to run the MongoDB backup clusters, which remained idle most of the time. Those costs continued to increase as Office 365 usage grew.
Limited scalability. Less than three years after MongoDB was initially deployed, the largest repository was almost at maximum capacity. Any spare capacity was forecast to run out within six months as more products and reports were added, with no easy way to scale.

While the team was dealing with the architectural limitations of its existing solution, they were looking ahead to a lineup of new, high-scale capabilities that they wanted to enable for customers in the usage analytics space. The team started looking for a new, cost-effective, and low-maintenance solution that would let them move from self-maintained VMs running MongoDB to a fully managed database service.

Geo-distribution on Azure Cosmos DB: The key to an improved architecture

After exploring their options, the team decided to replace MongoDB with Azure Cosmos DB, a fully managed, globally distributed, multi-model database service designed for virtually unlimited elastic scalability. The first step was to deploy the needed infrastructure.

In contrast to the primary/backup, two-cluster configuration that it had used with MongoDB, the team took advantage of turnkey global distribution of active data in Azure Cosmos DB. Using multiple Azure regions for data replication provided an easy way to write to any region, read from any region, and better balance the workload across the database instances—all while relying on Azure Cosmos DB to transparently handle active data replication and data consistency.

“True geo-replication had been deemed too hard to do with MongoDB, which is why the previous architecture separately wrote data to both the primary and backup clusters,” says Xiaodong Wang, a Software Engineer on the Microsoft 365 usage analytics team. “With Azure Cosmos DB, implementing transparent geo-distribution literally took minutes—just a few mouse clicks.”

The image below shows the internal architecture of the usage analytics system today. Each of the primary geographies served by Office 365 is served by Cosmos databases geo-replicated across two Azure regions within that geography. Under normal operating conditions, writes are sent to one region within each geography while reads are routed to both. If for some reason a region is prevented from serving reads, those reads are automatically routed to the other region serving that same geography.

Migrating a production workload to Azure Cosmos DB

Developers began writing a new data access layer on the new infrastructure to accommodate reads and writes, using the Azure Cosmos DB SQL (Core) API. After bringing the new system online, the team began to write new production data to both old and new systems, while continuing to serve production reports from the old one.

Developers began to address the reports that they would need to duplicate for the new solution, working through them one at a time. Separate Cosmos containers were created within the database for most reports, so that each collection would be separately scalable after the system came online. The largest reports were addressed first to ensure that Azure Cosmos DB could handle them, and after each new report was verified, the team began serving it from the new environment.

After all functionality and reports were being served by Azure Cosmos DB, and everything was running as it should, the team stopped writing new data to the old system and decommissioned the MongoDB environment. The development team was able to move to Azure Cosmos DB, rewrite the data access layer, and migrate all reports for all geographies without any service interruptions to end users.

In part 2 of this series, we'll cover additional implementation details and the outcomes resulting from the Microsoft 365 usage analytics team’s implementation of Azure Cosmos DB.
Source: Azure

Microsoft 365 boosts usage analytics with Azure Cosmos DB – Part 2

This post is part of a 2-part series about how organizations are using Azure Cosmos DB to meet real-world needs, and the difference it’s making for them. In part 1, we explored the challenges that led the Microsoft 365 usage analytics team to take action, the architecture of the new solution, and migration of the production workload. In this post, we’ll examine additional implementation details and the outcomes resulting from the team’s efforts.

Finding the right partition key—a critical design decision

After moving to Azure Cosmos DB, the team revisited how data would be partitioned (referred to as “sharding” in MongoDB). With Azure Cosmos DB, each collection must have a partition key, which acts as a logical partition for the data and provides Azure Cosmos DB with a natural boundary for distributing data across partitions. The data for a single logical partition must reside inside a single physical partition. Physical partitions are managed internally by Azure Cosmos DB.

The Microsoft 365 usage analytics team worked closely with the Azure Cosmos DB team to optimize data distribution in a way that would ensure high performance. The team initially tried the same approach it had used with MongoDB, which was using a random GUID as the partition key. However, this required scanning all of the partitions for reads and over-allocating resources for writes, making writes fast but reads slow. The team then tried using Tenant ID as the partition key but found that the vast difference in the amount of report data for each tenant made some partitions too hot, which would have required throttling, while others remained cold.

The solution lay in creating a synthetic partition key. In the end, the team solved both the slow reads and the hot and cold partitions by grouping 100 documents per tenant ID into a bucket and then using a combination of tenant ID and bucket ID as the partition key. The bucket ID loops from 1 to n, where n is a variable and can be adjusted for each report.
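
The bucketing scheme can be sketched roughly as follows. This is an illustrative Python sketch, not the team’s implementation: the function names, the per-tenant running document index, the `_` separator, and the default `num_buckets` value are all assumptions. Only the “100 documents per bucket” grouping and the bucket ID looping from 1 to n come from the description above.

```python
def synthetic_partition_key(tenant_id: str, doc_index: int,
                            docs_per_bucket: int = 100,
                            num_buckets: int = 10) -> str:
    """Combine the tenant ID with a bucket ID that cycles from 1 to num_buckets.

    doc_index is an assumed zero-based running count of documents for the tenant.
    """
    bucket_id = (doc_index // docs_per_bucket) % num_buckets + 1
    return f"{tenant_id}_{bucket_id}"


def partition_keys_for_tenant(tenant_id: str, num_buckets: int = 10) -> list:
    """All partition key values a read for one tenant may need to touch."""
    return [f"{tenant_id}_{b}" for b in range(1, num_buckets + 1)]
```

With a scheme like this, a large tenant’s documents spread over at most n logical partitions instead of one hot partition, while a read for a tenant targets at most n known key values rather than scanning every partition.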

Handling four terabytes of new data every day

In one region alone, more than 6 TB of data is stored in Azure Cosmos DB, with 4 TB of that written and refreshed daily. Both of those numbers are continuing to grow. The database consists of more than 50 different collections, and the largest is more than 300 GB in size. It consumes an average of 150,000 request units per second (RU/s) of throughput, scaling this number up and down as needed.

The different collections map closely to the different reports that the system serves, which in turn have different throughput requirements. This design enables the Microsoft 365 usage analytics team to optimize the number of RU/s that are allocated to each collection (and thus to each report), and to elastically scale that throughput up or down on a per-collection and per-report basis.

Built-in, cost-effective scalability and performance

With Azure Cosmos DB, the Microsoft 365 usage analytics team is delivering real-time customer insights with less maintenance, better performance, and improved availability—all at a lower cost. The new usage analytics system can now easily scale to handle future growth in the number of Office 365 commercial customers. All that was accomplished in less than five months, without any service interruptions. “The benefits of moving from MongoDB to Azure Cosmos DB more than justify the effort that it took,” says Guo Chen, Principal Software Development Manager on the Microsoft 365 usage analytics team.

Improved performance and service availability

The team’s use of built-in, turnkey geo-distribution provided a way to easily distribute reads and writes across two regions. Combined with the other work done by the team, such as rewriting the data access layer using the Azure Cosmos DB Core (SQL) API, this enabled the team to reduce the time for the majority of reads from 12 milliseconds to 3 milliseconds. The image below illustrates this performance improvement.

Although this difference may seem negligible in the context of viewing a report, it resulted in significant service improvements. “There are two ways to access reporting data in the usage analytics system: through the Microsoft 365 admin center, and through Microsoft Graph,” explains Xiaodong Wang, a Software Engineer on the Microsoft 365 usage analytics team. “In the past, people complained that the Graph API was too slow. That’s no longer an issue. In addition, service availability is better now because the chances of any query timing out are reduced.”

The image below shows just how much service availability is improved. The graph illustrates successful API requests divided by the total API requests and shows that the system is now delivering a service availability level of greater than 99.99 percent.
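
The availability metric described here is a simple ratio. The short Python sketch below shows the calculation along with the number of failed requests a given target would tolerate. The function names and the sample request counts are hypothetical, and the 99.99 percent figure is the only value taken from the text.

```python
import math
from fractions import Fraction


def availability(successful_requests: int, total_requests: int) -> float:
    """Service availability as successful API requests divided by total API requests."""
    if total_requests <= 0:
        raise ValueError("total_requests must be positive")
    return successful_requests / total_requests


def failure_budget(total_requests: int, target: str = "0.9999") -> int:
    """Largest number of failed requests that still meets the availability target.

    The target is passed as a string so it can be parsed exactly, avoiding
    floating-point rounding in the subtraction.
    """
    return math.floor(total_requests * (1 - Fraction(target)))
```

For example, out of one million requests, a 99.99 percent target leaves room for at most 100 failures.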

Zero maintenance and administration

Because Azure Cosmos DB is a fully managed service, the Office 365 development team no longer needs to devote one full-time person to database maintenance and administration. Annual certificate maintenance is no longer a burden, and VMs no longer need to be restarted weekly to protect against any compromises in service availability.

“In the past, with MongoDB, we had to allocate core developer resources to administrative management of the data store,” says Shilpi Sinha, Principal Program Manager on the Microsoft 365 usage analytics team. “Now that we are running on a fully managed service, we are able to repurpose developer resources towards adding new customer value instead of managing the infrastructure.”

Elastic scalability

The Microsoft 365 usage analytics team can now scale database throughput up or down on demand, as needed to accommodate a fluctuating workload that, on average, is growing at a rate of 8 percent every three months. By simply adjusting the number of RU/s allocated to each collection, which can be done in the Azure portal or programmatically, the team can easily scale up during heavy data-ingestion periods to handle new reports and, most importantly, accommodate continued overall growth of Office 365 around the world.

“Today, all we need to do is keep an eye on request unit usage versus what we have budgeted,” says Wang. “If we’re reaching capacity, we can allocate more RU/s in just a few minutes. We don’t have to pay for spare capacity until we need it and more importantly, we no longer need to worry whether we can handle future growth in data volumes or report usage.”
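
To put the growth rate in perspective, 8 percent every three months compounds to roughly 36 percent per year (1.08 to the fourth power is about 1.36). The Python sketch below works through that arithmetic and adds a simple usage-versus-budget check. It assumes growth compounds quarterly, applies the rate to the 150,000 RU/s average purely as an illustration, and uses a hypothetical 80 percent alert threshold; none of these are stated practices of the team.

```python
def projected_load(current: float, quarters: int,
                   quarterly_growth: float = 0.08) -> float:
    """Project a workload figure forward, assuming growth compounds each quarter."""
    return current * (1 + quarterly_growth) ** quarters


def needs_more_throughput(used_rus: float, budgeted_rus: float,
                          threshold: float = 0.8) -> bool:
    """Flag a collection whose RU/s usage has reached the (assumed) alert threshold."""
    return used_rus / budgeted_rus >= threshold


# Illustration: 150,000 RU/s growing 8 percent per quarter for one year.
one_year = projected_load(150_000, quarters=4)   # about 204,073 RU/s
```

A check like `needs_more_throughput` could run against monitoring data per collection, since throughput is allocated and scaled per collection.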

Lower costs

On top of all of those benefits, the Microsoft 365 usage analytics team increased data and reporting volumes while reducing its monthly Microsoft Azure bill for the usage analytics system by more than 13 percent. “After we cut over to Azure Cosmos DB, our monthly Azure expenses decreased by almost 20 percent,” says Chen. “We undertook this project to better serve our customers. Being able to save close to a quarter-million dollars per year—and likely more in the future—is like icing on the cake.”

“Usage analytics are offered as part of the base capability to all Microsoft 365 customers, irrespective of the type of subscription they purchase,” said Sinha. “Keeping the costs of operating this service as low as possible contributes to our goal of running the overall Microsoft 365 service as efficiently as possible while at the same time giving our customers new and improved insights into how their people are using our services.”

Learn more about Microsoft usage analytics and Azure Cosmos DB today.
Source: Azure