Parallel extensions : Tâches parallèles et collections concurrentes

Dans cet article nous allons nous pencher sur deux aspects des "parallel extensions" que nous n'avons pas abordés dans le précédent article :
- Les collections gérant les accès concurrents.
- Les tâches parallèles.

N'hésitez pas à commenter cet article ! Commentez Donner une note à l'article (5)

Article lu   fois.

L'auteur

Profil ProSite personnel

Liens sociaux

Viadeo Twitter Facebook Share on Google+   

I. Les collections : le problème de l'accès concurrent aux données

I-A. Rappels

Attention tous les exemples de code de cet article sont écrit pour le framework 4.0. Il existe de petite différence entre la version CTP des parallel extension et leur implémentation dans le Framework 4.0 notamment au niveau des noms des collections et de certaines méthodes

Dans le précédent article nous avons abordé les "parallel extensions". Au cours de cette introduction nous avons évoqué les problèmes que l'exécution parallèle pouvait soulever dans le cadre de l'accès en écriture à des objets. Revenons sur ces problèmes d'accès concurrent.
Prenons le cas d'une liste d'objets simples ( System.Collection.Generics.List ). Imaginons que vous souhaitiez remplir cette liste avec les résultats d'un calcul effectué de façon parallèle. Dès ce moment nous avons une situation épineuse : d'un coté une liste qui n'est pas thread-safe, et donc qui ne sait pas gérer les accès concurrent, et de l'autre une fonction exécutée de manière parallèle qui remplit cette collection.
La première question qui vient à l'esprit est : "quelles conséquences a l'exécution parallèle sur l'ajout d'élément dans une collection non thread-safe ?"

La réponse est assez simple.
En pré-requis il faut savoir que l'ajout d'élément n'est pas une opération atomique c'est à dire que l'ajout d'un élément n'est pas fait en un cycle de processeur. Dès lors que plusieurs cycles processeur doivent se succéder pour effectuer l'action voulue, le risque est que l'action soit interrompue et remplacée par une autre. Tout ceci est un peu abstrait, nous allons donc illustrer le concept avec un exemple.
Voici l'implémentation de la fonction Add de la collection List<T> du Framework DotNet (merci à Reflector :) )

 
Sélectionnez

public void Add(T item)
{
    if (this._size == this._items.Length)
    {
        this.EnsureCapacity(this._size + 1);
    }
    this._items[this._size++] = item;
    this._version++;
}

Comme le montre le code, l'ajout d'un élément se fait en 3 temps :

  1. Vérification de la taille de la collection et agrandissement si nécessaire (fonction EnsureCapacity)
  2. Ajout de l'item à l'index donné
  3. Incrémentation de l'index

La dernière partie (this._version++) ne sert "qu'à" la création et le contrôle d'intégrité de la collection pour les énumérateurs.
Maintenant que nous savons comment fonctionne l'ajout d'un élément dans une liste, essayons d'imaginer ce qui se passerait si nous insérerions deux éléments en "même temps" ?
Pour le savoir je vous invite à visionner le pdf ci-dessous dont le code d'ajout a été simplifié. Ajout d'un élément dans une instance de List(Of T) (simplifié)Ajout d'un élément dans une instance de List(Of T) (simplifié)

A partir de là il est très facile de comprendre les risques liés à l'utilisation de collection qui ne gère pas les accès concurrents.
Pour pallier ce manque les équipes de développement des "parallel extensions" ont introduit 3 nouvelles collections :

Collections de l'espace de nom System.Threading.Collections
  • BlockingCollection<T>
  • ConcurrentQueue<T>
  • ConcurrentStack<T>

Voyons rapidement chaque collection.

I-B. BlockingCollection<T>

Cette collection est la plus simple des 3. Toute opération effectuée sur la collection est forcément bloquante. Ainsi il ne peut pas y avoir de lecture en même temps qu'une écriture ou même qu'une autre lecture.

I-C. ConcurrentQueue<T>

Cette collection est du type Fifo (First In, First Out) et son fonctionnement est le même la classe Queue<T>. L'ajout et le retrait d'élément sont thread-safe.

I-D. ConcurrentStack<T>

Cette collection est du type Lifo (Last In, First Out) et son fonctionnement est le même que celui de la classe Stack<T>. L'ajout et le retrait d'élément est thread-safe.

Ces collections sont standard dans leur fonctionnement, elles n'ont comme intérêt que d'être Thread-Safe.
Par contre il est plus intéressant de regarder l'interface IProducerConsumerCollection<T>. Toute classe qui implémente cette interface est Thread-Safe. Afin d'illustrer le fonctionnement d'une collection Thread-Safe nous allons créer la notre

I-E. ConcurrentList<T> : Notre propre liste Thread-Safe !

Nous allons nous lancer dans le développement de notre liste Thread-Safe. Cette classe aura comme contrainte de fonctionner comme la classe List<T> mais elle sera Thread-Safe.
Afin de respecter les "standards" nous allons implémenter l'interface IProducerConsumerCollection<T>.

 
Sélectionnez

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;


namespace Dvp
{
    public class ConcurrentList<T> : IProducerConsumerCollection<T>
    {

        private object sync;
        private List<T> _internalList;

        public ConcurrentList()
        {
            this._internalList = new List<T>();
            this.sync = new object();
        }
   
        public void CopyTo(T[] array, int index)
        {
            if (array.Length - index < this.Count)
                throw new ArgumentException("Le tableau est trop petit");

            
            lock (sync)
            {
                for (int i = index; i < this._internalList.Count; index++)
                {
                    array[i - index] = this._internalList[i];
                }
            }
        }
        public T[] ToArray()
        {
            T[] items = null;
            lock (sync)
            {
                items = this._internalList.ToArray();
            }

            return items;
        }

        public bool TryAdd(T item)
        {
            try
            {
                lock (sync)
                {
                    this._internalList.Add(item);
                }

                return true;
            }
            catch (Exception) { return false; }
        }

        public bool TryTake(out T item)
        {
            try
            {
                lock (sync)
                {
                    T lastItem = this._internalList.Last();
                    item = lastItem;
                    return true;
                }
            }
            catch (Exception) 
            {
                item = default(T);
                return false; 
            }
        }

        public IEnumerator<T> GetEnumerator()
        {
            List<T> list = null;
            lock (sync)
            {
                list = new List<T>(this._internalList);
            }
            return (IEnumerator<T>)list;
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            List<T> list = null;
            lock (sync)
            {
                list = new List<T>(this._internalList);
            }
            return (System.Collections.IEnumerator)list;
        }

        public void CopyTo(Array array, int index)
        {
            throw new NotImplementedException();
        }

        public int Count
        {
            get
            {
                lock (sync)
                {
                    return this._internalList.Count;
                }
            }
        }

        public bool IsSynchronized
        {
            get { throw new NotImplementedException(); }
        }

        public object SyncRoot
        {
            get { return this.sync; }
        }
    }
}

Cette collection est maintenant Thread-Safe. Ceci est rendu possible grâce à la mise en place de la synchronisation à l'aide du mot clef lock. Le mot clef lock permet de mettre en place un contexte d'exclusion total dans le bloc d'instructions défini. Tant que les instructions à l'intérieur n'ont pas fini de s'exécuter, il est impossible d'y accéder.
Toutefois cette synchronisation impose un sacrifice au niveau des performances. En effet la pose et la suppression d'un verrou sont des opérations très couteuses en terme de temps processeur et implique forcément un passage en mode noyau au niveau du système d'exploitation.
Voici un petit bout code permettant de se faire une idée des performances entre ConcurrentList<T> et List<T> :

 
Sélectionnez

private static readonly int MAXLOOP = 100000;

static void Main(string[] args)
{

	ConcurrentList<int> list = new ConcurrentList<int>();

	Stopwatch stopwatch = Stopwatch.StartNew();
    Parallel.For(0, MAXLOOP, i =>
    {
    	list.Add(i);
	});

	Console.WriteLine("Temps de remplissage avec ConcurrentList : " + stopwatch.ElapsedMilliseconds);

	List<int> listNormale = new List<int>();
    stopwatch = Stopwatch.StartNew();
    Parallel.For(0, MAXLOOP, i =>
    {
    	listNormale.Add(i);
    });
    Console.WriteLine("Temps de remplissage avec List : " + stopwatch.ElapsedMilliseconds);

	Console.WriteLine("Appuyez sur un touche");
    Console.ReadLine();
}

Voici le résultat de la première exécution sur ma machine :

Une magnifique exception ...
Une magnifique exception ...

Le code responsable de l'exception n'est pas le code écrit plus haut dans cet article. C'est tout simplement la manifestation de ce que nous avons évoqué au début de l'article : La collection List<T> n'est pas Thread-Safe et lorsque deux threads ont essayé d'ajouter un élément en même temps cela à conduit à un débordement de la capacité du tableau interne. Lorsque l'on a enfin réussi à exécuter ce code correctement on observe une perte de performance assez nette. En effet nous passons de 2 ms à 146 ms ! Notre collection est extrêment mauvaise en termes de performance, mais à coté de cela elle permet un accès sécurisé aux données à partir de plusieurs threads. C'est d'ailleurs le plus grand dilemme du développeur parallèle : Est-ce vraiment une bonne idée de paralléliser une boucle faisant des écritures ?

II. Tasks

II-A. Les tasks : ça ressemble à des threads, ça sent le thread, mais ce n'est pas des threads !

Disons le franchement dès le début : Oui les tasks sont semblables à des threads et non ce ne sont pas des threads déguisés.
Les tasks vous permettent de faire tout ce que vous faites déjà avec des threads mais apportent également quelques nouveautés très intéressantes :
La distribution des threads (sic !) sur tous les processeurs, la notion de hiérarchie et les fonctions de contrôle.
Les tâches peuvent donc être vues comme un ThreadPool amélioré permettant l'exploitation du parallélisme.

II-B. Premières tâches

Les tâches s'utilisent grâce à la classe Task. Cette classe expose une méthode statique StartNew. Cette méthode permet de créer et de démarrer immédiatement une nouvelle tâche.

 
Sélectionnez

Task.StartNew(obj => 
{
	//TODO ajouter le code de la tâche.
});

La méthode StartNew renvoie une instance de la classe Task permettant de contrôler la tâche nouvellement créée.
La classe Task possède des méthodes très intéressantes : Cancel, CancelAndWait, ContinueWith ainsi que des propriétés tout aussi intéressantes : IsCanceled, IsCancellationdRequested
Vous l'aurez compris, ces méthodes permettent de faire du contrôle de tâche et d'arrêter proprement une tâche avant la fin de son exécution. En cela les tâches sont plus proches du BackgroundWorker que des threads ou vous deviez implémenter vous même le mécanisme d'arrêt. Un petit exemple de code :

 
Sélectionnez

int obj = 1;

Task task = Task.StartNew(i =>
{
	Console.WriteLine(i);
                
	obj++;
	Task.StartNew(j =>
	{
		Thread.Sleep(1000);
		Console.WriteLine(j);

		if (Task.Current.IsCancellationRequested)
			return;

		Thread.Sleep(1000);
		Console.WriteLine(j);

	}, obj, TaskCreationOptions.RespectCreatorCancellation);

	obj++;
	Task.StartNew(k =>
	{
		Thread.Sleep(1000);
		Console.WriteLine(k);
		if (Task.Current.IsCancellationRequested)
			return;
		Thread.Sleep(1000);
		Console.WriteLine(k);
	}, obj);

	obj++;
	Task.StartNew(l =>
	{
		Thread.Sleep(1000);
		Console.WriteLine(l);
		if (Task.Current.IsCancellationRequested)
			return;
		Thread.Sleep(1000);
		Console.WriteLine(l);
	}, obj);
	
}, obj);

Console.WriteLine(task.Status);
            
task.CancelAndWait();

Pour que l'annulation fonctionne de manière hiérarchique, c'est à dire que si vous annulez une tâche qui en a lancé d'autres, ces dernière soient également annulées, il faut que les tâches soient créées avec le paramètre TaskCreationOptions.RespectCreatorCancellation.

III. Conclusion

Cet article conclut la série sur les "parallel extensions".
J'espère vous avoir convaincu que l'utilisation des "parallel extensions" vous permettra de gagner en vitesse d'exécution et de mieux utiliser les ressources des PC sur lesquels votre programme s'exécute tout cela de manière simple à mettre en œuvre. De plus j'espère avoir réussi à démystifier le développement parallèle et ainsi sortir cette technique de programmation des seules mains des experts.

IV. Téléchargements

V. Remerciements

Je tiens à remercier ram-000 pour sa relecture orthographique.
Je tiens également à remercier toute l'équipe de la rédaction dotnet pour son soutien.

Vous avez aimé ce tutoriel ? Alors partagez-le en cliquant sur les boutons suivants : Viadeo Twitter Facebook Share on Google+   

  

Copyright © 2008 Vincent Lainé. Aucune reproduction, même partielle, ne peut être faite de ce site et de l'ensemble de son contenu : textes, documents, images, etc. sans l'autorisation expresse de l'auteur. Sinon vous encourez selon la loi jusqu'à trois ans de prison et jusqu'à 300 000 € de dommages et intérêts. Droits de diffusion permanents accordés à Developpez LLC.