MQ message idempotency (de-duplication): a general solution

source:

jaskey.github.io/blog/2020/06/08/rocketmq-message-dedup/

Message middleware is a commonly used component of distributed systems and is widely applied for asynchronous processing, decoupling, and peak shaving. We usually regard message middleware as a reliable component. Reliability here means that once a message has been successfully delivered to the middleware, it will not be lost: the middleware guarantees that the message will be successfully consumed by a consumer at least once. This is one of the most basic features of message middleware and is what we usually call "AT LEAST ONCE".

For example, a message M is sent to the message middleware and delivered to consumer program A. A receives the message and starts consuming it, but the program restarts halfway through. Because the message has not been marked as successfully consumed, the middleware keeps delivering it to the consumer and stops only once it has been consumed successfully.

However, this same reliability means that a message may be delivered more than once. In the example above, suppose program A receives message M and completes the consumption logic, but restarts just as it is about to tell the middleware "I have consumed this successfully". From the middleware's point of view the message has not been consumed, so it keeps delivering it. From application A's point of view the message has already been consumed, yet the middleware delivers it again.

In the RocketMQ scenario, this means a message with the same messageId is delivered repeatedly.

Because reliable delivery (no message loss) has the higher priority, the task of ensuring that messages are not duplicated is handed over to the application. This is why the RocketMQ documentation emphasizes that consumption logic must be made idempotent by the application itself. The reasoning behind this is that, in a distributed scenario, "no loss" and "no duplication" are in conflict; duplicated messages can be dealt with, whereas lost messages are very troublesome.

Simple message deduplication solution

For example, suppose the message consumption logic of our business is to insert a row into an order table and then update the inventory:

insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

To make message consumption idempotent, we might use a scheme like this:

select * from t_order where order_no = 'order123'

if (order != null) {
    return; // The message is a duplicate, return directly
}

This works well in many cases, but problems remain in concurrent scenarios.

Concurrent duplicate messages

Suppose the whole consumption logic takes 1 second, and a duplicate message arrives within that second (say, after 100 milliseconds), for example because the producer quickly resent it or the broker restarted. The deduplication check above will very likely find that the order does not exist yet, because the first message is still being consumed and the order has not been written.

The duplicate then passes through the check, and the repeated consumption logic enters business code that is not idempotent. This causes repeated consumption problems, such as an exception from a primary key conflict, or inventory that is deducted twice and never released.
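The race can be replayed deterministically in a few lines. The following is a minimal Java sketch, not production code: `CheckThenActRace` and its in-memory set and counter are hypothetical stand-ins for the t_order and t_inv tables, and the two checks are run back to back to mimic a duplicate that arrives before the first consumption has finished.

```java
import java.util.HashSet;
import java.util.Set;

/** Deterministic replay of the race: both deliveries pass the check before either writes. */
final class CheckThenActRace {
    private final Set<String> orders = new HashSet<>(); // hypothetical stand-in for t_order
    private int stock = 10;                             // hypothetical stand-in for t_inv.count

    private boolean orderExists(String orderNo) {
        return orders.contains(orderNo);
    }

    private void runBusinessLogic(String orderNo) {
        orders.add(orderNo); // a real table would raise a primary key conflict on the second insert
        stock--;             // the inventory is deducted on every call
    }

    /** Returns the remaining stock after the original and the duplicate have both been consumed. */
    static int replay() {
        CheckThenActRace db = new CheckThenActRace();
        boolean firstSeesOrder = db.orderExists("order123");  // first delivery checks
        boolean secondSeesOrder = db.orderExists("order123"); // duplicate checks before the first has written
        if (!firstSeesOrder) {
            db.runBusinessLogic("order123");
        }
        if (!secondSeesOrder) {
            db.runBusinessLogic("order123");
        }
        return db.stock;
    }
}
```

Both checks see no order, so the business logic runs twice and the stock drops by two for a single order.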

One solution for concurrent deduplication

To solve the idempotency problem in the concurrent scenario above, one workable solution is to open a transaction and change the select into a select for update statement that locks the record.

select * from t_order where order_no = 'THIS_ORDER_NO' for update // inside an open transaction
if (order.status != null) {
    return; // The message is a duplicate, return directly
}

However, wrapping the consumption logic in a transaction makes the whole message consumption take longer and reduces concurrency.

Of course, there are more advanced solutions, such as updating the order status with optimistic locking and consuming the message again if the update fails. However, these require more complex and detailed code and table design for the specific business scenario, which is beyond the scope of this article.

Whether we use select for update or optimistic locking, though, deduplication is based on the business table itself, which undoubtedly increases the complexity of business development. Most request processing in a business system depends on MQ, and if every piece of consumption logic has to implement deduplication/idempotency on top of its own business, the workload is tedious. This article explores a general approach to message idempotency that can be abstracted into a utility class and applied to various business scenarios.

Exactly Once

Message middleware has a concept of delivery semantics, and one of these semantics is called "Exactly Once": the message is guaranteed to be consumed successfully, and it is consumed only once. The following is Alibaba Cloud's explanation of Exactly Once:

Exactly-Once means that a message sent to the message system is processed by the consumer once and only once. Even if the producer retries sending and the message is delivered repeatedly, it is consumed only once on the consumer side.

In the context of idempotent processing of business messages, if the business code for a message is guaranteed to be executed, and executed only once, we can regard that as Exactly Once.

However, finding a general solution for this in a distributed scenario is almost impossible. For consumption logic that is based on database transactions, though, it is actually feasible.

Inserting into a message table within a relational database transaction

Suppose the message consumption logic of our business is to update the status of an order table in a MySQL database.

To achieve Exactly Once, that is, the message is consumed only once (and is guaranteed to be consumed once), we can add a message consumption record table to the database, insert the message into this table, and commit that insert in the same transaction as the original order update. This ensures that the message is consumed only once.

  1. Open a transaction

  2. Insert into the message table (handling primary key conflicts)

  3. Update the order table (the original consumption logic)

  4. Commit the transaction

Explanation:

  1. If message consumption succeeds and the transaction is committed, the insert into the message table has also succeeded. Even if RocketMQ has not yet received the consumer offset update and redelivers the message, the insert fails, the message is treated as already consumed, and the consumer offset is then updated directly. This guarantees that our consumption code runs only once.

  2. If the service goes down (for example, restarts) before the transaction is committed, the local transaction does not take effect, so the order is not updated and nothing is inserted into the message table. On the RocketMQ server the consumer offset has not been updated either, so the message is delivered again. After redelivery, the insert into the message table succeeds and consumption can proceed. This guarantees that messages are not lost.
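The four steps above can be sketched with plain JDBC. This is only an illustration under stated assumptions: the table t_message_consumed and its dedup_key primary key column are hypothetical names, the order update statement is a placeholder for the real business logic, and the JDBC driver is assumed to report a duplicate primary key as SQLIntegrityConstraintViolationException (some drivers may require checking the SQLState instead).

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

/** Sketch: the message table insert and the business update share one local transaction. */
final class TransactionalDedupConsumer {

    /**
     * Returns true if the business logic ran, false if the message was a duplicate.
     * Table and column names are hypothetical.
     */
    static boolean consumeOnce(Connection conn, String dedupKey, String orderNo) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // 1. open a transaction
        try {
            // 2. insert into the message table; dedup_key is assumed to be the primary key
            try (PreparedStatement insert = conn.prepareStatement(
                    "insert into t_message_consumed (dedup_key) values (?)")) {
                insert.setString(1, dedupKey);
                insert.executeUpdate();
            } catch (SQLIntegrityConstraintViolationException duplicate) {
                conn.rollback(); // already consumed earlier: skip the business logic
                return false;
            }
            // 3. original consumption logic (placeholder statement)
            try (PreparedStatement update = conn.prepareStatement(
                    "update t_order set status = 'SUCCESS' where order_no = ?")) {
                update.setString(1, orderNo);
                update.executeUpdate();
            }
            conn.commit(); // 4. both changes become visible together
            return true;
        } catch (SQLException e) {
            conn.rollback(); // nothing is persisted, so a redelivery can start from scratch
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

Whether the method returns true or false, the consumer can acknowledge the message; only an exception should lead to redelivery.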

In fact, the implementation of the EXACTLY-ONCE semantics of Alibaba Cloud ONS is based on the transaction characteristics of the database. For more details, please refer to: https://help.aliyun.com/document_detail/102777.html

Because this scheme has nothing to do with the specific business and relies only on a message table, it can indeed be extended to different application scenarios.

However, it has its limitations:

  1. The message consumption logic must rely on relational database transactions. If consumption also modifies other data, for example in Redis or another data source without transaction support, those changes cannot be rolled back.

  2. All the data must be in a single database; cross-database operations are not covered.

Note: from a business perspective, the message table should use the business primary key rather than the message ID as its identifier, which handles producer resends better. Message deduplication on Alibaba Cloud uses only the RocketMQ messageId, so when a producer manually resends a message for some reason (for example, the upstream sends a duplicate request for a transaction), deduplication/idempotency is not achieved because the message IDs differ.

More complex business scenarios

As mentioned above, implementing Exactly Once semantics in this way has many limitations, which make the scheme hardly worth applying widely. And because it is based on transactions, it may cause performance problems such as holding table locks for a long time.

For example, consuming a fairly common message in an order application may involve the following steps (hereinafter collectively referred to as process X):

  1. Check Inventory (RPC)

  2. Lock Inventory (RPC)

  3. Open transaction, insert order table (MySQL)

  4. call some other downstream service (RPC)

  5. Update order status

  6. commit transaction (MySQL)

In this case, if we adopt the message table + local transaction approach, many of the sub-steps in the consumption process cannot be rolled back. In other words, even if we add a transaction, the operations behind it are not atomic. For example, the service may restart right after the first delivery has completed step 2, locking the inventory. The inventory is then locked in another service and cannot be rolled back. The message will of course be delivered again, since it must be consumed at least once, which means the RPC interface that locks the inventory must itself still be idempotent.
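One common way to make such an RPC idempotent is to have the caller pass a request ID and let the service remember which requests it has already applied. The sketch below assumes this approach; `InventoryService`, the request ID parameter, and the in-memory maps are hypothetical and stand in for a real inventory service and its storage.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of an idempotent "lock inventory" operation keyed by a caller-supplied request ID. */
final class InventoryService {
    private final Map<String, Integer> availableByGood = new HashMap<>();
    private final Map<String, Integer> lockedByRequest = new HashMap<>();

    synchronized void setStock(String goodId, int quantity) {
        availableByGood.put(goodId, quantity);
    }

    synchronized int available(String goodId) {
        return availableByGood.getOrDefault(goodId, 0);
    }

    /** Returns true if the inventory is locked for this request, including when it was locked earlier. */
    synchronized boolean lock(String requestId, String goodId, int quantity) {
        if (lockedByRequest.containsKey(requestId)) {
            return true; // replayed request: report success without deducting again
        }
        int stock = availableByGood.getOrDefault(goodId, 0);
        if (stock < quantity) {
            return false; // not enough stock, nothing is recorded
        }
        availableByGood.put(goodId, stock - quantity);
        lockedByRequest.put(requestId, quantity);
        return true;
    }
}
```

If the consumer restarts after this call and the message is redelivered, calling `lock` again with the same request ID leaves the stock unchanged.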

Furthermore, wrapping such a long, time-consuming chain in a transaction greatly reduces the concurrency of the system. So in this scenario we usually fall back on the approach described at the beginning and implement the deduplication logic in the business code, for example by adding a select for update up front or by using optimistic locking.

So is there a way we can extract a common solution that can take into account deduplication, generality, and high performance?

Splitting up the message execution process

One idea is to decompose the steps above into several sub-messages, for example:

  1. Inventory system consumes message A: checks and locks the inventory, then sends message B to the order service

  2. Order system consumes message B: inserts into the order table (MySQL), then sends message C for the downstream system to consume

  3. Downstream system consumes message C: processes part of the logic and sends message D to the order system

  4. Order system consumes message D: updates the order status

Note: each of the steps above must ensure that its local transaction and its outgoing message form one transaction (or are at least eventually consistent). This involves distributed transactional messages, which are not discussed in this article.

As you can see, this approach makes each step more atomic. An atomic step means a small transaction, and with small transactions the message table + transaction scheme becomes feasible.

However, this is too complicated! It splits what was one continuous piece of code into multiple message exchanges among multiple systems. It would be simpler to implement locking at the business code level.

A more general solution

The message table + local transaction scheme has its limitations and concurrency shortcomings because it depends on relational database transactions, and the transaction must wrap the entire message consumption path.

If we can deduplicate messages without relying on transactions, the solution can be extended to more complex scenarios such as RPC calls and cross-database operations.

For example, suppose we still use the message table, but instead of relying on transactions we add a consumption status to it. Would that solve the problem?

Non-transactional scheme based on message idempotent table

After de-transactionalization, the message idempotency scheme works as follows. It does not use a transaction; instead, the message table itself distinguishes two states: consuming and consumed. Only messages whose record is in the consumed state are handled idempotently (acknowledged without running the business logic again). If a record shows that the message is still being consumed, later duplicates trigger delayed consumption (in the RocketMQ scenario, they are sent to the RETRY TOPIC). Delayed consumption is triggered to make sure that, in the concurrent scenario, the message is not lost while the first delivery is still unfinished. If the duplicate were simply acknowledged, the message could be lost (both deliveries carry the same message ID): the duplicate would already have told the broker that consumption succeeded, so if the first delivery then failed, the broker would not redeliver it.
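The decision logic described above can be sketched as follows. This is an in-memory illustration, not the implementation in the open-source project: `DedupStateTable` and its method names are hypothetical, and `ConcurrentHashMap.putIfAbsent` plays the role of the insert that fails on a primary key conflict.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory sketch of a message idempotency table with two states. */
final class DedupStateTable {
    enum State { CONSUMING, CONSUMED }
    enum Decision { PROCEED, ACK_AS_DUPLICATE, RETRY_LATER }

    private final ConcurrentMap<String, State> table = new ConcurrentHashMap<>();

    /** Called before the business logic; only one caller per key gets PROCEED. */
    Decision begin(String dedupKey) {
        State existing = table.putIfAbsent(dedupKey, State.CONSUMING); // atomic insert-if-absent
        if (existing == null) {
            return Decision.PROCEED;          // first delivery: run the business logic
        }
        if (existing == State.CONSUMED) {
            return Decision.ACK_AS_DUPLICATE; // already done: acknowledge without re-running
        }
        return Decision.RETRY_LATER;          // still in progress: ask the broker to redeliver later
    }

    /** Called after the business logic has succeeded. */
    void markConsumed(String dedupKey) {
        table.put(dedupKey, State.CONSUMED);
    }

    /** Called when the business logic fails, so that the retry can enter again. */
    void remove(String dedupKey) {
        table.remove(dedupKey);
    }
}
```

A listener would call `begin` first, run the business code only on `PROCEED`, and then call `markConsumed` on success or `remove` on failure.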

The process is not described in further detail here. The address of the GitHub source code is given later, and readers can refer to the implementation there. Let us look back and see whether the problems we set out to solve have been solved:

  1. When a message has already been consumed successfully, a second copy is handled idempotently (treated as consumed successfully).

  2. Duplicate messages in the concurrent scenario are still not consumed twice; that is, the problem of passing through the idempotency check is solved.

  3. Duplicate business messages resent by upstream producers are also handled idempotently.

The first problem is obviously solved, so it is not discussed here.

How is the second problem solved? It is controlled mainly by the insert into the message table. Suppose we use MySQL as the storage medium for the message table, with the message's unique ID as the primary key. Then only one insert succeeds; later inserts for the same message fail with a primary key conflict and take the delayed consumption branch. When the delayed message is consumed later, the situation has become the first problem above.

As for the third problem, it is solved as long as the deduplication key can be a business primary key (such as an order number or request serial number) rather than only the messageId.

Is there a risk of message loss with this scheme?

Careful readers may notice a logical loophole here. It lies in the second of the three problems above (the concurrent scenario). There we rely on the message state for concurrency control, so the duplicate of a message keeps being delayed (retried). But what if the first delivery also fails to be consumed, for some abnormal reason such as a machine restart or an external exception? Then every delayed retry still sees the consuming state, and in the end the message is treated as a consumption failure and delivered to the dead letter topic (by default, RocketMQ retries consumption up to 16 times).

This concern is justified. Our solution is that each record inserted into the message table must have a maximum consuming time, for example 10 minutes. If a message has been in the consuming state for more than 10 minutes, its record must be deleted from the message table (the program itself has to implement this). The next retry can then insert a fresh record and consume the message.
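The expiration rule can be sketched in memory as follows. The class `ExpiringDedupTable`, its method names, and the explicit `nowMillis` parameter are hypothetical and chosen to keep the example deterministic. Instead of deleting the stale record and inserting a new one in two steps, this sketch swaps it atomically with `ConcurrentMap.replace`, which has the same effect for the caller.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory sketch: a record stuck in the consuming state expires after a maximum time. */
final class ExpiringDedupTable {

    private static final class Entry {
        final boolean consumed;
        final long startedAtMillis;

        Entry(boolean consumed, long startedAtMillis) {
            this.consumed = consumed;
            this.startedAtMillis = startedAtMillis;
        }
    }

    private final ConcurrentMap<String, Entry> table = new ConcurrentHashMap<>();
    private final long maxConsumingMillis;

    ExpiringDedupTable(long maxConsumingMillis) {
        this.maxConsumingMillis = maxConsumingMillis;
    }

    /**
     * Returns true if the caller may run the business logic. A false result means either
     * "already consumed" or "still being consumed"; isConsumed tells the two apart.
     */
    boolean tryBegin(String dedupKey, long nowMillis) {
        Entry fresh = new Entry(false, nowMillis);
        Entry existing = table.putIfAbsent(dedupKey, fresh);
        if (existing == null) {
            return true; // no record yet: first delivery
        }
        if (existing.consumed) {
            return false; // consumed records never expire in this sketch
        }
        boolean expired = nowMillis - existing.startedAtMillis > maxConsumingMillis;
        // replace(key, old, new) succeeds for only one caller, so a stale record is taken over once
        return expired && table.replace(dedupKey, existing, fresh);
    }

    void markConsumed(String dedupKey, long nowMillis) {
        table.put(dedupKey, new Entry(true, nowMillis));
    }

    boolean isConsumed(String dedupKey) {
        Entry entry = table.get(dedupKey);
        return entry != null && entry.consumed;
    }
}
```

With a 10-minute limit, a retry that arrives after the first delivery has been stuck for more than 10 minutes takes over the record and consumes the message, rather than being delayed until it reaches the dead letter topic.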

More flexible message table storage medium

Our solution uses no transactions and only needs a central storage medium, so we are free to choose a more flexible one, such as Redis. Using Redis has two benefits:

  1. Lower performance overhead

  2. The timeout mentioned above can be implemented directly with Redis's own TTL

Of course, Redis is not as good as MySQL in terms of data reliability and consistency, so users need to make their own trade-off.

Source code: RocketMQDedupListener

A Java implementation of the above solution for RocketMQ has been open-sourced on GitHub. For detailed documentation, see https://github.com/Jaskey/RocketMQDedupListener.

The following example from the README uses Redis for deduplication and shows how simple it is to add message deduplication to a business application with this tool:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
consumer.subscribe("TEST-TOPIC", "*");

String appName = consumer.getConsumerGroup(); // In most cases, the consumer group name can be used directly
StringRedisTemplate stringRedisTemplate = null; // The process of obtaining StringRedisTemplate is omitted here
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener = new SampleListener(dedupConfig);

consumer.registerMessageListener(messageListener);
consumer.start();

Most of the code above is what RocketMQ requires anyway. The only change is to create a DedupConcurrentListener instance, in which you specify your consumption logic and the business key used for deduplication (the default is messageId).

For more usage details, please refer to the instructions on Github.

Does this implementation solve the problem once and for all?

The solution looks perfect: every message can quickly be given deduplication, completely decoupled from the specific business implementation. So does this finish all the work of deduplication?

Sadly, it does not. The reason is simple: since a message must be successfully consumed at least once, it may fail halfway through consumption and be retried. Take the order process X above again:

  1. Check Inventory (RPC)

  2. Lock Inventory (RPC)

  3. Open transaction, insert order table (MySQL)

  4. call some other downstream service (RPC)

  5. Update order status

  6. commit transaction (MySQL)

When consumption reaches step 3, suppose a MySQL error makes it fail and triggers a retry. Because the record in the idempotency table is deleted before retrying, the retried message re-enters the consumption code and steps 1 and 2 run again. If step 2 is not itself idempotent, consumption of this business message is still not fully idempotent.

The value of this implementation?

So, since this does not achieve complete message idempotency, what is its value? The value is great! Although it is not a silver bullet for message idempotency (in fact, there is almost no silver bullet in software engineering), it conveniently solves the following:

  1. Duplicate messages caused by redelivery due to the broker, load balancing, and other such reasons

  2. Business-level duplicate messages caused by upstream producers

  3. The concurrency window for duplicate messages: even if a message is duplicated, the duplicates cannot enter the consumption logic at the same time

Some other message deduplication suggestions

That is to say, in normal consumption scenarios (no exceptions, no abnormal exits), this method handles all message idempotency work, whether the duplication comes from the business or from RocketMQ's own behavior.

In fact, this can already solve 99% of message duplication problems, since abnormal scenarios are rare after all. If you also want to handle idempotency well in abnormal scenarios, you can do the following to reduce the failure rate:

  1. Roll back when message consumption fails. If a failed consumption has its own rollback mechanism, retrying the message naturally has no side effects.

  2. Make consumers exit gracefully. This avoids retries caused by the program exiting halfway through consuming a message.

  3. Operations that cannot be made idempotent should at least terminate consumption and raise an alert. Take the inventory locking operation: if the same business flow has already locked the inventory once and a second lock is triggered, then if the call cannot be handled idempotently, the message consumption should at least throw an exception (for example, a primary key conflict that makes the consumption fail).

  4. Building on #3, monitor message consumption well. When a message keeps failing on retry, manually perform the rollback from #1 so that the next retry succeeds.