I det här inlägget beskrivs hur du kan göra säkra uppdateringar av dokument i Cosmos DB med optimistisk samtidighetskontroll genom att använda etaggen i ett dokument. Cosmos DB implementerar optimistisk samtidighet och du kan inte låsa dokument medan du läser ifrån ett dokument eller skriver till ett dokument.
Optimistisk samtidighet innebär att om två samtidiga operationer försöker uppdatera ett dokument inom en logisk partition så kommer en operation att vinna och den andra operationen kommer att misslyckas. När detta händer så innehåller ditt dokument sannolikt felaktiga data och det kan vara viktigt att ha korrekt data i dokumentet.
Optimistisk samtidighetskontroll (OCC) ger dig möjlighet att se till att alla uppdateringar genomförs och detta innebär att din data förblir korrekt. OCC kan implementeras genom att använda etaggen i ett dokument. Värdet för etaggen genereras automatiskt och uppdateras på servern varje gång ett dokument uppdateras. Genom att kontrollera om etaggen har förändrats mellan en läsning och en uppdatering kan du se till att du uppdaterar den senaste versionen av dokumentet.
Cosmos DB klient
Jag har redan skrivit ett inlägg om en generisk cosmos db klient som innehåller de metoder vi behöver. Vi måste hämta ett dokument med en etagg och uppdatera ett dokument genom att kontrollera etaggen. Jag inkluderar också kod för ModelItem och ModelPage.
public async Task<ModelItem<T>> GetByIdWithEtag<T>(string id, string partion_key)
{
// Create variables to return
ModelItem<T> model = new ModelItem<T>();
try
{
// Get the response
ResourceResponse<Document> response = await this.client.ReadDocumentAsync(UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id),
new RequestOptions { PartitionKey = new PartitionKey(partion_key) });
// Get the document
Document document = response.Resource;
// Get the etag
model.etag = document.ETag;
// Get the post
model.item = (T)(dynamic)document;
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogDebug(de, $"GetByIdWithEtag", null);
model.error = true;
}
// Return the model
return model;
} // End of the GetByIdWithEtag method
public async Task<bool> Update<T>(string id, T item, string etag)
{
try
{
// Create an access condition
AccessCondition ac = new AccessCondition { Condition = etag, Type = AccessConditionType.IfMatch };
// Update the document
await this.client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id), item, new RequestOptions { AccessCondition = ac });
}
catch (DocumentClientException de)
{
// Check for exceptions
if (de.StatusCode == HttpStatusCode.PreconditionFailed)
{
return false;
}
else
{
// Log the exception
this.logger.LogError(de, $"Update, id: {id}, etag: {etag}", null);
}
}
// Return a success response
return true;
} // End of the Update method
public class ModelItem<T>
{
#region Variables
public T item { get; set; }
public string etag { get; set; }
public bool error { get; set; }
#endregion
#region Constructors
public ModelItem()
{
// Set values for instance variables
this.item = default(T);
this.etag = "";
this.error = false;
} // End of the constructor
public ModelItem(T item, string etag, bool error)
{
// Set values for instance variables
this.item = item;
this.etag = etag;
this.error = false;
} // End of the constructor
#endregion
} // End of the class
public class ModelPage<T>
{
#region Variables
public IList<T> items { get; set; }
public string ct { get; set; }
public bool error { get; set; }
#endregion
#region Constructors
public ModelPage()
{
// Set values for instance variables
this.items = new List<T>();
this.ct = "";
this.error = false;
} // End of the constructor
#endregion
} // End of the class
Uppdatera med etagg
Metoden nedan används för att hämta en lista med orter och för att spara en fortsättningstoken i ett dokument så att vi kan fortsätta att hämta fler orter vid en senare tidpunkt. Vi måste implementera optimistisk samtidighetskontroll när vår token sparas. Vi erhåller först en fortsättningstoken och sedan loopar vi tills vi har lyckats att uppdatera dokumentet.
public async Task<ModelPage<LocationDocument>> GetLocations(Int32 page_size)
{
// Create the locations page to return
ModelPage<LocationDocument> page = new ModelPage<LocationDocument>();
// Get the job locations page
ModelItem<JobLocationsPage> job_tuple = await this.cosmos_database_repository.GetByIdWithEtag<JobLocationsPage>("xx481cd9-7961-4c6e-960e-7cb6e5cde5e8", "xx481cd9-7961-4c6e-960e-7cb6e5cde5e8");
// Loop until a successful update
while (true)
{
// Get locations
page = await this.location_repository.GetChunk("page_name", "ASC", page_size, job_tuple.item.continuation_token);
// Update the job locations page
job_tuple.item.continuation_token = page.ct;
bool success = await this.cosmos_database_repository.Update<JobLocationsPage>(job_tuple.item.id, job_tuple.item, job_tuple.etag);
// Check if the update failed
if (success == false)
{
// Get the tuple again
job_tuple = await this.cosmos_database_repository.GetByIdWithEtag<JobLocationsPage>("xx481cd9-7961-4c6e-960e-7cb6e5cde5e8", "xx481cd9-7961-4c6e-960e-7cb6e5cde5e8");
// Continue the loop
continue;
}
// Break out from the loop
break;
} // End of the while(true) loop
// Return the page with locations
return page;
} // End of the GetLocations method