Greg Beech's Website

Retrieving a row exactly once with multiple polling processes in SQL Server

If you're writing a message processing application based around SQL Server, then it might be tempting to use the Service Broker functionality. However, if all you need is for your host processes to periodically retrieve a batch of messages from a table, then using Service Broker could be like using a sledgehammer to crack a walnut. All you really need is a stored procedure that ensures each message (row) is picked up exactly once by exactly one processing host, without causing the other hosts to block.

Let's assume we have a really simple message table like the following (though of course in real life we'd have a primary key and default values defined):

CREATE TABLE Messaging.Message
(
    MessageId UNIQUEIDENTIFIER NOT NULL
    ,MessageBody VARBINARY(MAX) NOT NULL
    ,EnqueueTime DATETIME NOT NULL
    ,DequeueTime DATETIME NULL
);
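For illustration, a fuller definition along those lines might look like the sketch below. The constraint and index names are hypothetical, and the clustered index on EnqueueTime is an assumption rather than something the procedure requires; it just lets an "oldest first" query read rows in enqueue order instead of sorting them. The script also creates the Messaging schema if it doesn't already exist.

```sql
-- Sketch only: constraint and index names are hypothetical
IF SCHEMA_ID(N'Messaging') IS NULL
    EXEC (N'CREATE SCHEMA Messaging');  -- CREATE SCHEMA must be alone in its batch
GO

CREATE TABLE Messaging.Message
(
    MessageId UNIQUEIDENTIFIER NOT NULL
        CONSTRAINT DF_Message_MessageId DEFAULT NEWID()
        CONSTRAINT PK_Message PRIMARY KEY NONCLUSTERED
    ,MessageBody VARBINARY(MAX) NOT NULL
    ,EnqueueTime DATETIME NOT NULL
        -- UTC, matching the procedure's use of GETUTCDATE() for DequeueTime
        CONSTRAINT DF_Message_EnqueueTime DEFAULT GETUTCDATE()
    ,DequeueTime DATETIME NULL
);
GO

-- Assumed index: lets the dequeue query read rows in EnqueueTime order
CREATE CLUSTERED INDEX IX_Message_EnqueueTime
    ON Messaging.Message (EnqueueTime);
GO
```

With those defaults in place, enqueueing a message only needs its body, e.g. `INSERT INTO Messaging.Message (MessageBody) VALUES (0x0102);`.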

With that sorted, we can begin to write the stored procedure, and the first statement in it is going to declare a cursor. This might seem odd given the general recommendation against using cursors when you want high performance, but there is method in this madness. And at least the cursor will be declared as FAST_FORWARD, which means it's forward-only and read-only, allowing SQL Server to optimise it fairly well.

Something else you should normally steer clear of is locking hints, but here they are critical to the concurrency of the procedure when called from multiple processes. UPDLOCK takes update locks rather than shared locks on the rows the cursor reads, so two processes can't both read the same row and then deadlock trying to upgrade their locks to update it. READPAST means that we skip over any rows that are currently locked by another process rather than waiting for them, so we won't try to dequeue the same rows as another process. ROWLOCK is a strong hint that we'll only be updating individual rows, so SQL Server should take row locks rather than page or table locks and limit the update locks to just those rows.

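CREATE PROCEDURE Messaging.Message_DequeueBatch
(
    @BatchSize INT
)
AS

-- LOCAL scopes the cursor to this call; a default (global) cursor left behind by a
-- failed call would make the next DECLARE on the same connection fail
DECLARE MessageCursor CURSOR LOCAL FAST_FORWARD FOR
SELECT
    MessageId
    ,MessageBody
FROM
    Messaging.Message WITH (UPDLOCK, ROWLOCK, READPAST)
WHERE
    DequeueTime IS NULL
ORDER BY
    EnqueueTime ASC;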
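To see what those hints actually lock, you can run the cursor's query by hand inside a transaction and look at sys.dm_tran_locks. The sketch below is a diagnostic only, not part of the procedure: it uses TOP (1) to approximate fetching a single row, it assumes there is at least one undequeued message in the table, and querying the DMV needs VIEW SERVER STATE permission. It's a handy way to check that the locks really are limited to the rows you expect.

```sql
-- Diagnostic sketch: run in a query window, not as part of the procedure
BEGIN TRANSACTION;

-- Same hints and predicate as the cursor; TOP (1) approximates a single fetch
SELECT TOP (1)
    MessageId
FROM
    Messaging.Message WITH (UPDLOCK, ROWLOCK, READPAST)
WHERE
    DequeueTime IS NULL
ORDER BY
    EnqueueTime ASC;

-- Locks this session now holds in the current database: expect U (update) locks
-- on RID (heap) or KEY (index) resources, alongside IU intent locks higher up
SELECT
    resource_type
    ,request_mode
    ,request_status
    ,resource_description
FROM
    sys.dm_tran_locks
WHERE
    request_session_id = @@SPID
    AND resource_database_id = DB_ID();

ROLLBACK TRANSACTION;  -- releases the locks; nothing was modified
```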

As we're going to be iterating through a cursor, we need somewhere to store the data until we're ready to return it to the client. One option is to tag the rows with a batch identifier as we update them, then select the rows with that identifier after the cursor has been deallocated; here, however, I've gone with a table variable. There are also a few variables declared to hold the cursor values, the dequeue time, and how many items we've got in the batch.

DECLARE @MessageTable TABLE
(
    MessageId UNIQUEIDENTIFIER NOT NULL
    ,MessageBody VARBINARY(MAX) NOT NULL
);

DECLARE 
    @DequeueTime DATETIME
    ,@InBatch INT
    ,@MessageId UNIQUEIDENTIFIER
    ,@MessageBody VARBINARY(MAX);

SELECT
    @DequeueTime = GETUTCDATE()
    ,@InBatch = 0;

Now comes the interesting part: iterating through the cursor. First we try to fetch the values. If we get a status of -2 (row missing), we continue to the next row, as it's assumed that row was deleted by some housekeeping process; any other non-zero status (-1 means the fetch failed or went past the end of the result set) means there's nothing more to read, so we break out of the loop.

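Next, to ensure each message is only dequeued once, we call sp_getapplock with the message identifier as the resource name to take an exclusive lock, continuing to the next row if the lock is not granted immediately (the timeout of 0 means it never waits). Because the lock owner is 'Transaction', the call only succeeds inside an active transaction, and the lock is held until that transaction commits or rolls back. This absolutely guarantees that the row cannot be dequeued by multiple processes concurrently, as even if SQL Server decides to ignore the locking hints (which it is entitled to do; they are only hints, after all) it cannot ignore sp_getapplock. You can see now why this procedure has to be done with a cursor: the application lock has to be requested one row at a time.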
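If you want to see sp_getapplock's behaviour for yourself, the sketch below makes the same kind of call from two query windows; each section is meant to be run in its own window, in the order shown, and the resource name is made up for the demo. With a timeout of 0, the second session gets -1 back straight away instead of waiting, and the first session's lock goes away when its transaction ends. If you run the session 1 call without BEGIN TRANSACTION it fails instead, because a lock owned by 'Transaction' needs an active transaction.

```sql
-- Session 1: take an exclusive, transaction-owned application lock
BEGIN TRANSACTION;
DECLARE @Result INT;
EXEC @Result = sp_getapplock
    @Resource = N'DequeueDemo'  -- hypothetical resource name
    ,@LockMode = 'Exclusive'
    ,@LockOwner = 'Transaction'
    ,@LockTimeout = 0;
SELECT @Result AS LockResult;  -- 0: granted immediately
SELECT APPLOCK_MODE('public', N'DequeueDemo', 'Transaction') AS HeldMode;  -- Exclusive
-- ...leave this transaction open and switch to a second window

-- Session 2: the same request with a zero timeout does not wait
BEGIN TRANSACTION;
DECLARE @Result INT;
EXEC @Result = sp_getapplock
    @Resource = N'DequeueDemo'
    ,@LockMode = 'Exclusive'
    ,@LockOwner = 'Transaction'
    ,@LockTimeout = 0;
SELECT @Result AS LockResult;  -- -1: the lock request timed out
ROLLBACK TRANSACTION;

-- Session 1: committing (or rolling back) releases the application lock
COMMIT TRANSACTION;
```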

Finally we store the details of the message to retrieve in our table variable, update the row to indicate it has been dequeued, and increment the loop counter.

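-- sp_getapplock with a lock owner of 'Transaction' needs an active transaction, and both
-- the application locks and the UPDLOCK row locks are held until it ends. If the caller
-- already has a transaction open, this nests inside it and the locks last until the
-- caller's transaction ends.
BEGIN TRANSACTION;

OPEN MessageCursor;
WHILE @InBatch < @BatchSize
BEGIN

    FETCH NEXT FROM MessageCursor INTO @MessageId, @MessageBody;
    IF @@FETCH_STATUS = -2
        CONTINUE;  -- row deleted since the cursor was opened; skip it
    ELSE IF @@FETCH_STATUS <> 0
        BREAK;     -- no more rows (or the fetch failed)

    -- Try to lock this message without waiting (timeout 0); a negative return
    -- value means the lock wasn't granted, i.e. another process has the row
    DECLARE @LockStatus INT;
    EXEC @LockStatus = sp_getapplock
        @Resource = @MessageId
        ,@LockMode = 'Exclusive'
        ,@LockOwner = 'Transaction'
        ,@LockTimeout = 0;
    IF @LockStatus < 0
        CONTINUE;

    INSERT INTO @MessageTable (MessageId, MessageBody)
    VALUES (@MessageId, @MessageBody);

    UPDATE Messaging.Message
    SET DequeueTime = @DequeueTime
    WHERE MessageId = @MessageId;

    SET @InBatch = @InBatch + 1;
END
CLOSE MessageCursor;
DEALLOCATE MessageCursor;

COMMIT TRANSACTION;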

With all the hard work done, we can now simply return the result set by selecting the values from the table variable.

SELECT
    MessageId
    ,MessageBody
FROM
    @MessageTable;
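To try the procedure out, a sketch like the following enqueues three test messages (the bodies are arbitrary) and then polls twice with a batch size of two, the way a polling host might. Each call is wrapped in a transaction because sp_getapplock is called with 'Transaction' as the lock owner, so the application locks are held until the DequeueTime updates are committed. Assuming there are no other undequeued messages in the table, the first call should return two rows and the second the one that's left.

```sql
-- Assumes the Messaging.Message table and Messaging.Message_DequeueBatch above exist.
-- Enqueue three test messages
INSERT INTO Messaging.Message (MessageId, MessageBody, EnqueueTime, DequeueTime)
SELECT NEWID(), 0x01, GETUTCDATE(), NULL
UNION ALL SELECT NEWID(), 0x02, GETUTCDATE(), NULL
UNION ALL SELECT NEWID(), 0x03, GETUTCDATE(), NULL;

-- First poll: up to two messages
BEGIN TRANSACTION;
EXEC Messaging.Message_DequeueBatch @BatchSize = 2;
COMMIT TRANSACTION;

-- Second poll: whatever is left
BEGIN TRANSACTION;
EXEC Messaging.Message_DequeueBatch @BatchSize = 2;
COMMIT TRANSACTION;
```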

Although there's some interesting locking in this procedure, it's actually pretty efficient and scales well across many polling servers. We use a very similar procedure for our encoding host servers at blinkBox and it has been called frequently from multiple processes on multiple machines in the production environment for the last few months without a single deadlock, timeout, missed message or duplicated message, so I'm confident about its correctness and effectiveness.


Posted Jun 11 2008, 11:58 PM by Greg Beech
Copyright (C) Greg Beech. All rights reserved.