Relationnel vs NoSQL : l’écosystème de la base de données à l’heure du « BigData » (2/2)

A la recherche de la performance

Si les systèmes critiques ont longtemps été concentrés dans mes infrastructures informatiques des grands groupes (Industriels, Bancaires, ….), les ordres de grandeurs portés par internet ont bousculé les pratiques de gestion de la performance.

Dans le cas de systèmes fermés (clients et serveurs en interne) la performance est majoritairement prise en compte a posteriori : validation par des tests de charge, optimisations des structures, dénormalisation éventuelle, utilisation de mécanismes spécifiques (index bitmap…). L’intervention ponctuelle d’un expert est requise pour tenter de comprendre un problème de performance lors d’un pic inexpliqué ou lors d’un changement brusque et imprévu de la nature des données (calcul des statistiques obsolète) ou encore lors d’une augmentation de la volumétrie suite à des nouvelles fonctionnalités.
Pour les acteurs du Web ouverts à la diversité et aux volumes astronomiques de données, la performance et la continuité de service font partie des enjeux vitaux de l’entreprise.
Elles doivent faire face à une augmentation exponentielle des accès concurrents en base couplée à une explosion de la volumétrie. De plus, cette volumétrie regroupe désormais tous types de données (texte, image, vidéo, … ) et ce phénomène n’est pas prêt de s’essouffler avec le mobile …
Pour faire face à ces nouveaux défis, les architectures ont du devenir ‘hyper-scalables’, avec des impacts sur chaque élément du système d’information qui doit être aligné sur cet enjeu. Il s’agit de créer une ‘composition’ technique intégrant les critères suivants :

  • La capacité du système à monter en charge pour pouvoir gérer toutes les demandes client lors des pics d’activité ;
  • La capacité d’un nœud à pouvoir paralléliser les traitements ;
  • La capacité à « pousser » les IO au plus près de la CPU ;
  • La capacité du système à dépasser les limites d’un réseau classique (TCP/IP) ;
  • La capacité du système à masquer les limites des disques classiques (disques magnétiques).

Une architecture « scalable » repose sur un ensemble de nœuds, le nombre de nœuds pouvant être augmenté au fil du temps. De plus, la clusterisation d’un système permet de supporter un grand nombre d’accès concurrent de manière continue. Cette haute disponibilité permet de faire face à la défaillance d’un nœud via du failover. Ce type d’architecture peut être « shared nothing » (partitionnement des données sur plusieurs disques, chaque nœud ayant accès à un seul disque ) ou « shared everything » (données stockées sur un système de stockage commun à chaque nœud ).

Une architecture de type «shared nothing» est par exemple implémentée par Teradata. Elle s’adapte bien pour des applications de type datawarehouse car la « scalabilité » est linéaire. En revanche, la gestion d’un nœud défaillant est moins transparente (les données non partagées doivent être réappropriées par les autres noeuds) et le load-balancing est impossible. Enfin, l’ajout d’un nouveau nœud nécessite de redéfinir l’architecture en particulier au niveau de la distribution des données.
Une architecture de type « shared everything » est par exemple mise en œuvre par Oracle (option RAC). Elle est intéressante pour les applications de type OLTP notamment en terme de cohérence des données. Elle assure un load-balancing de grande qualité, un failover performant pour la lecture et s’améliorant à chaque nouvelle version pour la mise à jour. Cependant, la scalabilité est moindre du fait notamment de la cohérence des données à assurer au niveau des caches de chaque nœud via un gestionnaire distribué de verrous ce qui induit un trafic réseau plus important entre chaque nœud. L’addition d’un nœud, quant à elle, peut être faite en mode « online ».

Les architectures de type big data reposent sur le concept de sharding. Il consiste en un partitionnement des données sur plusieurs machines. Il a été introduit par Google afin de disposer d’un modèle plus « scalable » que les solutions de type cluster proposées par les grands éditeurs tout en restant le moins onéreux possible au fur-et-à-mesure de l’ajout des machines. Ce partitionnement est géré automatiquement par le système via des tables de hachage distribuée (DHT). Suivant la nature de la distribution (maître ou non), cette table se trouve sur un nœud ou sur plusieurs nœuds. Elle peut être aussi partitionnée sur plusieurs nœuds.

Pour illustrer le concept de sharding, considérons l’architecture de MongoDB, une des solutions les plus utilisées dans le big data avec Cassandra. Le choix de la répartition se fait par collection. Les documents de cette collection (format BSON ) sont distribués le mieux possible entre les différents nœuds, au plus 1 000 en théorie, afin d’équilibrer les traitements. A chaque opération, il faut identifier le nœud qui contient les données. Ce rôle est assuré par un processus dédié qui reçoit les requêtes et les reroute sur le bon nœud. De plus, la continuité de service est assurée par un mécanisme de réplication de type maître-esclave à l’intérieur de chaque nœud, ce dernier pouvant définir un groupe de réplicas avec basculement automatique.

Perf
Mémoire, CPU, Disque… : La performance traquée sous toutes ses formes

Sur les architectures modernes, une partition sur un châssis peut disposer de plusieurs cœurs, chaque cœur pouvant traiter plusieurs threads. Cette nouvelle donne peut être utilisée par les moteurs de base de données quels qu’ils soient afin de paralléliser au maximum les traitements.

L’arrivée de nouvelles bases mémoire est en train de changer la localisation des données au niveau des bases. Jusqu’à maintenant, les données les plus usitées sont chargées dans des « buffer cache » en RAM. Désormais, des solutions comme MonetDB ou Hana de SAP sont capables de charger directement les données compressées non plus en RAM, mais dans le cache CPU. Il en découle une accélération des traitements car les accès RAM sont minimisés comme auparavant les accès disque : l’accès à la RAM est de l’ordre de 60-100 ns, l’accès au cache CPU de l’ordre de quelques ns suivant le niveau du cache.

Les systèmes critiques souffrent parfois de problèmes de latence au niveau réseau lors de l’utilisation du protocole TCP/IP. Pour dépasser ce type de limite, le réseau Infiniband a été créé ce qui permet aux différents éditeurs de proposer des protocoles plus performants, par exemple le protocole RDS. Il a pour but d’optimiser le transfert des données entre le système de stockage, par exemple une baie de disque et l’instance traitant ces données sur un serveur. Il utilise un bus bi-directionnel qui se caractérise par une latence faible (environ 1 microseconde) et des débits de plusieurs dizaines de Gbits/s.

Enfin, les serveurs de stockage permettent de filtrer les données récupérées sur les disques flash ou magnétiques. L’objectif est de diminuer au maximum le trafic réseau entre le système de stockage et le serveur de bases de données. Par exemple, Oracle dans son appliance Exadata a introduit un serveur de storage et une technologie appelée « smart scan » qui permettent de réduire la bande passante dans le cas des full scan table. Si la requête contient une condition, elle est prise en compte en amont au niveau du serveur de storage ce qui permet de ne retourner que quelques MB de données au lieu de quelques GB au serveur de bases de données.


Nous poursuivrons cette série par quelques fiches techniques sur les technologies ExaLytics, ExaData, MongoDB , Sap HANA… restez connectés.

Cet article a été rédigé par Stéphane NOTTER, Expert architecture et performance des données à Infotel.



Stéphane anime des Brown Bag Lunch (ou pique nique technique) sur les technologies liées au bases de données sous toutes leur formes:

  • « MongoDB vu par un expert Oracle »
  • « NoSQL vs SQL : l’écosystème de la base de données à l’heure du BigData »

Contactez-Nous !



Hadoop User Group de Février [Compte-Rendu]

Le HUG de Paris se réunissait ce jeudi 13 février dans les locaux de Google. Le succès ne se dément pas de session en session avec un public toujours plus nombreux.

Au menu, deux présentations de solutions techniques et un retour d’expérience décliné sur plusieurs projets représentatifs.

1re session technique : Hadoop sur Google Compute Engine

La présentation et les démonstrations de Google portent un même message: GCE est une plateforme mature pour faire tourner Hadoop. Ceci inclut les opérations de lancement / monitoring / extinction un cluster. La couverture fonctionnelle se veut aussi complète que celle de Amazon MapReduce (nous avons été invités faire la comparaison) et surtout plus performante. Cela se traduit notamment en supprimant les opérations de migration des données entre le stockage pur et l’espace d’exécution de Hadoop. En plus clair : les instructions Pig, Hive, MapReduce s’exécutent directement sur le Google Cloud Storage.

2ème partie : le retour d’expérience d’EDF

C’est une équipe du département R&D de l’opérateur national qui présente ses travaux. Oui, c’est le département Recherche et développement, mais les travaux engagés depuis 3 ans sur hadoop et son écosystème ont donné des garanties suffisantes pour en faire une des approches technologiques de choix pour une grande diversité de sujets présents et à venir:

  • Modélisation, Analyse et Prévision des consommations électriques en captant les informations de compteurs intelligents,
    o L’approche astucieuse de transformation de séries temporelles (très complexes à traiter en l’état) en chaines de caractères permet de bénéficier de tous les mécanismes d’analyse de modèles (corrélation, identification d’anomalies, prédictions…)
    o EDF a créé une librairie spécifique pouvant être exploité dans Hive, l’idée étant comme souvent de donner un niveau fonctionnel aux manipulations de données qui les rendent facilement utilisable pour des équipes ‘métier’ ou ‘statistique’.
    o Les débuts de l’architecture Hadoop ont été modestes, comme souvent, mais même à partir d’un cluster de petite taille on arrive à valider un modèle qui montera facilement en capacité. A ceci s’ajoute que le premier défi était de trouver une solution qui offrirait une capacité de stockage en ligne avec les futures avalanches de données ; désormais c’est la polyvalence de la solution et le coût bien moindre que des solutions d’éditeurs qui sont les atouts mis en avant par l’équipe R&D.
  • Analyse de l’e-réputation de EDF par acquisition de données textuelles qui cheminent dans un processus : Analyse sémantique, regroupement par thèmes (clusterisation) avec du Machine Learning (Mahout), puis visualisation (http://sigmajs.org/).
  • Prise en main de Storm et application à la prévision de consommation en mixant des sources de données diverses (Consommation électrique, profil client issu du CRM, données météorologiques, réseaux sociaux) : le cas d’application est parlant, avec un ROI potentiel très palpable… la mise en œuvre n’est pas si simple.

3ème et dernière ‘session’ : Apache Phoenix, un projet de ‘portage’ du SQL sur HBase.

L’ambition du projet est de bénéficier de la puissance de HBase sans souffrir de la technicité requise encore aujourd’hui pour vraiment exploiter cette base de données ‘Colonne’ posée sur Hadoop.

Au vu de la technicité de la présentation, on perçoit bien que c’est un sacré tour de force et que certains compromis sont néanmoins nécessaires pour transformer une base NoSQL en un système exploitable avec un SQL presque standard… A suivre, sachant que certaines questions émergent naturellement:

  • Jusqu’à quel point les briques Hadoop doivent-elles intégrer des problématiques plus habituellement liées au transactionnel ?
  • Les bases NoSQL (colonne, document, clé-valeur) pensées pour des usages précis ne vont-elles pas perdre en crédibilité à force d’être utilisées hors de leur champ de prédilection, souvent au travers de casse-têtes de développement et d’exploitation?


Livre Blanc ‘BigData au service du Système d’information’

Voici un billet qui sort des habituelles présentations de cas techniques, mais Il en représente d’une certaine manière l’aboutissement. Il a en effet pour objectif de donner la vision que nous avons, à Infotel, des enjeux relatifs au « BigData pour le Système d’Information ».

Nous travaillons auprès de grands groupes dans une diversité de technologies et de cas métiers très importante. Nous sommes impliqués de ce fait sur des projets sensibles, au coeur des SI, qui associent bien souvent des exigences très élevées en performance, en volume de données à traiter, et en rapidité d’évolution fonctionnelle.

Le BigData commence à bouleverser dans des proportions nouvelles nos habitudes. Enjeux, opportunités, stratégies de mise en oeuvre… voici ce que nous vous proposons de parcourir au travers de ce livre blanc.

Vous pouvez le télécharger ici : http://infotel.com/services/big-data-360/


Nous vous en souhaitons une lecture profitable et sommes à votre disposition pour en discuter avec vous.

Hubert STEFANI



Le petit déjeuner MongoDB …et BigData.

Ce Jeudi 19 septembre 2013 a eu lieu une présentation en deux temps autour de MongoDB, la base de données NoSQL « taillée pour le BigData » la plus populaire au monde !

1- Les généralités sur MongoDB

Une présenation des avantages indéniables de MongoDB sur toutes les solutions du marché… En dépassant l’accroche ‘on-a-la-meilleure-réponse -technique-à –vos-problèmes-de-volumétrie-variété-vitesse‘, MongoDB dispose de véritables atouts et de références de choix. Pour en profiter à plein, il faut cibler le besoin métier qui pilote et la façon dont les données vont être exploitées donc structurées (par opposition à des approches guidées par la technique):

  • L’enseignement majeur est que le ‘BigData’ (terme abusivement employé) n’offre pas de solution magique répondant à : « je déverse toutes mes données à disposition et ma nouvelle solution saura m’offrir toutes la souplesse et performance requise pour mes applications en lignes et mes analyses business intelligence ».. Attention aux mirages
  • Il faut donc s’attacher à « quelle est la manière dont je vais interroger mon système » pour chaque besoin métier ; quitte à découper en autant de réponses (donc de groupes de données et d’applications) que d’exigences métier. Ce sont les stratégies adoptées par tous les géants (Google, Yahoo…).

2- Le retour d’expérience de Criteo.

Cette présentation met en avant l’aspect symptomatique lorsqu’on parle d’une base de données NoSQL (ou autre brique technique connotée BigData) qui est de tirer le fil des technologies voisines qui viennent dialoguer avec:

  • a. On ne construit pas sa solution métier sur un socle monolithique, mais comme un assemblage des meilleurs pratiques :
    i. MongoDB pour stocker le catalogue des produits,
    ii. MemCache pour encaisser les milliards de hits par jour,
    iii. Hadoop pour les traitements de fonds qui envoient les données traitées et agrégées vers les analystes BI,
    iv. Etc …
  • b. MongoDB est depuis la v2.4 une solution qui a atteint un stade de maturité qui en fait une solution extrêmement robuste et performante sur ses qualités (lecture, requête géospatialles, réplications…).
  • c. L’intégration de MongoDB et Hadoop requiert encore un peu d’huile de coude, notamment quand il s’agit de déverser de très gros volumes en BSON vers HDFS.

Egalement intéressant à noter, la check liste de Critéo somme-toute classique que l’on déroule (éventuellement à base de prototype) pour choisir sa solution technique,en l’occurrence pour stocker ses données en remplacement du SQL Server qui saturait:

  • i. Capacité à monter en capacité sur des machines ‘normales’ (serveurs intel de base),
  • ii. Faculté de réplication (sans nécessairement assurer une transaction pure et dure),
  • iii. Haute Disponibilité,
  • iv. Un cluster unique,
  • v. Capacité à accepter des requêtes dépassant la simple interrogation par clé,
  • vi. TCO (absence de coûts de licences en production),
  • vii. Existence d’un véritable support commercial,
  • viii. Faciliter la rapidité de développement et de déploiement.

Avec des volumétries atteignant les 800Tb en base et progressant toujours de manière soutenue, le retour d’expérience est significatif et quelques bonnes pratiques techniques se dégagent :

  • MongoDB est très performant tant que les I/O disques ne sont pas trop nombreux,
  • Placer ses données en RAM est un facteur de performances essentiel,
  • Il faut « sharder » (c’est à dire partitionner par serveur) avant même que les limites physiques soient atteintes, sous peine de souffrir à lisser la volumétrie en ajoutant de nouveaux nœuds (c’est effectivement ce que nous avons observé sur nos propres projets MongoDB),
  • Il faut établir un ‘modèle de données’ qui limite autant que possible les accès sur disque (notamment l’emploi d’index qui sont coûteux en temps d’écriture).

Voici quelques remarques complémentaires venues après un échange de questions/réponses soutenu :
– Le cas de Criteo est appliqué à des données :

  • Dont la structure qu’on peut qualifier de « simple » : Un catalogue de produit ; avec une hétérogénéité des catalogues qui est réduite en amont du chargement dans MongoDB. Ceci a grandement facilité la migration de données depuis SQL Server. Elle sera plus douloureuse à partir d’un modèle riche et éclaté.
  • Requêtées selon des approches légèrement plus évoluée que du clé/valeur. Comme le mentionne Julien Simon de Critéo, avec un patrimoine technique embarquant des procédures stockées, de nombreuses jointures, la transition n’en sera que plus complexe.


– Pour l’architecture technique, elle repose sur :

  • MongoDB,
  • RabbitMQ,
  • Cloudera,
  • Kafka,
  • Hadoop, Hive,

Le sentiment qui s’en dégage est que ces technologies sont performantes, robustes, pensées pour être faciles d’emploi même si leur intégration est un projet en soi. Leur déploiement et leur utilisation se démocratisent et permettent de construire des solutions qui suivent les problématiques métier dans leur rythme d’évolution toujours plus soutenu.

Bonne nouvelle ! Ce sont des technologies que nous avons mises en œuvre dans des projets clients pour transformer leur SI (et MongoDB en première place), confirmation que les patterns des géants du Web peuvent parfaitement s’appliquer en dehors de leur contexte premier…. Et si on en parlait?

Hubert STEFANI


Qualité de données avec Hadoop : ROI immédiat !

Nous avons mené depuis deux ans des projets internes autour d’hadoop pour améliorer les traitements que nous opérons sur les grandes bases de données dont nous avons la maîtrise : extraction, migration totale ou partielle vers des bases NoSQL.

Il s’est présenté récemment un cas d’utilisation représentatif des services rendus par les technologies connexes sur lequel nous avons mené une action poussée d’optimisation pour comprendre en profondeur les possibilités de Hive ainsi que les améliorations apportées par une approche de niveau Map / Reduce.

Contexte :

Des fichiers de plusieurs Go servent à la synchronisation d’applications dans les SI de différents clients. Ils obéissent à un format de données propriétaire (en SGML) et variable dans sa structure. Historiquement, des incohérences se sont glissées et induisent des erreurs lors de l’intégration des données et nécessitaient un traitement de purge.

L’approche classique aurait été de créer un développement spécifique, basé sur un parsing des fichiers de structuration dans un système de fichier ou une base de données, puis le développement des règles d’épuration avec une gestion fine de la cohérence.

Pour être le plus réactif possible, nous avons opté pour l’utilisation de notre cluster ‘Labo’ sous Cloudera : il est modeste (4 noeuds seulement), mais efficace. Le cheminement a été le suivant:

  1. Montée des données dans HDFS,
  2. Définition des meta-données qui permettront d’extraire les informations pertinentes :
  3. L’utilisation de balises comme élément de découpage ne permet par de conserver toutes les données pertinentes,
  4. Introduction d’un pré-traitement d’enrichissement du fichier pour préparer les extractions constitué comme suit :
    1. 1- Identification d’une chaîne de traitement,
    2. 2- Création d’une boîte à outils Java Map /Reduce pouvant être exploitée à plus haut niveau (i.e. dans Hive)

L’exécution des traitements par une approche naïve nous a conduit au constat de performances relativement médiocre : 41 minutes. Elle correspond à la séquence suivante :

Enchainement traitements Hive

En détail : L’objectif reste celui de réaliser une jointure entre les lignes possédant une donnée reconnue comme viable et celles similaires possédant un indicateur d’incohérence nommé L019EP.

Deux tables sont ainsi crées :

  • rightD : lignes contenant une donnée valide pour la colonne B (requête SELECT avec clause WHERE)
  • wrongD : lignes contenant une donnée non valide pour la colonne B (requête SELECT avec clause WHERE)
Temps de création des tables
(minutes)
Volumétrie des table (GB)
rightD 4 2
wrongD 17 8

Le résultat final est obtenu à travers la jointure entre les deux tables de données rightD et wrongD. Une jointure simple est alors réalisée sur l’égalité de leurs colonnes A et C et le résultat final est stocké dans la table resultSorted. Le contenu de cette table est alors l’ensemble des lignes ayant été en doublons et possédant une donnée valide pour la colonne B.
Voici la requête utilisée (très simple comme on peut en juger) :

3
4
5
INSERT OVERWRITE TABLE resultSorted SELECT T1.A, T1.B, T1.C  
FROM rightD T1 
JOIN wrongD T2 ON T1.A = T2.A  AND T1.C = T2.C;
Query time (minutes) Table size (GB)
resultSorted 20 15

Contrat rempli : nous avons une solution opérationnelle en moins de deux jours, avec un résultat permettant de purger les applications cibles.

Phase d’optimisation Hive – Map / Reduce

Dans l’optique de rendre récurrent l’exécution de ces traitements, il est nécessaire de se pencher sur les performances et obtenir un meilleur réactivité et une exploitation plus juste des ressources.
Il existe plusieurs leviers d’optimisation :

  • Travailler sur les requêtes,
  • Implémenter des algorithmes plus ‘bas-niveau’,
  • Optimiser son infrastructure (Réseau, disque, mémoire).

(on peut trouver des éléments techniques détaillées à : http://blog.cloudera.com/blog/2009/12/7-tips-for-improving-mapreduce-performance/ )

1- Monter en mémoire les données de faible volumétrie :

Le principe consiste à indiquer lors de la requête la petite table concernée. L’instruction MAPJOIN(petiteTable) permet cela :

INSERT OVERWRITE TABLE resultSorted 
     SELECT /*+ MAPJOIN(T2) */  T1.A, T1.B, T1.C      
     FROM rightD T1 
     JOIN wrongD T2 ON T1.A = T2.A  AND T1.C = T2.C;

Attention au dimensionnement de sa JVM et la mémoire allouée.

2- Implémenter notre traitement en Map / Reduce

Nous allons travailler au plus près des données manipulées en tirant profit de :

  • La rétention et organisation des seules données pertinentes dans notre donnée unitaire,
  • Maximiser la phase Map qui travaille en local pour soulager la phase de Shuffling et de Reduce qui fait ‘souffrir’ notre réseau inter-noeuds.

Algorithme Map Reduce

3- Ajustement des paramètres :

a. On positionne le nombre de reducers dans chacun des jobs pour exploiter au mieux le cluster.

public class RmvDblConf extends Configured implements Tool{
 
        public int run(String[] args) throws Exception {
 
    Configuration conf = getConf;
 
    JobConf job = new JobConf(conf, RmvDblConf.class);
 
    job.setJobName("RmvDblConf");
 
    //Positionnement du nombre de tâches de réduction
    job.setNumReduceTasks(12);
 
    //reutilization de l aJVM
    job.setNumTasksToExecutePerJvm(-1);
 
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
...

En répartissant le traitement de la phase Reduce sur l’ensemble du cluster, le temps de traitement total a été divisé par deux.

Process time (minutes) Output file size (MB)
Job ThreeReduceTasks 16 15

4- Paramétrages Hadoop & Système complémentaires

Voici la liste des autres axes d’optimisation qui doivent être mis en regard avec les spécificités de vos traitements (ils ne sont pas pertinents dans tous les cas !) :

  • Distinction des FS d’entrée et de sortie pour limiter la contention,
  • Réutilisation de JVMs,
  • Augmentation de l’espace mémoire attribué à chaque tâche Map et Reduce,
  • Augmentation des threads de copie du disque vers le buffer Reduce,
  • Augmentation du pourcentage de la mémoire libre à consacrer au buffer Reduce,
  • Augmentation de la taille du buffer Map,
  • La compression de données : les fichiers manipulés ne sont pas assez volumineux pour tirer profit de cette approche.

Cette optimisation s’ajuste en effectuant un monitoring précis de l’utilisation des ressources (CPU/ Réseau / Mémoire); voici un des éléments graphiques restitués par Ganglia :

Monitoring Phase Reduce

Consommation des ressources lors de la phase reduce

Le temps de traitement a été divisé par 6 entre l’approche naïve et celle mettant en œuvre l’ensemble des optimisations retenues :

Process time (minutes) Output file size (MB)
Job Map reduce optimized 7 15

Dans le cas où nous devrions travailler sur un autre cluster, l’optimisation passerait par deux étapes:

  • Premièrement, réaliser un audit du cluster afin d’entre autre de déterminer la mémoire totale disponible, connaitre la puissance totale ainsi que le nombre de machines (= nombre de tasktrackers) constituant le cluster et ainsi connaitre le nombre de tâches maximales pouvant être traitées au même moment.
  • Dans un deuxième temps, les jobs développés prendraient en compte ces paramètres avant même de chercher à pousser l’optimisation via les procédures à notre disposition, compression des données en sortie de la phase Map, optimisation d’écriture du code…