I. Les collections : le problème de l'accès concurrent aux données▲
I-A. Rappels▲
Attention tous les exemples de code de cet article sont écrits pour le framework 4.0. Il existe de petites différences entre la version CTP des parallel extension et leur implémentation dans le Framework 4.0 notamment au niveau des noms des collections et de certaines méthodes.
Dans le précédent article, nous avons abordé les « parallel extensions ». Au cours de cette introduction, nous avons évoqué les problèmes que l'exécution parallèle pouvait soulever dans le cadre de l'accès en écriture à des objets. Revenons sur ces problèmes d'accès concurrent.
Prenons le cas d'une liste d'objets simples (System.Collection.Generics.List). Imaginons que vous souhaitiez remplir cette liste avec les résultats d'un calcul effectué de façon parallèle. Dès ce moment nous avons une situation épineuse : d'un coté une liste qui n'est pas thread-safe, et donc qui ne sait pas gérer les accès concurrents, et de l'autre une fonction exécutée de manière parallèle qui remplit cette collection.
La première question qui vient à l'esprit est : « quelles conséquences a l'exécution parallèle sur l'ajout d'élément dans une collection non thread-safe ? »
La réponse est assez simple.
En prérequis il faut savoir que l'ajout d'élément n'est pas une opération atomique c'est-à-dire que l'ajout d'un élément n'est pas fait en un cycle de processeur. Dès lors que plusieurs cycles processeur doivent se succéder pour effectuer l'action voulue, le risque est que l'action soit interrompue et remplacée par une autre. Tout ceci est un peu abstrait, nous allons donc illustrer le concept avec un exemple.
Voici l'implémentation de la fonction Add de la collection List<T> du Framework DotNet (merci à Reflector :) )
public
void
Add
(
T item)
{
if
(
this
.
_size ==
this
.
_items.
Length)
{
this
.
EnsureCapacity
(
this
.
_size +
1
);
}
this
.
_items[
this
.
_size++]
=
item;
this
.
_version++;
}
Comme le montre le code, l'ajout d'un élément se fait en trois temps :
- Vérification de la taille de la collection et agrandissement si nécessaire (fonction EnsureCapacity) ;
- Ajout de l'item à l'index donné ;
- Incrémentation de l'index.
La dernière partie (this._version++) ne sert « qu'à » la création et le contrôle d'intégrité de la collection pour les énumérateurs.
Maintenant que nous savons comment fonctionne l'ajout d'un élément dans une liste, essayons d'imaginer ce qui se passerait si nous insérions deux éléments en « même temps » ?
Pour le savoir, je vous invite à visionner le pdf ci-dessous dont le code d'ajout a été simplifié. Ajout d'un élément dans une instance de List(Of T) (simplifié)Ajout d'un élément dans une instance de List(Of T) (simplifié)
À partir de là il est très facile de comprendre les risques liés à l'utilisation de collection qui ne gère pas les accès concurrents.
Pour pallier ce manque, les équipes de développement des « parallel extensions » ont introduit trois nouvelles collections :
Collections de l'espace de nom System.Threading.Collections
- BlockingCollection
- ConcurrentQueue
- ConcurrentStack
Voyons rapidement chaque collection.
I-B. BlockingCollection<T>▲
Cette collection est la plus simple des trois. Toute opération effectuée sur la collection est forcément bloquante. Ainsi il ne peut pas y avoir de lecture en même temps qu'une écriture ou même qu'une autre lecture.
I-C. ConcurrentQueue<T>▲
Cette collection est du type Fifo (First In, First Out) et son fonctionnement est le même la classe Queue<T>. L'ajout et le retrait d'élément sont thread-safe.
I-D. ConcurrentStack<T>▲
Cette collection est du type Lifo (Last In, First Out) et son fonctionnement est le même que celui de la classe Stack<T>. L'ajout et le retrait d'élément est thread-safe.
Ces collections sont standard dans leur fonctionnement, elles n'ont comme intérêt que d'être Thread-Safe.
Par contre il est plus intéressant de regarder l'interface IProducerConsumerCollection<T>. Toute classe qui implémente cette interface est Thread-Safe. Afin d'illustrer le fonctionnement d'une collection Thread-Safe nous allons créer la notre
I-E. ConcurrentList<T> : Notre propre liste Thread-Safe !▲
Nous allons nous lancer dans le développement de notre liste Thread-Safe. Cette classe aura comme contrainte de fonctionner comme la classe List<T>, mais elle sera Thread-Safe.
Afin de respecter les « standards », nous allons implémenter l'interface IProducerConsumerCollection<T>.
using
System;
using
System.
Collections.
Generic;
using
System.
Collections.
Concurrent;
using
System.
Linq;
namespace
Dvp
{
public
class
ConcurrentList<
T>
:
IProducerConsumerCollection<
T>
{
private
object
sync;
private
List<
T>
_internalList;
public
ConcurrentList
(
)
{
this
.
_internalList =
new
List<
T>(
);
this
.
sync =
new
object
(
);
}
public
void
CopyTo
(
T[]
array,
int
index)
{
if
(
array.
Length -
index <
this
.
Count)
throw
new
ArgumentException
(
"Le tableau est trop petit"
);
lock
(
sync)
{
for
(
int
i =
index;
i <
this
.
_internalList.
Count;
index++
)
{
array[
i -
index]
=
this
.
_internalList[
i];
}
}
}
public
T[]
ToArray
(
)
{
T[]
items =
null
;
lock
(
sync)
{
items =
this
.
_internalList.
ToArray
(
);
}
return
items;
}
public
bool
TryAdd
(
T item)
{
try
{
lock
(
sync)
{
this
.
_internalList.
Add
(
item);
}
return
true
;
}
catch
(
Exception) {
return
false
;
}
}
public
bool
TryTake
(
out
T item)
{
try
{
lock
(
sync)
{
T lastItem =
this
.
_internalList.
Last
(
);
item =
lastItem;
return
true
;
}
}
catch
(
Exception)
{
item =
default
(
T);
return
false
;
}
}
public
IEnumerator<
T>
GetEnumerator
(
)
{
List<
T>
list =
null
;
lock
(
sync)
{
list =
new
List<
T>(
this
.
_internalList);
}
return
(
IEnumerator<
T>
)list;
}
System.
Collections.
IEnumerator System.
Collections.
IEnumerable.
GetEnumerator
(
)
{
List<
T>
list =
null
;
lock
(
sync)
{
list =
new
List<
T>(
this
.
_internalList);
}
return
(
System.
Collections.
IEnumerator)list;
}
public
void
CopyTo
(
Array array,
int
index)
{
throw
new
NotImplementedException
(
);
}
public
int
Count
{
get
{
lock
(
sync)
{
return
this
.
_internalList.
Count;
}
}
}
public
bool
IsSynchronized
{
get
{
throw
new
NotImplementedException
(
);
}
}
public
object
SyncRoot
{
get
{
return
this
.
sync;
}
}
}
}
Cette collection est maintenant Thread-Safe. Ceci est rendu possible grâce à la mise en place de la synchronisation à l'aide du mot clef lock. Le mot clef lock permet de mettre en place un contexte d'exclusion total dans le bloc d'instructions défini. Tant que les instructions à l'intérieur n'ont pas fini de s'exécuter, il est impossible d'y accéder.
Toutefois cette synchronisation impose un sacrifice au niveau des performances. En effet la pose et la suppression d'un verrou sont des opérations très couteuses en terme de temps processeur et implique forcément un passage en mode noyau au niveau du système d'exploitation.
Voici un petit bout code permettant de se faire une idée des performances entre ConcurrentList<T> et List<T> :
private
static
readonly
int
MAXLOOP =
100000
;
static
void
Main
(
string
[]
args)
{
ConcurrentList<
int
>
list =
new
ConcurrentList<
int
>(
);
Stopwatch stopwatch =
Stopwatch.
StartNew
(
);
Parallel.
For
(
0
,
MAXLOOP,
i =>
{
list.
Add
(
i);
}
);
Console.
WriteLine
(
"Temps de remplissage avec ConcurrentList : "
+
stopwatch.
ElapsedMilliseconds);
List<
int
>
listNormale =
new
List<
int
>(
);
stopwatch =
Stopwatch.
StartNew
(
);
Parallel.
For
(
0
,
MAXLOOP,
i =>
{
listNormale.
Add
(
i);
}
);
Console.
WriteLine
(
"Temps de remplissage avec List : "
+
stopwatch.
ElapsedMilliseconds);
Console.
WriteLine
(
"Appuyez sur une touche"
);
Console.
ReadLine
(
);
}
Voici le résultat de la première exécution sur ma machine :
Le code responsable de l'exception n'est pas le code écrit plus haut dans cet article. C'est tout simplement la manifestation de ce que nous avons évoqué au début de l'article : la collection List<T> n'est pas Thread-Safe et lorsque deux threads ont essayé d'ajouter un élément en même temps cela a conduit à un débordement de la capacité du tableau interne. Lorsque l'on a enfin réussi à exécuter ce code correctement on observe une perte de performance assez nette. En effet nous passons de 2 ms à 146 ms ! Notre collection est extrêmement mauvaise en termes de performance, mais à côté de cela elle permet un accès sécurisé aux données à partir de plusieurs threads. C'est d'ailleurs le plus grand dilemme du développeur parallèle : Est-ce vraiment une bonne idée de paralléliser une boucle faisant des écritures ?
II. Tasks▲
II-A. Les tasks : ça ressemble à des threads, ça sent le thread, mais ce n'est pas des threads !▲
Disons-le franchement dès le début : Oui les tasks sont semblables à des threads et non ce ne sont pas des threads déguisés.
Les tasks vous permettent de faire tout ce que vous faites déjà avec des threads, mais apportent également quelques nouveautés très intéressantes :
La distribution des threads (sic !) sur tous les processeurs, la notion de hiérarchie et les fonctions de contrôle.
Les tâches peuvent donc être vues comme un ThreadPool amélioré permettant l'exploitation du parallélisme.
II-B. Premières tâches▲
Les tâches s'utilisent grâce à la classe Task. Cette classe expose une méthode statique StartNew. Cette méthode permet de créer et de démarrer immédiatement une nouvelle tâche.
Task.
StartNew
(
obj =>
{
//TODO ajouter le code de la tâche.
}
);
La méthode StartNew renvoie une instance de la classe Task permettant de contrôler la tâche nouvellement créée.
La classe Task possède des méthodes très intéressantes : Cancel, CancelAndWait, ContinueWith ainsi que des propriétés tout aussi intéressantes : IsCanceled, IsCancellationdRequested
Vous l'aurez compris, ces méthodes permettent de faire du contrôle de tâche et d'arrêter proprement une tâche avant la fin de son exécution. En cela les tâches sont plus proches du BackgroundWorker que des threads ou vous deviez implémenter vous-même le mécanisme d'arrêt. Un petit exemple de code :
int
obj =
1
;
Task task =
Task.
StartNew
(
i =>
{
Console.
WriteLine
(
i);
obj++;
Task.
StartNew
(
j =>
{
Thread.
Sleep
(
1000
);
Console.
WriteLine
(
j);
if
(
Task.
Current.
IsCancellationRequested)
return
;
Thread.
Sleep
(
1000
);
Console.
WriteLine
(
j);
},
obj,
TaskCreationOptions.
RespectCreatorCancellation);
obj++;
Task.
StartNew
(
k =>
{
Thread.
Sleep
(
1000
);
Console.
WriteLine
(
k);
if
(
Task.
Current.
IsCancellationRequested)
return
;
Thread.
Sleep
(
1000
);
Console.
WriteLine
(
k);
},
obj);
obj++;
Task.
StartNew
(
l =>
{
Thread.
Sleep
(
1000
);
Console.
WriteLine
(
l);
if
(
Task.
Current.
IsCancellationRequested)
return
;
Thread.
Sleep
(
1000
);
Console.
WriteLine
(
l);
},
obj);
},
obj);
Console.
WriteLine
(
task.
Status);
task.
CancelAndWait
(
);
Pour que l'annulation fonctionne de manière hiérarchique, c'est-à-dire que si vous annulez une tâche qui en a lancé d'autres, ces dernières sont également annulées, il faut que les tâches soient créées avec le paramètre TaskCreationOptions.RespectCreatorCancellation.
III. Conclusion▲
Cet article conclut la série sur les « parallel extensions ».
J'espère vous avoir convaincu que l'utilisation des « parallel extensions » vous permettra de gagner en vitesse d'exécution et de mieux utiliser les ressources des PC sur lesquels votre programme s'exécute tout cela de manière simple à mettre en œuvre. De plus j'espère avoir réussi à démystifier le développement parallèle et ainsi sortir cette technique de programmation des seules mains des experts.
IV. Téléchargements▲
Le code source des exemples (Attention Visual Studio 2010 requis !)Le code source des exemples (Attention Visual Studio 2010 requis !)
La version CTP de Visual Studio 2010La version CTP de Visual Studio 2010
V. Remerciements▲
Je tiens à remercier ram-000 pour sa relecture orthographique.
Je tiens également à remercier toute l'équipe de la rédaction dotnet pour son soutien.