The fix described in this article takes only one line of code 🙂
There are many patterns that help with software design. The Gang of Four gives us patterns for organizing code better. Enterprise Integration Patterns (EIP) are another catalogue of patterns. They help with communication between applications.
There are two kinds of communication between two applications:
1. Synchronous — HTTP, Sockets
2. Asynchronous — Message Queues, Databases, Files, E-Mail, Cloud storage
One of the EIP patterns is Request-Reply.
It is very simple. When one service needs some data, it sends a Request to the other service, which is responsible for that data. The responsible service then prepares a Response and sends it back to the Requestor.
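To make the pattern concrete, here is a minimal sketch of the message shapes. The article does not show its message format, so the record names, the `correlationId` field and the wallet-score payload are assumptions made for illustration. The essential idea is that the reply echoes the request's correlation ID, which is what lets the Requestor pair them.

```java
import java.util.UUID;

// Hypothetical Request-Reply message shapes (not taken from the article).
final class RequestReplyMessages {

    // A request for data owned by another service (here: a wallet's risk score).
    record ScoreRequest(String correlationId, String walletId) {}

    // The reply carries the same correlation ID as the request it answers.
    record ScoreReply(String correlationId, String walletId, int score) {}

    // Requestor side: every new request gets a fresh, unique correlation ID.
    static ScoreRequest newRequest(String walletId) {
        return new ScoreRequest(UUID.randomUUID().toString(), walletId);
    }

    // Responder side: build the reply and echo the correlation ID back.
    static ScoreReply answer(ScoreRequest request, int score) {
        return new ScoreReply(request.correlationId(), request.walletId(), score);
    }

    // Requestor side: a reply belongs to a request only if the IDs are equal.
    static boolean matches(ScoreRequest request, ScoreReply reply) {
        return request.correlationId().equals(reply.correlationId());
    }

    public static void main(String[] args) {
        ScoreRequest request = newRequest("wallet-42");
        ScoreReply reply = answer(request, 17);
        System.out.println(matches(request, reply)); // true
    }
}
```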
In synchronous communication
It is no secret that when two applications communicate with each other over sockets or HTTP, the response is sent back to the Requestor right after the request is received. No mess, no miscommunication.
But sometimes there is no way to choose which kind of communication to use.
In asynchronous communication
Asynchrony brings some complexity.
There is nothing to worry about when an application does not need a response right away (event-driven).
But sometimes it needs to gather data from other services for further processing (e.g. within a transaction). I call such behavior “Sync through Async”. Such cases are especially interesting because of their pitfalls.
The case I want to tell you about is one of them.
The company I work for provides E-Wallets and E-Payments. A few weeks ago I implemented a service for risk scoring of incoming payments. What is it? It is a service that makes a qualitative assessment of every payment and delivers a verdict: should the payment be processed or not?
In our case, payment risk scoring means calculating a value from three input parameters:
- Risk score of the user (wallet)
- Risk score of the destination country (based on political assessments)
- Risk score of the payment type
If the resulting value exceeds a critical threshold, the payment should be declined.
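The decision rule above can be sketched as follows. The article does not state how the three scores are combined or what the critical value is, so the simple sum and the threshold used here are assumptions for illustration only; only the "decline when greater than the critical value" rule comes from the text.

```java
// Sketch of the decline rule. ASSUMPTIONS: scores are plain numbers, they are
// combined by a simple sum, and the threshold is hypothetical.
final class PaymentRiskDecision {

    enum Verdict { APPROVE, DECLINE }

    // Hypothetical combination; the article only says the value depends on these three scores.
    static double paymentRiskScore(double userScore, double countryScore, double paymentTypeScore) {
        return userScore + countryScore + paymentTypeScore;
    }

    // Decline only when the combined value is strictly greater than the critical value.
    static Verdict decide(double paymentScore, double criticalValue) {
        return paymentScore > criticalValue ? Verdict.DECLINE : Verdict.APPROVE;
    }

    public static void main(String[] args) {
        double criticalValue = 100.0; // hypothetical threshold
        double score = paymentRiskScore(40.0, 35.0, 30.0); // 105.0
        System.out.println(decide(score, criticalValue)); // DECLINE
    }
}
```

In a real service the weights and the threshold would likely live in configuration rather than in code.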
When I started, we already had a service responsible for user risk scoring. So my goal was to implement a service that calculates the risk of the payment itself, while obtaining the user score from that responsible service. Our services communicated with each other only through Apache Kafka.
Here is the basic scheme of the microservice, named RMS (risk management system).
- when the main payment processing service encounters a payment with the status “antifraud is to be checked for”, it sends the Payment ID to the Kafka topic “PAYMENT-READY-FOR-ANTIFRAUD”
- the Antifraud service consumes this payment ID from Kafka, enriches it with the full payment data read from the database, then sends the payment model to the Kafka topic “PAYMENT-DETAILS”
- RMS reads the PAYMENT-DETAILS topic in order to:
a) receive the payment
b) request the user risk score from the Know-Your-Customer (KYC) service by sending a Request to the Kafka topic “WALLET-KYC-REQUEST”
c) wait for the Reply to that Request on the Kafka topic “WALLET-KYC-RESPONSE”. This is the Request-Reply pattern: each request waits for its reply before the calculation can proceed.
d) read both the country score and the payment type score from its local database
e) combine all three risk scores to calculate the payment risk score
f) send the payment risk score to the Kafka topic “PAY-DECISION”
- the Antifraud service listens for payment decisions on that topic and processes them in order to approve or decline the payment
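Steps a) to f) can be sketched as a single handler to show the “Sync through Async” shape: the handler blocks on the KYC reply before it can continue. All Kafka and database I/O is hidden behind hypothetical functions (`requestUserScore`, `countryScore`, `paymentTypeScore`, `publishDecision`), the record names and the reply timeout are assumptions, and the score combination is an assumed simple sum.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.ToDoubleFunction;

// Hypothetical outline of the RMS flow; the functions stand in for Kafka and DB access.
final class RmsFlowSketch {

    record Payment(String paymentId, String walletId, String country, String paymentType) {}
    record Decision(String paymentId, double riskScore) {}

    // b) + c): publish to WALLET-KYC-REQUEST; the future completes when the
    // reply arrives on WALLET-KYC-RESPONSE (wiring not shown, hypothetical).
    private final Function<String, CompletableFuture<Double>> requestUserScore;
    // d): lookups in the local database (hypothetical).
    private final ToDoubleFunction<String> countryScore;
    private final ToDoubleFunction<String> paymentTypeScore;
    // f): publish to PAY-DECISION (hypothetical).
    private final Consumer<Decision> publishDecision;
    private final long replyTimeoutMillis;

    RmsFlowSketch(Function<String, CompletableFuture<Double>> requestUserScore,
                  ToDoubleFunction<String> countryScore,
                  ToDoubleFunction<String> paymentTypeScore,
                  Consumer<Decision> publishDecision,
                  long replyTimeoutMillis) {
        this.requestUserScore = requestUserScore;
        this.countryScore = countryScore;
        this.paymentTypeScore = paymentTypeScore;
        this.publishDecision = publishDecision;
        this.replyTimeoutMillis = replyTimeoutMillis;
    }

    // a): called for each payment read from PAYMENT-DETAILS.
    void onPaymentDetails(Payment payment) {
        // c): wait for the KYC reply; on timeout, join() throws a CompletionException
        // whose cause is a TimeoutException.
        double userScore = requestUserScore.apply(payment.walletId())
                .orTimeout(replyTimeoutMillis, TimeUnit.MILLISECONDS)
                .join();
        // e): ASSUMED simple sum of the three scores.
        double combined = userScore
                + countryScore.applyAsDouble(payment.country())
                + paymentTypeScore.applyAsDouble(payment.paymentType());
        publishDecision.accept(new Decision(payment.paymentId(), combined));
    }

    public static void main(String[] args) {
        RmsFlowSketch rms = new RmsFlowSketch(
                walletId -> CompletableFuture.completedFuture(10.0), // pretend KYC already replied
                country -> 20.0,
                paymentType -> 30.0,
                decision -> System.out.println(decision),
                5_000);
        rms.onPaymentDetails(new Payment("p-1", "wallet-42", "DE", "CARD"));
        // prints Decision[paymentId=p-1, riskScore=60.0]
    }
}
```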
The solution described above was successfully tested on two of our independent QA stands. But it started to fail right after it was deployed to the production infrastructure (we use Kubernetes).
Checking the application logs, I noticed that the RMS service sent the user score Request to the Kafka topic “WALLET-KYC-REQUEST”, and the KYC service sent the Reply to the Kafka topic “WALLET-KYC-RESPONSE”, but RMS never put them together to calculate a decision. Every Request failed with a timeout while waiting for its Response. Every Response did not “see” its corresponding Request and timed out as well.
At first I assumed that Requests were missing their Responses because they had different correlation keys (see the Aggregator EIP as implemented in Apache Camel for details). But that was not the case: we confirmed it by adding more log messages.
Then I found the cause: I noticed that the RMS service was running as two instances in one Kubernetes cluster. That explained everything we had seen in the logs.
One RMS instance sent a Request, and the other instance received the corresponding Response. Obviously, they could not be aggregated.
This kind of problem is typical for a microservice architecture. Keep in mind that an application may run as several instances in parallel. When data has to be processed from start to end inside a single instance of an application (within a single transaction or without one), it is really important to make sure that the application does not consume data that is not intended for it. More precisely, each instance should consume only the data intended for itself.
Once I understood the cause, I redesigned the RMS service so that each Response is consumed by the instance that holds the corresponding Request.
To explain how this works, I need to say a few words about how Kafka is organized.
All Kafka topics are divided into partitions.
Kafka clients come in two types: producers and consumers. The former puts data into a Kafka topic, and the latter reads data from a Kafka topic by polling it.
When a Kafka producer sends a message to a topic, the message is placed into exactly one partition of that topic. The magic is in how the producer chooses that partition. Every message may have a Key (which may also be null). Every producer has a hash function that selects a concrete partition for the message: it takes the message Key as its argument and computes a hash, and that result determines the number of the partition the message is written to.
partitionNumber = hash(messageKey) % partitionsCount
Partitions allow messages to be written in parallel. If a message is published with a key, then by default the producer hashes the key to determine the destination partition. This guarantees that all messages with the same key are sent to the same partition. In addition, a consumer is guaranteed to receive the messages of that partition in order.
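The formula above can be sketched in code. This is modeled on our understanding of the Java client's default partitioner for keyed records: murmur2 hash of the serialized key, forced non-negative, modulo the partition count. Treat it as an approximation and check the Kafka sources for your client version before relying on exact partition numbers; the UTF-8 key encoding is an assumption, and null keys, which Kafka spreads across partitions by a different rule, are not handled.

```java
import java.nio.charset.StandardCharsets;

// Sketch of key-based partition selection, modeled on Kafka's murmur2 partitioning.
final class KeyPartitioning {

    // 32-bit murmur2 with the seed used by Kafka's utility implementation.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes (the cases intentionally fall through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // partitionNumber = hash(messageKey) % partitionsCount, with the hash made non-negative.
    static int partitionFor(String key, int partitionsCount) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // ASSUMED string key serialized as UTF-8
        return (murmur2(keyBytes) & 0x7fffffff) % partitionsCount;
    }

    public static void main(String[] args) {
        // The same key always lands in the same partition.
        for (String key : new String[] {"wallet-1", "wallet-2", "wallet-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 6));
        }
    }
}
```

Masking with `0x7fffffff` rather than using `Math.abs` avoids the negative result that `Math.abs(Integer.MIN_VALUE)` would produce.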
On the other side, a consumer does not read data on its own. It has to work in a Consumer Group (remember group-id in the connection properties?). Kafka guarantees that every message is delivered to every consumer group subscribed to the topic, and that within each group it is delivered to only one consumer.
The way consumers work in their groups is complex: there is a group leader, and the members interact through a protocol with steps such as joining the group, electing the leader, assigning partitions and others. All we need to know now is that partition assignment uses a strategy that can be selected at application startup.
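A toy model helps to see why group membership decides who receives a reply. This is not Kafka's real group protocol: it assumes a simple round-robin assignment of partitions within each group (the Java client's actual strategy is chosen via `partition.assignment.strategy`, e.g. `RangeAssignor`, `RoundRobinAssignor`, `StickyAssignor` or `CooperativeStickyAssignor`). The instance and group names are hypothetical. The first call models two RMS instances sharing one group; the second, for contrast, gives each instance its own group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of consumer-group delivery: within a group each partition has exactly
// one owner, and every group receives every message.
final class ConsumerGroupModel {

    // Round-robin assignment of partitions 0..partitions-1 over the group's members.
    static Map<Integer, String> assign(List<String> members, int partitions) {
        Map<Integer, String> owner = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            owner.put(p, members.get(p % members.size()));
        }
        return owner;
    }

    // Consumers (one per group) that receive a message written to the given partition.
    static List<String> receivers(Map<String, List<String>> groups, int partitions, int partition) {
        List<String> result = new ArrayList<>();
        for (List<String> members : groups.values()) {
            result.add(assign(members, partitions).get(partition));
        }
        return result;
    }

    public static void main(String[] args) {
        int partitions = 4;
        // Both RMS instances in one group: only one of them gets a reply in partition 1.
        System.out.println(receivers(Map.of("rms", List.of("rms-A", "rms-B")), partitions, 1)); // [rms-B]
        // One group per instance: every instance gets the reply.
        Map<String, List<String>> perInstance = new LinkedHashMap<>();
        perInstance.put("group-A", List.of("rms-A"));
        perInstance.put("group-B", List.of("rms-B"));
        System.out.println(receivers(perInstance, partitions, 1)); // [rms-A, rms-B]
    }
}
```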
The RMS service had one producer writing into WALLET-KYC-REQUEST and one consumer reading from WALLET-KYC-RESPONSE. The KYC service was not scaled at that moment: it listened to all partitions of WALLET-KYC-REQUEST and wrote each Reply into one of the partitions of WALLET-KYC-RESPONSE. All RMS consumers (in all RMS instances) worked in one consumer group. Please take a look at the scheme below.
So our problem was how to make a consumer listen to exactly the partition into which the producer would put the Reply.
A seemingly good solution would be to adjust all producers to put messages straight into the partitions listened to by the consumers that need this particular data. It turned into a mess! On top of that, such a scenario is very expensive to maintain: the teams must keep in mind the whole scheme of interactions between four microservices and retune all of them whenever just one of them needs scaling. If they do not, data might be lost.
The better solution
Our solution took about two hours to implement, and it is cheap to maintain.
We use Kafka policies to administer all Kafka connections, both Kafka users and consumer groups.
So we added a rule to the Kafka policies that allows any consumer group whose group-id matches a mask. This lets the RMS service set its consumer group at runtime, right after it starts.
We literally wrote:
String groupId = "rms-" + UUID.randomUUID().toString();
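Here is a sketch of where that line might sit: building the reply consumer's settings for one RMS instance. Only the unique `group.id` comes from the article; the other values are assumptions. The property keys are standard Kafka consumer settings, and with `kafka-clients` on the classpath these `Properties` would be passed to a `KafkaConsumer` subscribed to WALLET-KYC-RESPONSE. If your policies are plain Kafka ACLs, a prefixed resource pattern on the group `rms-` (the `--resource-pattern-type prefixed` option of `kafka-acls.sh`) is one way to express such a mask; the article does not say which mechanism was used.

```java
import java.util.Properties;
import java.util.UUID;

// Consumer settings for one RMS instance. The unique group.id is the article's fix;
// everything else is an assumption for illustration.
final class RmsConsumerConfig {

    static Properties replyConsumerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A fresh consumer group per instance start, so every instance receives every reply.
        props.put("group.id", "rms-" + UUID.randomUUID().toString());
        // ASSUMPTION: a brand-new group has no committed offsets; "latest" skips replies
        // published before this instance started (it holds no pending requests for them).
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = replyConsumerProperties("localhost:9092");
        System.out.println(props.getProperty("group.id")); // e.g. rms-<random UUID>
    }
}
```

One side effect to keep in mind: every restart leaves an idle consumer group behind, and its committed offsets expire according to the broker's `offsets.retention.minutes` setting.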
This saves us a headache when scaling the RMS service. Every RMS instance receives every Response, no matter whether that instance has just sent the Request and is awaiting the Response or not.
As a result, every running RMS instance receives all Responses, but not all of them can be paired with a matching Request. That is fine for us; it is the expected scenario! We have the Aggregator to pair each Request with its Reply, and whichever side stays unpaired fails with a timeout and writes a log message. Even better, we get the guarantee that every Request with master data will be paired with its Response and the process will not be interrupted.
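The pairing step can be sketched in plain Java as a stand-in for the Apache Camel Aggregator the article uses. It assumes replies carry the request's correlation ID; the class and method names are hypothetical. One deliberate difference: here a Reply that matches no local Request is dropped immediately, whereas in the setup described above it also waits and then times out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Per-instance pairing of Requests and Replies by correlation ID (hypothetical sketch).
final class ReplyAggregator {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    ReplyAggregator(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Call right after this instance sends a Request. The future completes with the
    // Reply, or exceptionally with a TimeoutException if none arrives in time.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        pending.put(correlationId, future);
        // Forget the entry once it is answered or has timed out.
        future.whenComplete((reply, error) -> pending.remove(correlationId, future));
        return future;
    }

    // Call for EVERY Reply this instance consumes. Returns false for Replies that
    // belong to another instance's Request; those can simply be logged and ignored.
    boolean onReply(String correlationId, String reply) {
        CompletableFuture<String> future = pending.get(correlationId);
        return future != null && future.complete(reply);
    }

    public static void main(String[] args) {
        ReplyAggregator aggregator = new ReplyAggregator(200);
        CompletableFuture<String> mine = aggregator.register("req-1");
        System.out.println(aggregator.onReply("req-9", "not ours")); // false
        System.out.println(aggregator.onReply("req-1", "score=17")); // true
        System.out.println(mine.join()); // score=17
    }
}
```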
- The design of asynchronous interaction should take into account not only aspects of the interaction itself, but also the stages of architecture expansion.
- Synchronous interaction would have been cheaper to support than the solution we found.
- When implementing Request-Reply on Kafka, make sure the message retention period of the topics on the Kafka broker is longer than the time an application may need to restart; this reduces the risk of data loss during restarts.