Reading notes
Designing Data-Intensive Applications

Designing Data-Intensive Applications

1 - Reliable, scalable and maintainable applications

  • Data-intensive désigne le fait que les données soient le bottleneck, par opposition à compute-intensive qui fait référence au CPU.
  • Les frontières entre les différentes catégories (base de données, cache, système de queuing etc.) deviennent parfois floues. Par ex : Redis est un cache utilisé comme système de queuing, ou encore Kafka qui est un système de queuing avec une garantie de persistance comme une BDD.
  • Il y a 3 enjeux principaux auxquels on répond quand on conçoit un système de données :
    • La fiabilité (reliability) consiste à fonctionner correctement malgré les fautes matérielles, logicielles, ou humaines.
      • Les disques durs sont connus pour faire des fautes tous les 10 à 50 ans, ce qui veut dire que sur un parc de 10 000 disques, il y en a un qui saute tous les jours. On peut prévenir ce genre de problème par de la redondance (RAID par ex).
      • Les fautes logicielles sont beaucoup plus insidieuses, et peuvent causer des dégâts en chaîne. Pour les prévenir on peut mettre en place du monitoring, prévoir des restarts de processus en cas de crash etc. Mais ça reste bien maigre en soi.
      • Les fautes humaines sont inévitables, il faut concevoir les systèmes de manière à décourager les actions problématiques, faire beaucoup de tests automatisés, rendre facile le fallback etc.
    • La scalabilité consiste à accompagner le système dans sa montée en charge en termes de données, de trafic ou de complexité.
      • Parler de scalabilité tout court n'a pas vraiment de sens, il faut préciser sur quel aspect on scale.
        • Il faut d'abord décrire le load sur lequel on veut scaler. Par ex (page 11) : pour twitter le load clé c'est le nombre de followers par personne :
          • la 1ère solution consiste à recréer la timeline de tweets de chaque utilisateur depuis la base de données
          • la 2ème à constituer des timelines à jour dans un cache, et de mettre à jour les timelines des followers à chaque tweet. Du coup avec la solution 2 tout dépend du nombre de followers.
          • Twitter a fini par adopter une solution hybride : la 2ème solution par défaut, et la 1ère pour les comptes avec énormément de followers. Par défaut la timeline est dans le cache, mais si une célébrité est suivie, une requête sera faite pour récupérer les tweets.
        • Ensuite il faut décrire la métrique de performance. Il s'agit d'augmenter le load qu'on a décrit pour voir jusqu'où on tient.
          • Si notre métrique concerne un service en ligne, on va en général prendre le temps de réponse.
            • (Le temps de réponse et la latence sont différents : la latence concerne le temps pendant lequel la requête est latente, c'est-à-dire qu'elle attend d'être traitée. Le temps de réponse est plus long que ça.)
            • Il faut reproduire la requête un grand nombre de fois, et prendre la médiane pour avoir une idée du temps que ça prend. Dans la même idée on peut prendre les percentiles pour voir par ex. si on arrive à rester sous un certain seuil pour 99.9% de nos requêtes (appelé p999).
        • Pour répondre aux problématiques de scalabilité :
          • Une réponse à un certain load ne marchera pas pour un load beaucoup plus important : il faut repenser régulièrement son architecture si on scale vraiment.
          • Il y a le scale vertical (machine plus puissante) et le scale horizontal (plus de machines, qu'on appelle aussi shared-nothing architecture).
            • En réalité, on utilise souvent un mix des deux : des machines puissantes pour certaines tâches, et du scaling horizontal pour d'autres.
          • La création de machines supplémentaires peut être manuelle ou “élastique”. La version élastique permet d'adapter aux grandes variations mais est plus complexe aussi.
          • Habituellement, avoir une application stateful qui est sur plusieurs machines est difficile à gérer, donc on essaye de garder la BDD sur une seule machine jusqu'à ce que ce ne soit plus possible. Avec l'évolution des outils, ceci sera sans doute amené à changer.
          • Il n'y a pas de magic scaling sauce : chaque application de grande échelle a ses propres contraintes, ses propres bottlenecks, et donc sa propre architecture.
          • Quand on crée un produit, il vaut au début passer surtout du temps à développer les fonctionnalités qu'à penser son hypothétique scaling.
    • La maintenabilité consiste à pouvoir à la fois perpétuer le système et le faire évoluer en un temps de travail raisonnable.
      • Pour qu'un système soit maintenable dans le temps, il faut travailler sur ces aspects :
        • operability : la facilité pour les ops de faire tourner le système.
          • Il faut faciliter la vie au maximum pour les ops. Ex : fournir un bon monitoring, permettre d'éteindre une machine individuellement sans affecter le reste, avoir de bonnes valeurs par défaut et un comportement auto-réparateur, tout en permettant aux ops de prendre la main.
        • simplicity : que le système soit le moins complexe possible pour le comprendre rapidement et pouvoir travailler dessus.
          • On peut par exemple réduire la complexité accidentelle, c'est-à-dire la complexité non nécessaire liée seulement à l'implémentation mauvaise.
          • Sinon globalement une bonne chose à faire c'est d'introduire des abstractions pour appréhender le système plus facilement. Par ex. les langages haut niveau sont des abstractions de ce qui se passe dans la machine.
        • evolvability : la facilité à changer ou ajouter des fonctionnalités au système.
          • Il s'agit ici de l'agilité mais appliquée à tout un système, et pas à de petites fonctionnalités.

2 - Data Models and Query Languages

  • Le modèle de données relationnel a dominé le stockage depuis les années 70, en apportant de l'abstraction autour de la manière dont les données étaient structurées, contrairement aux autres alternatives.
    • Toutes les tentatives de détrôner SQL ont échoué, la hype est retombée.
  • NoSQL arrive dans les années 2010 et regroupe tout un ensemble de technologies qui permettent de pallier aux problématiques de scalabilité, et d'offrir une plus grande flexibilité que les BDD relationnelles
    • Parmi elles, il y a notamment les BDD basées sur le modèle de document.
    • Il est probable que les BDD relationnelles et NoSQL soient utilisées conjointement dans le futur.
  • Il y a un décalage entre la POO et le format de BDD relationnel, qui oblige à une forme de conversion. Pour certaines données on pourrait utiliser une structure en document comme JSON par exemple au lieu du relationnel. Par ex pour le cas des infos d'un CV, on pourrait la ville d'un job autant de fois qu'elle apparaît.
    • On répète alors éventuellement plusieurs fois certaines informations dans les entrées, ou alors on les met dans une table à part mais on fait les jointures à la main depuis le code applicatif.
      • En réalité, ce problème est apparu dès les années 70. Le modèle hiérarchique (proche du modèle sous forme de document qui a fait résurgence récemment donc) faisait face à 2 autres modèles : le modèle relationnel et le modèle en réseau (network model) qui a fini par être abandonné.
        • Le modèle en réseau consistait à avoir un modèle hiérarchique mais avec la possibilité pour chaque donnée d'avoir plusieurs parents. Mais ça rendait le code applicatif difficile à maintenir.
    • La normalisation consiste justement dans les BDD relationnelles à trouver ce genre de répétition, et à les factoriser en une nouvelle table. Le but est d'éviter la duplication, et donc de renforcer la consistance des données. Ça permet aussi de les modifier facilement en un seul endroit.
  • Comparaison aujourd'hui du modèle relationnel et du modèle de document :
    • Simplicité du code applicatif :
      • Le modèle de document mène à un code applicatif plus simple dans le cas où il y a peu de relations many to many ou many to one (pour les one to many c'est ok puisqu'on répète de toute façon la donnée dans la table du modèle de document).
      • Dans le cas contraire il faudrait faire les jointures à la main donc le modèle relationnel serait meilleur (code applicatif plus simple et jointures par la BDD plus efficaces).
      • Dans le cas où il y a une forte interconnexion entre les données (de nombreuses relations many to many), c'est alors le modèle en graphe qui serait le plus pertinent.
    • Flexibilité du schéma de données :
      • C'est un peu comme la différence entre le typage statique et dynamique des langages de programmation : le modèle relationnel force à déclarer un type de données et à s'y conformer ou faire une migration. Le modèle de document permet de changer de type de données en cours de route et donc la gestion des données est entièrement confiée à l'application, qui gagne en liberté et du coup en responsabilité.
      • Le modèle de document est vraiment meilleur quand les données sont de type hétérogène, ou encore si elles sont déterminées par un système extérieur sur lequel la BDD n'a pas le contrôle.
    • Localité des données :
      • Vu que dans le modèle de document les données sont copiées dans chaque entrée, elles sont locales à celles-ci. On peut donc les avoir avec juste une requête, et on utilise moins le disque dur qu'avec le modèle relationnel. En revanche on va chercher le document entier, donc si on a souvent besoin d'un tout petit morceau ça n'en vaut peut être pas le coup.
      • Certaines BDD relationnelles permettent aussi de localiser des tables vis-à-vis d'autres (ex : Spanner database de Google, Oracle, ou encore Bigtable data model (utilisé dans Cassandra et Hbase).
  • Les différentes implémentations de BDD ont tendance à converger : la plupart des BDD relationnelles supportent les opérations dans du contenu XML ou JSON, et RethinkDB et MongoDB permettent de faire une forme de jointure automatique, même si moins efficace.
  • Le modèle relationnel offre un langage déclaratif, alors que le modèle hiérarchique n'offre qu'un langage impératif. L'avantage du déclaratif c'est que ça abstrait des détails qui peuvent être laissés à la discrétion de l'outil de BDD qu'on utilise pour faire des optimisations.
    • MapReduce, qui est un modèle popularisé par Google et disponible dans MongoDB, CouchDB et Hadoop est entre le déclaratif et l'impératif. Il abstrait certaines opérations mais permet aussi d'ajouter du code en plein milieu d'une requête qui aurait été atomique en SQL.
  • Dans les bases de données de graphes, les données sont représentées sous forme d'entités reliés par des traits.
    • Ex : Facebook utilise un graphe géant où sont présentes des entités variées (personne, lieu, commentaire), reliés entre eux avec des types de liens différents.
    • Modèle property graph (implémenté par Neo4j, Titan, InfiniteGraph) :
      • Il y a deux tables : les entités (vertices) et les traits (edges) avec chacun leurs propriétés, et pour les edges la liste des couples d'entités reliés par son biais.
      • On peut facilement créer de nouveaux types de liens, sans avoir besoin de vraiment modifier la structure de la BDD.
      • Le langage Cypher est un langage déclaratif inventé pour Neo4j.
      • L'avantage c'est que le langage de graphe permet de trouver des données en parcourant un nombre indéterminé de chemins, et donc de faire un nombre non connu à l'avance de jointures. C'est possible en SQL mais avec une syntaxe beaucoup plus longue.
    • Modèle triple-store (implémenté par Datomic, AllegroGraph) :
      • Il s'agit de la même chose que le property graph, mais présenté différemment : on a un groupe de 3 données qui sont (sujet, prédicat, objet).
      • Turtle et SPARQL sont des langages qui permettent d'utiliser le triple-store.

3 - Storage and retrieval

  • Un des moyens d'organiser les données dans une BDD est d'utiliser un système de log : l'ajout de données est fait en ajoutant le contenu à la fin d'un fichier (ce qui est très rapide), et la lecture est faite en parcourant l'ensemble des données (ce qui est très lent O(n)).
    • Pour accélérer la lecture, on peut créer des index sur les champs dont on estime qu'ils vont souvent servir à faire des recherches. Ça accélère la lecture, mais ça ralentit l'écriture puisqu'il faudra mettre à jour l'index à chaque fois.
      • On peut utiliser des Hash index tels que implémentés dans Bitcast, le moteur de stockage de Riak. Il s'agit d'avoir une structure associant une clé à un offset en mémoire vive. A chaque recherche on n'a qu'à trouver la clé et on peut directement lire la donnée sur disque.
        • Pour des raisons pratiques (consistance des données, performance grâce aux opérations séquentielles et non pas random), les fichiers de BDD ne sont jamais modifiés. On écrit les nouvelles données séquentiellement (donc pas de concurrence pour l'écriture) toujours à la fin du fichier, et on fait du ménage dans le fichier dans un nouveau fichier de BDD régulièrement. Pareil pour supprimer une donnée : on insère une commande dans le fichier et ce sera supprimé à la prochaine copie / optimisation du fichier de BDD.
        • Les limitations c'est qu'il faut que les clés tiennent en mémoire vive sinon c'est foutu, et que les recherches de “ranges” de clés ne sont pas efficaces, ça revient à chercher les clés une par une.
  • On peut aussi stocker les données sous forme triée dès le début. On a alors les Sorted String Table (SSTable). Ca consiste à avoir une structure d'arbre triée en mémoire où vont les nouvelles données (qu'on va appeler la memtable). Et tous les quelques Mo on écrit ça sur DD. Puis régulièrement on va faire des opérations en tâche de fond pour grouper les arbres triés en un seul. Lors d'une recherche, on va d'abord chercher dans le bloc le plus récent, puis de moins en moins récent, jusqu'à arriver au gros bloc, sachant que tous les blocs sont déjà triés.
    • Un des avantages c'est qu'on n'a plus à faire entrer toutes les clés en RAM. On peut avoir en mémoire un nombre de clés beaucoup plus épars qui indique les offsets.
    • Autre avantage aussi qui répond au problème du hash index : on peut faire des recherches de “range” d'index, vu que tout est déjà trié.
    • Et aussi, comme tout est déjà trié et qu'on a les offsets des données groupe par groupe, on peut compresser des groupes de données ensemble.
    • Ce mécanisme est aussi appelé Log Structure Merge Tree (LSM Tree) en référence à un papier décrivant le mécanisme.
    • De nombreux moteurs de BDD utilisent ce principe :
      • LevelDB (peut être utilisé dans Riak) et RocksDB
      • Cassandra et HBase, inspirés du papier Bigtable et Google.
      • Lucene (moteur d'Elasticsearch et de Solr) utilise un mécanisme similaire pour indexer le texte.
    • En terme d'optimisations :
      • La recherche peut être lente : on cherche dans la structure en mémoire, puis dans le premier bloc en BDD et ainsi de suite tant qu'on ne trouve pas, jusqu'à avoir cherché dans le bloc déjà compacté. Pour remédier à ça on peut approximer la recherche avec des structures efficaces appelées Bloom Filters.
      • Il y a 2 types de stratégies de compaction :
        • size-tiered : les nouveaux et petits blocs sont régulièrement fusionnés avec les anciens et plus gros blocs.
          • HBase utilise cette technique, alors que Cassandra supporte les deux.
        • leveled : les blocs sont plus petits et la compaction se fait de manière plus incrémentale, utilisant moins d'espace disque.
          • LevelDB tient son nom du fait qu'il utilise cette technique. On a aussi RocksDB ici.
  • La structure de BDD la plus utilisée et depuis longtemps est le B-Tree. La plupart des BDD relationnelles l'utilisent, mais aussi une bonne partie des BDD NoSQL.
    • Il s'agit d'avoir des pages de taille fixe (en général 4 ko), organisés en couches (rarement plus de 4 couches). Chaque page contient des clés et des références vers des zones physiques de disque pour aller chercher les clés entre deux clés indiquées (sorte de dichotomie donc). On descend de couche en couche jusqu'à arriver à une page qui contient des données et pas de références vers d'autres pages.
    • Comme avec les LSM-Tree, pour que les B-Tree survivent à un crash sans perte de données, on va écrire toutes les opérations dans un fichier de log avant de modifier la BDD elle-même. Ensuite on peut détruire ce fichier de log.
    • Il peut y avoir des problèmes de concurrence avec les B-Tree, on va alors utiliser des locks locaux pour bloquer correctement une partie de la BDD pour le thread qui écrit dedans. Ce problème n'existe pas avec les LSM-Tree puisque les opérations de restructuration sont faites en arrière plan.
  • Comparaison B-Tree / LSM-Tree :
    • Chacun a des avantages et inconvénients, le mieux selon Kleppmann c'est de tester empiriquement dans notre cas particulier lequel a la meilleure performance.
    • A priori, la plupart du temps l'écriture serait plus rapide sur les LSM-Tree (à priori parce qu'il y aurait souvent une write amplification moins importante), alors que la lecture serait plus rapide sur les B-Tree (parce que les LSM-Tree ont besoin de lire plusieurs groupes de données triées jusqu'à ce que la compaction soit faite en arrière-plan).
    • Les LSM-Tree sont meilleurs en particulier sur les disques durs mécaniques étant donné qu'ils organisent leurs données séquentiellement et ne font pas d'accès random.
    • Les LSM-Tree stockent leurs données sur moins d'espace grâce à la compression, mais en même temps au moment où les données arrivent, ils les stockent dans un autre fichier que la BDD principale. Jusqu'à ce que les opérations d'arrière-plan soient exécutées, il y a des copies plus ou moins récentes des données qui coexistent sur le disque.
    • Les B-Tree offrent plus de prédictibilité. Même si une opération d'écriture peut prendre plus de temps, on reste constant et évite des pics dans les hauts percentiles, qui peuvent arriver avec les LSM-Tree dans le cas où le disque serait par exemple surchargé et que les opérations d'arrière plan prendraient du retard.
  • En plus des index primaires il est possible de faire des index secondaires, qui vont indexer en fonction d'une autre colonne dont on estime qu'elle sera utile pour la recherche de données. La différence avec l'index primaire c'est qu'on n'a pas besoin d'avoir une unicité sur les données de la colonne indexée.
    • Cet index peut soit contenir une référence vers l'endroit où est stockée la donnée (qu'on appelle heap file), soit une version dupliquée de la donnée elle-même (on parle de clustered index). Il y a des avantages et inconvénients évidents à le faire et ne pas le faire (rapidité de recherche vs temps d'écriture et consistance).
  • On peut aussi faire des index multi-colonnes. Ça permet de chercher par plusieurs champs en même temps.
    • Le plus connu est l'index concaténé, qui consiste à accoler plusieurs champs ensemble dans l'index, par ex “NomPrénom”, qui permet de chercher par “Nom”, ou par “NomPrénom”, mais pas par “Prénom”.
    • Il y a aussi les index multi-dimensionnels, qui permettent de pouvoir chercher avec plusieurs colonnes indépendamment, utile par exemple pour la recherche de coordonnées géospatiales longitude / latitude.
      • C'est implémenté par ex par PostGIS dans PostgreSQL, qui utilise des R-trees en interne.
  • Lucene permet de faire des recherches de termes avec des distances (une distance de 1 signifie qu'avec une lettre différente dans le mot, il sera retenu) grâce à sa structure de clés en mémoire particulière.
  • La RAM étant de moins en moins chère, on peut imaginer des BDD entièrement en RAM.
    • Pour pallier le risque de perte de données, on peut écrire sur disque en parallèle, avoir de la RAM avec batterie, ou encore faire des réplications en mémoire.
    • Plusieurs moteurs de BDD fonctionnent comme ça :
      • VoltDB, MemSQL et Oracle TimesTen, ainsi que RAMCloud qui est open source.
      • Redis et Couchbase offrent une durabilité faible en écrivant sur disque de manière asynchrone.
    • Contrairement à ce qu'on pourrait penser, le gain de performance d'utiliser des BDD in-memory ne vient pas forcément de l'écriture/lecture sur DD en elle-même, puisque l'OS met de toute façon les données récemment manipulées en cache dans la RAM. En fait, le gain vient surtout du temps de conversion des données dans un format qu'on peut écrire sur DD.
  • Ce qu'on appelle transaction n'a pas forcément besoin d'être ACID (atomic, consistant, isolé, durable). Il s'agit simplement d'un terme désignant des lectures/écritures avec faible latence, par opposition à batch, qui lui désigne les jobs faits périodiquement dans le temps.
  • La transaction classique est appelée OLTP (OnLine Transaction Processing). Il existe un autre type de transaction : OLAP (OnLine Analytic Processing) qui consiste à agir sur peu de colonnes mais un très grand nombre d'entrées, pour faire des analyses de données (par exemple des comptages, statistiques etc.).
  • Depuis les années 80 les grandes entreprises stockent une copie de leur BDD dans un Data Warehouse : une base de données structurée de manière à optimiser les requêtes d'analyse, et ne risquant pas d'affecter la prod.
    • SQL permet d'être performant sur l'OLTP comme sur l'OLAP, globalement c'est ça qu'on va utiliser sur les data warehouses. Par contre les BDD sont structurées bien différemment pour optimiser l'analyse.
      • Un certain nombre d'acteurs proposent des data warehouses avec des licences commerciales onéreuses : Teradata, Vertica, SAP HANA, ParAccel (ainsi que Amazon RedShift qui est une version hostée de ParAccel)
      • D'autres acteurs open source de type SQL-on-Hadoop concurrencent les premiers : Apache Hive, Spark SQL, Cloudera Impala, Facebook Presto, Apache Tajo, Apache Drill.
    • De nombreux data warehouses sont organisés selon un star schema. On a la fact table au centre avec en général des dizaines voir centaines de champs, et représentant les événements étudiés. Et autour on a les dimension tables, répondant aux questions who, what, where, when, how, why et liées à la fact table par des foreign keys, ils représentent en quelques sortes les metadata.
      • Une variation du star model s'appelle snowflakes, il s'agit d'une version plus normalisée, où on va davantage découper les dimention tables en sous-tables.
    • La plupart du temps, les data warehouses utilisent un column-oriented storage plutôt qu'un row-oriented. Il s'agit de stocker les données des colonnes physiquement côte à côte, parce que les requêtes vont avoir besoin de lire en général quelques colonnes entières.
      • La plupart du temps les bases en colonne sont relationnelles, mais il y a par ex Parquet, basé sur Google Dremel, qui est orienté colonne mais non-relationnel.
      • Cassandra et HBase ont un concept de column families, mais ça consiste seulement à stocker toutes les colonnes d'une entrée ensemble, elles sont en réalité essentiellement row oriented.
      • Dans le cas où les valeurs dans les colonnes se répètent (en particulier s'il y a beaucoup plus de valeurs que de valeurs possibles), on peut faire une compression sur les colonnes. Par exemple une compression de type bitmap encoding
      • Un des avantages des column oriented storages c'est que ça se prête bien à un traitement optimal entre la RAM et le cache du CPU, avec de petits cycles de traitement de données compressées provenant de la même colonne.
      • On peut profiter du mécanisme des LSM-Trees avec les données en mémoire et le reste de la BDD sur disque, pour trier les entrées d'une façon particulière. Par exemple, on peut choisir la colonne qui est souvent la plus recherchée, et trier les entrées de manière à avoir toutes les entrées avec la même valeur dans cette colonne côte à côte. Et ainsi de suite pour les colonnes secondaires. Ça permet une meilleure recherche mais aussi une meilleure compression pour ces colonnes-là.
        • On peut également choisir de trier différemment chaque copie de la BDD qu'on possède, pour choisir celle qui nous arrange le plus au moment de faire une requête. C-Store et Vertica font ça.
      • Pour optimiser les requêtes dans les column oriented databases, à la place d'un inde on peut mettre en place une materialized view, qui consiste à ajouter une valeur ou une colonne de valeurs contenant des calculs (MIN, MAX, SUM etc.).
        • Un cas particulier s'appelle le data cube, il s'agit de prendre deux colonnes comme composant les deux dimensions d'une valeur qu'on cherche à analyser, et d'ajouter une colonne représentant un agrégat (par ex la somme des valeurs sur une des dimensions).
        • Cette pratique permet d'accélérer les requêtes parce que certaines choses sont pré-calculées, mais ça offre aussi moins de flexibilité. Donc en général on s'en sert comme boost de performance tout en laissant aux data analyst la possibilité de faire les requêtes qu'ils veulent.

4 - Encoding and evolution

  • Quand on change les fonctionnalités, y compris la BDD, il est utile de pouvoir faire une rolling upgrade (ou staged rollout). Il s'agit de déployer le code sur certains nœuds, s'assurer que tout va bien, puis déployer progressivement sur les autres.
    • Cela implique que le code et la BDD doivent être backward-compatibles (supporter les fonctionnalités du code précédent) mais aussi forward-compatibles (que le code précédent ignore les nouvelles fonctionnalités).
  • Les données ont besoin d'au moins 2 représentations : une en mémoire avec des pointeurs vers des zones mémoire, et une quand on veut transmettre la donnée sur disque ou à travers le réseau. Il faut alors que tout soit contenu dans le bloc de données. La conversion de la mémoire vers la version transportable s'appelle encoding (ou serialization ou marshalling).
    • Il existe des formats liés à des langages, comme pickle pour python, mais ces formats ne sont ni performants, ni ne gèrent bien la rétro (et forward) compatibility. Et le format de données est trop centré sur un langage particulier.
    • On a les formats plus standards comme XML, JSON et CSV.
      • XML et CSV ne distinguent pas les nombres des strings, alors qu'en JSON on distingue les nombres mais pas les flottants des entiers par exemple.
      • XML et JSON ont la possibilité d'avoir des schémas associés mais ceux-ci ne font pas consensus.
      • Globalement ces formats sont suffisants pour une communication entre organisations, tant que celles-ci s'accordent sur des conventions. En réalité, la plus grande difficulté est que des organisations différentes s'accordent sur quoique ce soit.
    • On a enfin les formats binaires non spécifiques à un langage et conçus pour la performance.
      • Il existe des versions binaires (ne faisant pas consensus) pour JSON et XML. Par exemple pour JSON il y a MessagePack, BSON, BJSON, UBJSON, BISON et Smile. Le souci de ces formats c'est qu'ils sont assez peu compacts parce qu'ils embarquent le nom des champs répété à chaque entrée de donnée.
      • Apache Thrift, développé par Facebook, et Protocol Buffers (ou protobuf), développé par Google sont deux formats binaires apparus en open source en 2007/2008. Leur particularité est qu'ils ont besoin d'un schéma, et qu'ils ne répètent pas le nom des champs pour gagner de la place. Ils ont tous les deux des adaptations dans la plupart des langages pour sérialiser / désérialiser des données dans ce format à partir des structures du langage.
        • A propos des formats :
          • Concernant Thrift : Il a deux formats différents :
            • BinaryProtocol qui remplace le nom des champs par des chiffres faisant référence aux champs du schéma.
            • CompactProtocol qui possède des optimisations supplémentaires pour gagner de la place en encodant le numéro du champ et le type de champ sur un seul byte, et en utilisant par ex des entiers de longueur variable.
          • Concernant Protobuf : il est globalement assez similaire au CompactProtocol de Thrift, avec des petites différences dans la manière d'encoder les bytes.
        • A propos de l'évolution des schémas (Protobuf et Thrift) :
          • Ajout de champ : comme les champs sont représentés par un numéro reporté dans le schéma, on peut facilement ajouter un champ. Ce sera backward-compatible puisque le nouveau code pourra toujours lire les données qui n'ont pas les nouveaux champs, et ce sera forward-compatible parce que l'ancien code pourra juste ignorer les champs ayant un numéro qu'il ne connaît pas.
            • On ne peut juste pas ajouter une donnée obligatoire après une donnée optionnelle, parce que le nouveau code ne pourrait plus lire les données anciennes qui n'auraient pas ce champ obligatoire.
          • Suppression de champ : c'est possible à condition qu'ils soient optionnels (les obligatoires ne pourront jamais être enlevés).
          • Modification de type de données : c'est parfois possible, mais il y a parfois le désavantage que notre donnée peut être tronquée par le code ancien qui ne lit pas toute la longueur prise par la donnée.
            • Protobuf ne possède pas de tableaux mais demande à ajouter un même champ plusieurs fois si on le veut dans un tableau. Cela permet de pouvoir transformer un champ unique en tableau du même type et inversement.
              • Thrift ne fournit pas cette flexibilité de transformation puisqu'il a un type pour le tableau, mais il supporte les tableaux imbriqués.
      • Apache Avro a été développé en 2009 pour Hadoop.
        • Il est similaire à Thrift et Protobuf, et a deux schémas : un (Avro VDL) lisible par les humains, et un autre plus pratique pour les machines.
        • Pour aller chercher un format encore plus compact, Avro ne mentionne pas de numéros pour les champs, il les met simplement les uns à la suite des autres dans le bon ordre, avec juste leur type et le contenu.
        • Le support de l'évolution du schéma dans Avro se fait en considérant que la machine qui a écrit la donnée a son schéma, et la machine qui lit a le sien. A partir de ces 2 schémas, et à condition qu'ils soient compatibles, Avro calcule exactement la conversion nécessaire pour que les données lues soient correctement interprétées.
          • Tous les champs avec une valeur par défaut dans le schéma peuvent être ajoutés, supprimés ou changés d'ordre d'apparition.
          • On peut modifier les types de champs, avec les mêmes problématiques que pour Protobuf et Thrift.
        • L'information du schéma de celui qui a écrit la donnée étant centrale dans l'encodage / décodage, elle doit être fournie avec la donnée.
          • Pour un grand fichier contenant plein de données, on met le schéma au début du fichier et il concerne toutes les données.
          • Pour une base de données qui a potentiellement des données avec des schémas différents, on peut ajouter un nombre faisant référence au schéma à chaque entrée de donnée, et avoir une table avec tous les schémas. C'est ce que fait Espresso (la base de données de document de Linkedin, qui utilise Avro)
          • Pour une connexion par réseau, les deux entités connectées peuvent se communiquer le schéma au début de la connexion et le garder tout au long de celle-ci. C'est ce qui se fait pour le Avro RPC protocol.
        • Un de principaux avantages d'Avro sur Thrift et Protobuf est que comme le numéro des champs n'est pas dans les données, on peut facilement générer des données organisées dans n'importe quel ordre, ou avec des champs en plus etc. les mettre au format Avro avec un schéma associé, et ils pourront être lus sans problèmes. Avro a été conçu pour gérer des données générées dynamiquement, ce qui n'est pas le cas de Thrift et Protobuf.
  • Il est intéressant de noter que les données associées à des schémas sont pratiques à bien des égards, que ce soit pour la flexibilité, le faible espace occupé, la documentation vivante que ça fournit. Et ces données se couplent bien avec les bases de données “schemaless” (les non relationnelles principalement donc) qui permettent de gérer les schémas au niveau de l'application.
  • Quand on passe les données d'un processus à un autre il faut s'assurer que la donnée est bien comprise malgré les versions des programmes tournant sur ces processus. Il y a 3 manières de passer les données encodées d'un processus à un autre :
    • 1- Dataflow through databases
      • Dans le cas des bases de données, le processus qui écrit encode la donnée, et le processus qui lit la décode. Ces processus peuvent embarquer des versions différentes du code, et donc il faut une backward compatibility pour pouvoir lire les données avec l'ancien schéma, et éventuellement une forward compatibility dans le cas où un noeud avec le nouveau code aurait écrit des données, et que ces données doivent être lues avec un noeud dont le code est ancien.
      • Il y a aussi une autre chose à laquelle il faut penser dans le code applicatif : pour le cas de la forward compatibility, si un vieux code traite des données possédant de nouveaux champs, il pourra ignorer ceux-ci, mais il faut absolument qu'il pense à les garder s'il veut mettre à jour ces données, sinon elles pourraient être supprimées sans le vouloir.
      • Contrairement au code qui finit par être chargé à la version la plus récente sur tous les nœuds, la base de données a en général diverses versions des données, certaines de plusieurs années. Dans la mesure du possible on ne remplit pas le contenu des nouveaux champs dans les anciennes entrées, mais on met juste null dedans. Le format Avro fournit une bonne manière de travailler avec des données nouvelles et anciennes de manière transparente.
    • 2- Dataflow through services
      • La communication par réseau se fait souvent avec une architecture client / serveur. Par exemple, le navigateur est client et le serveur fournit une API sur laquelle le navigateur va faire des demandes.
        • On a le même principe côté serveur avec l'architecture orientée services (SOA) (contrairement au nom SOAP n'est pas spécifiquement lié à SOA) ou plus récemment avec quelques changements ce qu'on appelle l'architecture microservices. Il s'agit d'avoir des entités indépendantes qui communiquent entre-elles via messages, et qui peuvent être mises à jour de manière indépendante, tout en communiquant avec le même format de données qui assure leur compatibilité.
        • Quand des services utilisent le protocole HTTP, on appelle ça des web services. On peut les trouver entre les utilisateurs et les organisations, entre deux services d'une même organisation, ou entre deux organisations avec par exemple les systèmes de carte de crédit ou le protocole OAuth pour l'authentification.
        • Il y a 2 approches populaires pour les web services : REST et SOAP (et GraphQL qui a été open sourcé en 2015 et ne figure donc pas dans le livre ?).
          • REST : principes de design généraux utilisant à fond les fonctionnalités du protocole HTTP et collant à son fonctionnement (par exemple pour le contrôle du cache, pour le fait d'identifier les ressources avec des URLs).
            • OpenAPI (Swagger) permet de documenter convenablement les API REST.
          • SOAP : protocole basé sur XML appelé Web Service Description Language (WSDL). SOAP se base beaucoup sur la génération de code et les outils. Les messages sont en eux-mêmes difficiles à lire par un être humain. Malgré les efforts ostentatoires, l'interopérabilité n'est pas très bonne entre les diverses implémentations de SOAP. SOAP est surtout utilisé dans les grandes entreprises parce qu'il est plus ancien.
        • En plus des web services, il y a un autre groupe de protocoles de communication à travers le réseau appelé Remote Procedure Call (RPC). Il s'agit en RPC d'appeler des méthodes sur des objets, et d'attendre une réponse de ces appels.
          • Il y a d'anciens protocoles spécifiques à un langage ou super complexes comme EJB (Java), DCOM (Microsoft), COBRA (trop complexe et non backward compatible). Mais il y a aussi de nouveau protocoles comme gRPC utilisant Protobuf, Finagle qui utilise Thrift, Rest.li qui utilise JSON sur HTTP, et Avro et Thrift qui ont leur propres implémentations de RPC.
          • Les protocoles RPC essayent de faire passer les appels réseau pour des appels à des méthodes, mais ces choses sont de nature complètement différente : un appel réseau est imprédictible, il peut prendre un temps variable, il peut finir en timeout, il peut réussir tout en n'envoyant pas de réponse, il ne peut pas stocker de valeurs en mémoire liés par des pointeurs etc. REST quant à lui assume que les appels réseau sont de nature bien différente en les présentant comme tels.
            • Ceci est à tempérer un peu avec les implémentations récentes de RPC qui sont plus explicites sur la nature différente en fournissant par exemple des promesses pour encapsuler les appels asynchrones.
          • Bien que les protocoles RPC avec un encodage binaire permettent une plus grande performance que du JSON par dessus REST, REST bénéficie d'un débuggage plus facile avec la possibilité de tester à travers les navigateurs, il est supporté partout, et il est compatible avec un large panel d'outils construits autour (serveurs, caches, proxies etc.). Pour toutes ces raisons, le RPC est utilisé seulement au sein d'une même organisation, typiquement dans un même datacenter.
    • 3- Message-passing dataflow
      • Il existe une manière de transmettre des données entre 2 processus qui se situe entre les appels RPC et les messages passés par une base de données : il s'agit de la transmission de messages asynchrone. On ne passe pas par le réseau mais par un message broker (ou message queue).
        • Les avantages de cette approche comparé à RPC sont :
          • Le broker peut faire buffer le temps que le(s) consommateur soit disponible.
          • Le message peut être redélivré en cas d'échec ou de crash.
          • Celui qui envoie et qui reçoit ne se connaissent pas, il y a un découplage à ce niveau.
          • Il peut y avoir plusieurs consommateurs d'une même queue.
        • Un des inconvénients potentiels c'est que le receveur n'est pas censé répondre. L'envoyeur envoie puis oublie.
        • Les message brokers sont historiquement des logiciels propriétaires comme TIBCO, IBM WebSphere, et webMethods. Plus récemment on a des brokers open source comme RabbitMQ, ActiveMQ, HornetQ, NATS et Apache Kafka.
        • Les message brokers n'imposent en général pas de format de données, donc on peut très bien utiliser les formats Avro / Thrive / Protobuf, et profiter de leur flexibilité pour pouvoir déployer indépendamment les producteurs et consommateurs.
  • L'actor model consiste à se débarrasser de la problématique de concurrence avec la gestion de threads et de ressources partagées en créant des actors indépendants, ayant chacun leurs états encapsulés, et communiquant avec les autres actors via messages asynchrones.
    • Dans la version distribuée, ces interlocuteurs peuvent alors être sur le même nœud ou sur un nœud différent, auquel cas le message sera sérialisé pour être transmis via le réseau de manière transparente.
      • Il y a une plus grande transparence vis-à-vis du fait que les messages peuvent être perdus qu'avec RPC.
      • Un framework actor distribué inclut un broker pour transmettre les messages. Il y en a 3 qui sont populaires :
        • Akka qui utilise la sérialisation de Java, mais peut être utilisé avec par exemple Protobuf pour permettre une meilleure backward/forward compatibilité.
        • Orleans supporte les rolling upgrades avec son propre système de versionning.
        • Erlang OTP supporte les rolling upgrades mais il faut les planifier avec attention.

5 - Replication

  • Il y a 3 raisons pour vouloir faire de la réplication :
    • Garder une copie des données proche des utilisateurs et donc avoir une faible latence.
    • Permettre au système de fonctionner même si certains composants sont foutus.
    • Pour scaler le nombre de machines et donc le nombre de requêtes qu'on peut traiter.
  • Tout l'enjeu de la réplication réside dans le fait de propager les changements dans tous les réplicats. Il y a 3 algorithmes pour ce faire : single-leader, multi-leader, et leaderless.
  • La réplication des BDD distribuées a été étudiée depuis les années 70 et n'a pas changé depuis parce que la nature des réseaux n'a pas changé. En revanche, l'utilisation industrielle de ces techniques est quant à elle récente.
  • La réplication la plus évidente est la leader-based replication. Pour écrire une donnée il faut le faire auprès du nœud leader, qui va mettre à jour sa copie de la BDD et envoyer un log (ou stream) de mise à jour à tous les nœuds suiveurs qui vont l'appliquer.
    • Ce type de réplication est intégré au sein :
      • des BDD relationnelles suivantes : PostgreSQL, MySQL, Oracle Data Guard, SQL Server's AlwaysOn Availability Groups.
      • des BDD non relationnelles suivantes : MongoDB, RethinkDB, Espresso.
      • Et même au sein de message brokers distribués comme : Kafka et RabbitMQ.
    • La réplication peut être synchrone ou asynchrone. Si elle est synchrone, alors le nœud leader doit attendre que tous les suiveurs aient répondu “ok” de leur côté pour répondre à son tour que la transaction s'est bien passée. Si elle est asynchrone, il répond tout de suite même s' il y a eu un problème du côté des followers.
      • Dans la pratique on choisit rarement le mode synchrone parce que n'importe lequel des nœuds pourrait mettre plusieurs minutes à répondre à cause de problèmes réseau.
      • On choisit parfois une réplication semi-synchrone, qui consiste à avoir un seul nœud suiveur synchrone, et le reste asynchrones. De cette manière on est assurés d'avoir les données à jour sur au moins 2 nœuds. Et si le nœud suiveur synchrone ne répond plus, on promeut un autre nœud suiveur comme synchrone pour le remplacer.
      • La réplication asynchrone est également souvent choisie, surtout si les suiveurs sont nombreux ou distribués géographiquement.
    • L'ajout d'un noeud suiveur supplémentaire se fait en faisant un snapshot de la base de données du noeud leader, en copiant ça sur le nouveau noeud, puis en demandant au leader tous les logs de mise à jour (toutes les transactions) qui ont eu lieu depuis le snapshot. Le nouveau nœud peut alors rattraper son retard et devenir un nœud suiveur normal.
    • En cas d'échec :
      • d'un nœud suiveur : le nœud sait à quel log il s'est arrêté, donc dès qu'il va mieux, il peut demander au nœud leader l'ensemble des transactions qu'il a ratées, et se remettre à jour. Ça s'appelle le catch-up recovery.
      • d'un nœud leader : c'est plus compliqué à gérer. Il faut un timeout pour déterminer qu'un nœud leader est en échec, et passé ce timeout on entame un processus de failover c'est-à-dire de remplacement du leader.
        • La procédure peut être automatique ou manuelle. Un timeout trop long peut mener à une interruption du service trop longue, et un timeout trop court dans un contexte de surcharge peut mener à gérer encore moins bien la charge. Pour cette raison, certaines organisations préfèrent la méthode manuelle.
        • Le choix du nouveau leader est un problème de consensus, discuté plus tard dans le livre. A priori le nœud le plus à jour serait le meilleur choix.
        • Il est possible que l'ancien leader revienne et pense qu'il est toujours le leader en acceptant les opérations en écriture. C'est une situation dangereuse qu'il faut prévoir correctement.
        • Dans le cas de réplication asynchrone, certaines transactions peuvent ne pas avoir été passées aux suiveurs. Si l'ancien leader revient en tant que suiveur ensuite, que faire de ces transactions ? En général on les supprime, mais c'est pas trop trop.
          • Et ça peut être même problématique si les transactions sont en lien avec d'autres outils. Par exemple chez Github, un suiveur asynchrone MySQL était devenu leader avec des transactions manquantes. Il se trouve que la clé primaire était aussi utilisée dans un cache Redis qui lui avait les nouvelles transactions. Comme les nouvelles entrées ont été assignées à des valeurs de la clé primaire qui avaient existé auparavant, des utilisateurs ont pu avoir accès à des clés privées d'autres utilisateurs, issus du cache.
    • Fonctionnement de la réplication au niveau des messages (logs) :
      • Statement-based replication : il s'agit de faire suivre toutes les instructions de base de données aux suiveurs. Par exemple un INSERT, un UPDATE, un DELETE etc.
        • Dans le cas d'instructions non déterministes comme RAND(), on va se retrouver avec des valeurs différentes dans les suiveurs.
        • Pour les champs auto-incrémentés, il faut absolument que l'ordre des requêtes soit exactement le même, ce qui limite les transactions concurrentes.
        • Il est possible de travailler à rendre déterministe toutes les instructions qui pourraient poser problème, en envoyant une valeur plutôt qu'une instruction dans ces cas-là, mais il y a plein d'edge cases à traiter.
          • En général cette approche n'est pas très utilisée pour cette raison-là. MySQL l'utilisait jusqu'à une certaine version, mais utilise l'approche row-based replication depuis. VoltDB utilise en revanche cette approche.
      • Write-ahead log (WAL) shipping : il s'agit d'envoyer aux suiveurs le log des messages bas niveau (tel qu'il est utilisé par les BDD LSM-Tree, ou tel qu'il est utilisé par les B-Tree le temps que l'opération se fasse, et pour pouvoir la refaire à partir de ce log en cas d'échec).
        • L'avantage c'est que c'est déterministe, mais l'inconvénient c'est que les messages sont couplés à une implémentation bas niveau de la BDD. Ce qui veut dire qu'on ne peut pas faire tourner une version différente entre le leader et les followers. Et donc exit les zero-downtime rolling updates : il faut une période de downtime.
        • Ce mécanisme est utilisé par PostgreSQL et Oracle.
      • Logical (row-based) log replication : il s'agit de faire un peu la même chose qu'avec le WAL, mais on utilise un format de log indépendant de la BDD, avec les fonctionnalités minimales pour pouvoir mettre à jour correctement la BDD.
        • On est donc découplé du format bas niveau utilisé par la BDD, et on peut faire du zero-downtime.
        • Ça permet aussi d'envoyer les logs à une autre base de données de type data warehouse en temps réel.
        • MySQL binlog peut être configuré pour utiliser ce mécanisme.
      • Trigger-based replication : dans le cas où on recherche plus de flexibilité, plutôt que d'utiliser les mécanismes built-in des BDD, on peut bouger la logique de réplication au niveau applicatif.
        • Oracle GoldenGate permet de mettre à disposition les logs de la BDD pour le code applicatif et implémenter ce mécanisme.
        • Un autre moyen de l'implémenter est d'utiliser les triggers et stored procedures qui existent dans la plupart des BDD relationnelles. On peut grâce à ça exécuter du code applicatif à chaque transaction. Le résultat est placé dans une table à part et lu par un processus à part.
          • Databus for Oracle et Bucardo for Postgres font ça par exemple.
        • Ce mécanisme arrive avec son lot de bugs, et est moins performant. Mais il offre de la flexibilité.
    • Dans le cas où on choisit la leader-based replication avec des followers asynchrones parce qu'on cherche à scaler en lecture en ayant plein de followers, les followers vont se retrouver régulièrement en retard. En général c'est une fraction de seconde, mais dans la montée en charge ou avec des problèmes de réseau ça peut devenir des minutes. Ce retard s'appelle le replication lag. Et on parle d'eventual consistency pour désigner ce problème de consistance momentané des données.
    • Parmi les problèmes survenant il y a :
      • read-after-write consistency : le fait, pour un utilisateur, de pouvoir lire ses propres writes juste après : s'il écrit un message et recharge la page, et qu'il ne voit pas son message, il pourrait se mettre à paniquer.
        • On peut lire toute donnée qui a potentiellement été modifiée par l'utilisateur depuis le leader, et les autres depuis les suiveurs. Par exemple, le profil d'un utilisateur ne peut être modifié que par lui, donc on le lit depuis le leader.
        • Dans le cas où la plupart des données sont potentiellement modifiables par l'utilisateur, on perdrait l'intérêt du scaling à tout lire depuis le leader. On peut alors par exemple ne lire depuis le leader que les données qui ont été modifiées dans les dernières minutes, ou encore monitorer le replication lag pour ne pas lire depuis les suiveurs qui sont trop en retard.
        • Le client peut retenir le timestamp (temps ou donnée d'ordre logique) de la dernière écriture, et l'envoyer avec la requête. Le serveur peut alors n'utiliser que les suiveurs qui sont à jour jusqu'à ce timestamp, ou attendre qu'ils le soient avant de répondre.
        • Difficulté supplémentaire dans le cas de dispersion géographique : toute requête envoyée au leader ne sera pas forcément proche de l'utilisateur.
        • Autre problématique : si on veut que l'utilisateur puisse voir ses écritures depuis tous ses outils (navigateur, mobile etc.).
          • Ça rend inopérant la technique de se souvenir de la dernière modification côté client puisqu'il y a alors plusieurs clients.
          • On a aussi des problèmes supplémentaires dans le cas où il y a plusieurs datacenters. Les deux appareils pourraient être dirigés vers des datacenters différents.
      • monotonic reads : un utilisateur pourrait obtenir des données récentes depuis un replica à jour, puis recharger la page et obtenir des données anciennes depuis un replica moins à jour. Ça donne l'impression d'aller dans le passé.
        • Pour éviter ce phénomène on peut servir un même utilisateur toujours avec le même nœud suiveur tant que celui-ci est vivant.
      • Consistent prefix reads : il s'agit ici de respecter l'ordre causal des choses. Il faut que les données écrites en BDD le soient toujours dans le bon ordre. C'est un problème qui survient quand on partitionne la BDD (on en parlera au chapitre suivant).
        • On peut alors essayer de mettre les informations liées entre-elles dans la même partition.
        • On peut aussi utiliser des algorithmes qui empêchent les données d'être dans un ordre non causal.
      • Concernant le replication lag et l'eventual consistency qui en résulte, une des solutions pour y répondre s'appelle les transactions. Leur but est d'abstraire tout l'aspect distribué du code applicatif, et de s'occuper de répondre aux problèmes décrits ici (read-after etc.). Certaines personnes disent d'abandonner les transactions qui seraient trop coûteuses, mais on nuancera ça par la suite.
  • On a ensuite la multi-leader replication. On a plusieurs leaders qui mettent à jour des suiveurs, et qui se mettent aussi à jour entre eux.
    • En général la complexité supplémentaire induite par le fait d'avoir plusieurs leaders n'en vaut pas la peine si on n'a qu'un seul datacenter.
    • Les avantages du multi-leader dans un environnement multi datacenter :
      • On peut par exemple avoir un leader par datacenter, ce qui permet d'éviter de traverser la terre pour faire des requêtes d'écriture, donnant une perception de performance aux utilisateurs.
      • Un datacenter entier et son leader peut avoir un problème, puis rattraper son retard sur les autres datacenters dès que c'est bon.
      • Les erreurs réseau hors du datacenter impactent moins ce qui se passe dans le datacenter, étant donné qu'on n'est pas obligé d'aller chercher un nœud leader d'un autre datacenter pour écrire.
    • Certaines BDD supportent le multi-leader nativement, mais en général il faut un outil externe. C'est le cas pour Tungsten Replicator pour MySQL, BDR pour PostgreSQL et GoldenGate pour Oracle.
      • Il y a également CouchDB qui a été conçu pour permettre de résoudre facilement les situations de multi-leader.
    • Ces deux exemples illustrent le même principe que la réplication multi-leader :
      • Une application de calendrier pour mobile, desktop etc. pourrait fonctionner en maintenant une copie de la BDD dans chaque client, les laissant ajouter des événements même en étant hors ligne, et synchroniser les BDD quand les clients sont à nouveau en ligne. On a bien plusieurs leaders qui peuvent écrire, une possibilité d'eventual consistency le temps que le réseau revienne, et un travail de résolution des conflits à faire.
      • Les applications d'édition collaborative de texte comme Etherpad ou Google Docs fonctionnent comme si plusieurs leaders pouvaient faire des changements sur leur propre version locale, et propager ces changements de manière asynchrone. Ca aurait pu être du single-leader si chaque personne prenait un lock avant de faire un changement (un peu comme dans dropbox), mais là chaque changement est ajouté à un niveau vraiment atomique au document.
    • Le problème principal de la multi-leader replication c'est les conflits entre leaders ayant eu chacun une transaction en écriture sur une même donnée.
      • On pourrait demander aux leaders d'attendre que l'autre leader ait fini sa transaction avant d'en accepter une, mais alors on reviendrait à la position de single-leader, on perdrait l'avantage d'avoir plusieurs leaders acceptant des connexions en même temps.
      • Dans la mesure du possible, vu que la résolution de conflit est complexe et en général mal gérée, il vaut mieux éviter les conflits. On peut par exemple rediriger les requêtes d'un même utilisateur toujours vers le même datacenter.
      • Parfois un datacenter est hors d'usage, ou un utilisateur peut se déplacer et se rapprocher d'un autre datacenter, et on va vouloir le rediriger vers un autre leader. Il faut alors faire converger le conflit vers un état consistent :
        • On peut donner un identifiant à chaque transaction, basé sur un timestamp, un nombre aléatoire etc. et se dire que le plus grand nombre gagnera lors du conflit pour faire valider sa transaction, annulant l'autre. Dans le cas du timestamp c'est du last write wins (LWW). C'est très populaire mais on perd des transactions.
        • On peut donner un identifiant à chaque leader, et se dire que celui qui a le plus grand gagne toujours la résolution de conflit. Mais c'est pareil qu'avec l'identifiant de transaction : on va perdre des données.
        • On peut choisir une stratégie de fusion des données des 2 requêtes, par exemple ordonner le texte alphabétiquement et le concaténer.
          • Bucardo par exemple permet d'écrire un bout de code en perl pour choisir quoi faire des requêtes en conflit dès que le conflit apparaît au moment de l'écriture.
        • Enregistrer le conflit avec les 2 données quelque part, et laisser le code applicatif gérer ça par exemple en demandant à l'utilisateur quoi faire pour ce conflit.
          • CouchDB fonctionne de cette manière-là : il stocke les 2 données, puis à la lecture les envoie toutes les deux à l'application..
      • Un conflit peut être de manière évidente la modification d'un même champ, mais ça peut aussi être plus subtile et difficile à détecter. Par exemple une vérification au niveau applicatif qu'une chambre d'hôtel ne peut être réservée et donc mentionnée que par une seule réservation. Si le code applicatif a validé la requête, mais qu'on en l'a faite une dans chaque leader, on va réserver deux fois.
    • Quand on a 2 leaders, ils vont forcément s'envoyer chacun des updates. Mais si on en a plus, alors on peut avoir diverses topologies de propagation des mises à jour entre leaders :
      • La star topology consiste à avoir un leader au centre qui va mettre à jour tous les autres, et prendre des mises à jours d'eux.
      • La circular topology consiste à ce que chaque leader mette à jour son voisin, et finissant en boucle. Une information au niveau de la requête permet alors de savoir si elle a déjà été traitée par le nœud courant pour arrêter la boucle.
      • La plus générale est le all-to-all topology, où tous les leaders mettent à jour tous les autres.
        • Un des avantages du all-to-all est que si un nœud ne fonctionne plus, c'est transparent. Pour le circular et star il pourrait bloquer l'information et il faut alors reconfigurer la topologie.
        • L'inconvénient des all-to-all est que certaines connexions peuvent être plus rapides que d'autres, et alors si on se basait sur des timestamp pour l'ordre des transactions par exemple, cet ordre pourrait ne pas être bon.
          • Pour pouvoir faire quand même respecter la causalité dans ce cas, on peut utiliser la technique des version vectors.
        • Globalement la résolution de conflits est plutôt mal gérée dans les solutions existantes : par exemple PostgreSQL BDR ne fournit pas de garantie causale des écritures, et Tungsten Replicator pour MySQL ne détecte même pas les conflits.
  • On a enfin la leaderless replication, qui consiste à ce que tous les nœuds puissent accepter les requêtes en lecture et écriture.
    • Cette idée était tombée dans l'oubli depuis longtemps et a été remise au goût du jour quand Amazon l'a implémentée dans sa base de données Dynamo system. Elle est depuis utilisée dans les BDD open source Riak, Cassandra et Voldemort. Elles sont connues pour être les “BDD Dynamo-style”.
    • En cas de problème dans un des nœuds, celui-ci va rater des requêtes. Et quand il reviendra, ses données seront anciennes et le client risque d'obtenir des données pas à jour en lisant depuis ce nœud.
      • Pour résoudre le problème chez le client, le client peut envoyer la requête à tous les nœuds, et à la réception utiliser la version la plus à jour parmi ceux reçus, grâce à des numéros de version dans ces messages.
      • Pour s'assurer que le noeud se remet à jour il y a 2 moyens implémentés dans les systèmes Dynamo-style :
        • Read repair : Quand le client lit une valeur en parallèle depuis tous les réplicas, s'il constate une différence chez l'un d'entre eux qui aurait une version de transaction plus ancienne, il le met à jour avec une requête d'écriture. Ceci permet de mettre à jour les valeurs souvent lues.
        • Anti-entropy process : Pour les valeurs peu lues, on peut avoir des tâches de fond qui tournent, et dont le but est de repérer les différences entre nœuds, et mettre à jour ceux qui sont en retard.
    • Pour savoir si une requête a réussi, on peut utiliser le quorum consistency.
      • Soit :
        • n le nombre de nœuds à qui on envoie les reads et writes.
        • w le nombre de nœuds qui doivent confirmer un write pour le considérer comme réussi.
        • r le nombre de nœuds qui doivent confirmer un read pour qu'il soit considéré comme réussi.
      • Alors pour respecter le principe du quorum il faut que w + r > n.
        • Typiquement on choisit n impair, et w = r = (n + 1) / 2 (arrondi au supérieur).
        • Si on a beaucoup de lectures et peu d'écritures, on pourra mettre r = 1, comme ça dès qu'un seul nœud valide la lecture alors la transaction est validée. Les lectures sont alors plus rapides mais un seul nœud qui est down empêche alors l'écriture en BDD pour respecter la formule w + r > n).
      • Avantages et inconvénients du quorum :
        • L'intérêt de ce quorum c'est que les reads et writes se chevauchent, et donc qu'il y ait forcément au moins un nœud qui soit complètement à jour, pour être sûr que la donnée lue qui sera gardée sera complètement à jour.
        • On peut très bien choisir de ne pas respecter la formule du quorum et avoir moins de reads et writes nécessaires pour la validation. On aura alors une plus faible latence, une plus grande availability, mais une moins bonne consistance (on aura régulièrement des lectures renvoyant des données pas tout à fait à jour).
      • Même avec le quorum, on peut se retrouver avec des données pas à jour dans certains cas :
        • Si on utilise le sloppy quorum, on peut se retrouver avec les writes sur d'autres nœuds que les reads, et donc le chevauchement n'est plus garanti.
          • Il s'agit d'une option activable sur les BDD qui permet, dans le cas où une large partie de noeuds est momentanément non disponible, de choisir de prendre quand même les writes sur d'autres noeuds partitionnés qui ne font pas habituellement partie de n pour ces valeurs-là. Et quand les nœuds sont de retour, on leur donne ces valeurs (hinted handoff). Le problème c'est que pendant le temps où ils n'étaient pas là, ils avaient peut être certaines valeurs plus à jours qu'eux seuls avaient, et les reads ont pu être servis avec des valeurs pas à jour.
        • Dans le cas de writes concurrents, on est en présence d'un conflit qu'il faut résoudre comme discuté précédemment. Si on choisit de résoudre en annulant une des requêtes, alors on perd des données.
        • Si un read se fait en concurrence avec un read, le write pourrait être effectif chez certains replicas, et on ne sait pas ce que retournera alors le read.
        • Si un write a réussi sur certains réplicas mais pas tous, et que la transaction est en voie d'annulation, les réplicas où ça a réussi peuvent renvoyer cette valeur qui sera fausse.
        • Si le nœud à jour échoue, le nombre de nœuds en écriture tombe en dessous de w, et on peut n'avoir aucun nœud qui a la version à jour par rapport aux écritures déjà validées au moment de répondre.
        • Des problèmes de timing dont on parlera plus tard peuvent aussi survenir.
      • On voit bien que les Dynamo-style databases ne garantissent qu'une eventual consistency, même en respectant le quorum. Pour avoir des garanties plus fortes comme le “read your writes”, “monotonic reads” etc. il faudra faire appel aux transactions et au consensus.
    • Malgré l'eventual consistency de la leaderless replication, il peut être important de quantifier à quel point les données sont peu à jour dans les divers nœuds. Il faudrait mettre en place du monitoring mais c'est beaucoup moins simple que pour le leader-based où on peut facilement observer le replication lag du leader vers les followers. Là on peut avoir des valeurs peu lues très anciennes.
    • La leaderless replication est tout à fait aussi adaptée au multi-datacenter :
      • Cassandra et Voldemort traitent les nœuds dans les divers datacenters comme des nœuds normaux, avec le n global et un n configurable pour chaque datacenter. En général les clients n'attendent que le quorum du datacenter le plus proche pour maximiser le temps de réponse.
      • Riak ne fait du leaderless classique qu'au sein des datacenters, la synchronisation cross-datacenter se fait de manière asynchrone, un peu à la manière du multi-leader replication.
  • A propos de la gestion des écritures concurrentes :
    • Il faut noter que malheureusement ça ne se fera pas automatiquement par les implémentations des BDD qui sont relativement mauvaises. En tant que développeur, il faut connaître ces problèmes et implémenter des solutions nous-mêmes.
    • Voici quelques éléments de réflexion :
      • Last write wins (LWW) : on en avait parlé, il s'agit d'éliminer une des deux transactions concurrentes en déterminant par une méthode arbitraire laquelle est la dernière dans le cas où il n'y a pas de relation de causalité. C'est arbitraire parce que la causalité entre des événements qui ne se connaissent pas n'a pas de sens. C'est ça qu'on appelle des transactions concurrentes.
        • C'est problématique parce que même après avoir dit au client que la transaction s'est bien passée, elle peut être annulée en arrière-plan de manière silencieuse.
        • LWW est la seule méthode de résolution de conflit supportée par Cassandra, et une feature optionnelle dans Riak. Dans Cassandra il est recommandé d'utiliser un UUID comme clé pour éviter autant que possible des écritures concurrentes.
      • Ce qu'il nous faut donc c'est pouvoir distinguer deux événements concurrents de deux événements causaux. Dans le cas où c'est causal on pourra faire respecter l'ordre. C'est seulement dans le cas de la concurrence qu'on est condamné à perdre des données, fusionner les données ou avertir l'utilisateur.
        • Pour fusionner les données, on peut utiliser des structures spéciales qui le permettent facilement comme les structures CRDT supportés par Riak.
          • En interne il s'agit de fusionner les éléments dans une liste en cas d'ajout, et de poser des tombstones dans le cas d'une suppression plutôt que de supprimer directement. Cela permet de mieux gérer la suppression au niveau de plusieurs nœuds qui en prennent connaissance au fur et à mesure, et ont besoin d'effectuer l'opération eux-aussi.
        • Pour distinguer les événements causaux des concurrents, on peut utiliser les version vectors. Chaque replica a sa version qu'il incrémente à chaque traitement, et l'ensemble de ces versions sont appelées version vector. Ces valeurs sont utilisées par chaque réplica pour déterminer s'il y a de la causalité ou si on garde les deux versions concurrentes.
          • Les version vectors sont disponibles dans Riak 2.0, et sont appelés causal context. Le version vector est envoyé aux clients quand les valeurs sont lues, et renvoyé par les clients quand une valeur est écrite.

6 - Partitioning

  • Les partitions sont appelées :
    • shard dans MongoDB, Elasticsearch et SolrCloud
    • region dans HBase
    • tablet dans Bigtable
    • vnode dans Cassandra et Riak
    • vBucket dans Couchbase
  • Un cluster shared-nothing signifie qu'il s'agit de plusieurs machines distinctes, par opposition au scaling vertical où c'est la même machine qui partage le processeur, la RAM etc. là on ne partage rien à part à travers le réseau.
  • Les partitions existent depuis les années 80, et ont été redécouvertes par les BDD NoSQL et les Data warehouses Hadoop-based.
  • Vis-à-vis de la réplication, la notion de partition vient s'y superposer. On peut par exemple avoir des nœuds (ordinateurs) avec plusieurs partitions, et chacun d'entre eux peut être soit leader soit follower pour telle ou telle copie de telle ou telle partition.
  • La raison principale de vouloir des partitions est la scalabilité. Avec la réplication on pouvait scaler pour lectures, mais le partitionnement permet de scaler aussi en écriture.
    • Cependant, pour que le scaling fonctionne bien, il faut que la charge soit équitablement répartie entre les nœuds. Pour ce faire, on peut par exemple répartir aléatoirement les données dans les partitions (mais ça nécessiterait de demander à tous les nœuds en parallèle à chaque recherche).
    • Quand la charge est mal répartie on appelle ça des partitions skewed (biaisées). Et quand un seul nœud se retrouve à tout gérer on l'appelle le hot spot.
  • Parmi les types de partitions on a :
    • La partition par key range. On va attribuer un range de clés à chaque nœud, et y stocker ces données-là. Si on connaît ce range à l'avance, on pourra même directement demander au nœud concerné pour notre recherche.
      • On pourra par exemple avoir le 1er nœud qui a les clés commençant par A et B, et le dernier les clés commençant par W, X, Y et Z.
      • Bigtable et son équivalent open source HBase, ainsi que RethinkDB et MongoDB jusqu'à la version 2.4 utilisent cette technique.
      • Dans chaque partition, on peut garder les entrées triées de la même manière que les LSM-Tree.
      • On a un risque de hot spot, par exemple dans le cas où on recherche par la clé qui serait le timestamp, et que les partitions sont groupées par journée. La partition du jour courant risque de devenir un hot spot. Dans ce cas on peut préfixer la clé par un nom ou autre chose, pour constituer un index concaténé par exemple.
    • La partition par hash of key. On a la même manière de stocker par clé qu'avant, sauf qu'on va hasher la clé avec une fonction de hash simple (mais qui ne donne pas de duplicata). Et on va assigner des ranges de hashs aux partitions. Ceci fait que les données seront aléatoirement réparties.
      • MongoDB, Cassandra et Voldemort utilisent ce mécanisme.
      • Le désavantage de hasher la clé c'est qu'on ne peut plus faire facilement de recherche par range.
        • Dans MongoDB, si on a activé les clés hashées, il faut envoyer les queries de range à toutes les partitions.
        • Riak, Couchbase et Voldemort ne supportent pas du tout les queries de range.
        • Cassandra utilise un compromis entre les deux stratégies (hash et clé normale) : on a une clé composée avec une première partie hashée déterminant la partition, et ensuite une 2ème partie permettant de faire une recherche, y compris de range, dans la SSTable. On doit donc d'abord fixer la partition et ensuite on peut chercher ce qu'on veut efficacement.
      • Hasher la clé peut parfois ne pas suffire à éliminer les hot spot : dans le cas spécifique où on a une donnée qui est accédée / écrite de manière massive (par exemple une célébrité qui est fortement suivie qui s'exprime), il faut diviser cette entrée-là en plusieurs entrées sur plusieurs machines. On peut par exemple préfixer le hash d'un nombre et le répartir sur 100 machines différentes. Mais alors les lectures devront à chaque fois faire appel à toutes ces partitions et reconstruire la bonne donnée.
        • Pour le moment les BDD ne gèrent pas automatiquement ce genre de fonctionnalité, donc il faut le faire à la main.
      • Il existe un concept appelé consistent hashing, mais il est surtout utilisé pour les caches distribués à travers le monde (type CDN) pour éviter le besoin d'entité centrale, et n'est pas efficace avec les BDD. Certaines docs de BDD l'invoquent par erreur, mais pour éviter la confusion il vaut mieux qu'on parle de hash partitioning.
  • Les indexes secondaires sont extrêmement pratiques pour faire des recherches dans la BDD, mais elles introduisent une complexité supplémentaire.
    • Par rapport à leur support :
      • HBase et Voldemort ont évité de les supporter pour éviter la complexité de l'implémentation.
      • Riak a commencé leur support.
      • Pour Elasticsearch et Solr, ils sont leur raison d'être.
    • On a 2 manières de les implémenter avec le partitionnement :
      • Document-based partitioning : on va créer un index local à la partition. Toutes les entrées de la partition seront indexées pour la colonne choisie, mais l'index n'aura aucune idée de ce qui est indexé sur une autre partition.
        • Le problème c'est que quand on veut faire une recherche par index secondaire, on va alors devoir faire une requête auprès de toutes les partitions, puisque les partitions sont séparées par clé primaire, pas par l'index secondaire. On appelle ça le scatter / gather. Ceci fait que la requête va coûter cher, et devoir attendre que tous les nœuds répondent (donc on est soumis au problème des hauts percentiles qui nous ralentissent potentiellement beaucoup).
        • Cette approche est utilisée quand même dans MongoDB, Riak, Cassandra, Elasticsearch, SolrCloud et VoltDB.
      • Term-based partitioning : on crée un index global. Mais bien entendu il est hors de question de le mettre sur un seul nœud, au risque que ça devienne un bottleneck. On va le partitionner de même qu'on a partitionné l'index primaire : les premières clés de l'index secondaires seront dans la partition 1, celles juste après dans la partition 2 etc.
        • Cette technique rend la recherche rapide : puisqu'on sait quel nœud contient l'index qu'on veut, on lui envoie la requête directement. Par contre l'écriture est plus lente puisqu'elle va impliquer des modifications dans plusieurs partitions (celle de la donnée et de l'index primaire, et celle de l'index secondaire pour le mettre à jour).
        • En pratique, la mise à jour de l'index secondaire avec le term-partitioning se fait de manière asynchrone, et tant pis si une recherche avec l'index secondaire immédiatement après une écriture ne fonctionne pas.
        • Parmi les implémentations :
          • Amazon DynamoDB met à jour son index term-partitioned de manière asynchrone.
          • Riak et Oracle data warehouse permettent de choisir la technique de partitionnement de l'index secondaire.
  • Régulièrement, pour augmenter les capacités ou remplacer une machine malade, on doit rediriger les requêtes et déplacer les données d'une machine à l'autre. On appelle ça le rebalancing entre partitions. Il y a plusieurs stratégies pour l'implémenter :
    • Une stratégie à ne pas faire : hash mod N. Si on décidait de faire le modulo du hash de nos transactions pour les répartir dans les noeuds (par exemple le hash % 12 si on a 12 noeuds), alors à chaque fois que le nombre de noeuds changerait, on devrait faire du rebalancing, ce qui est beaucoup trop coûteux.
    • Fixed number of partitions. On va choisir un grand nombre de partitions, plus grand que le nombre de nœuds qu'on imagine qu'on va avoir, et on va attribuer plusieurs partitions par nœud (par exemple 100 par nœud). De cette manière, dès qu'on ajoute ou supprime un nœud, on peut déplacer quelques partitions ici et là pour équilibrer le tout.
      • Il faut bien choisir le bon nombre de partitions, s'il y en a trop ça crée un manque de performance du fait de chercher dans trop de partitions, s'il n'y en a pas assez on va déplacer de trop gros blocs au moment du rebalancing. Ça peut être difficile à trouver si notre charge varie beaucoup.
      • Cette approche est utilisée par Riak, Elasticsearch, Couchbase et Voldemort.
    • Dynamic partitioning. Pour les BDD utilisant le partitionnement de type key range (et pas hash range), avoir un nombre de partitions fixe peut être problématique par rapport au skewing, et choisir à la main combien en mettre par nœud est fastidieux. On va donc vouloir un système qui répartir dynamiquement les partitions, par rapport à la quantité de données présente dans chaque partition.
      • Quand une partition est jugée dynamiquement trop grosse, elle est coupée en 2 et une moitié est éventuellement déplacée sur un autre nœud.
      • HBase et RethinkDB par exemple utilisent le partitionnement dynamique (puisqu'ils utilisent aussi le key range partitioning).
      • Pour le key range c'est obligatoire, mais le dynamic partitioning peut aussi être utilisé avec le hash range partitioning. MongoDB par exemple donne le choix de key range ou hash range, et dans les deux cas fait le rebalancing de manière dynamique.
    • Partitioning proportionally to nodes. Le nombre fixe de partitions et le nombre dynamique de partitions est basé sur la taille des partitions. On peut choisir plutôt de se baser sur le nombre de partitions par nœud indépendamment de leur taille. On fixe un nombre de partitions par nœud et on répartit les données dedans. Si on ajoute un nœud, les partitions existantes maigrissent pour transférer une partie de leur données dans les partitions du nouveau nœud.
      • Cassandra et Ketama utilisent cette méthode.
    • On a un peu évoqué l'aspect manuel / automatique, mais plus concrètement :
      • Couchbase, Voldemort et Riak créent des suggestions de rebalancing automatiquement, mais demandent la validation d'un administrateur humain pour opérer le rebalancing.
      • Le rebalancing complètement automatique peut être tentant, mais il faut bien voir que c'est une opération longue et coûteuse, et que faire un mauvais rebalancing dans certaines conditions peut créer une cascade d'échec, le système croyant à tort que certains noeuds surchargés sont morts ou ce genre de chose. Globalement avoir un humain dans la boucle du rebalancing est une bonne idée.
  • A propos de la question du routing de la requête, comment le client va savoir à quel nœud envoyer sa requête ?
    • Il existe plusieurs solutions open source. Globalement 3 possibilités se dégagent :
      • Le client envoie à un nœud en mode round robin (chacun son tour), et ce nœud qui connaît le bon nœud va faire lui-même la demande, va réceptionner la réponse, et la retransférer au client.
      • Le client envoie la requête à un routing tier qui connaît le partitionnement actuel, et va pouvoir envoyer la requête au bon nœud.
      • Le client connaît déjà le bon nœud, et va directement lui envoyer la requête.
    • Dans tous les cas, il y a le problème de savoir comment l'entité qui connaît le partitionnement actuel reste à jour malgré les rebalancing ? C'est un problème difficile.
      • Il y a des protocoles pour atteindre un consensus dans les systèmes distribués, mais ils sont compliqués. On en parlera au chapitre 9.
      • De nombreux systèmes utilisent un service dédié au mapping entre partition / nœud et adresse ip.
        • ZooKeeper est l'un d'entre eux : tous les nœuds s'enregistrent auprès de ZooKeeper et lui notifient les rebalancings. C'est lui qui fait autorité en matière de routing. Et il notifie les entités qui en ont besoin (par exemple le routing tier) de l'état du réseau de nœuds / partitions.
        • Espresso de Linkedin utilise Helix, qui lui-même utilise ZooKeeper.
        • HBase, SolrCloud et Kafka utilisent aussi ZooKeeper.
        • MongoDB utilise son outil maison et mongos daemons comme routing tier.
        • Cassandra et Riak utilisent un gossip protocol pour que les nodes s'échangent leurs changements de topologie. La requête peut alors arriver sur n'importe quel nœud qui la redirigera correctement vers le bon. Ça met plus de complexité sur les nœuds, mais ça évite la dépendance à un outil externe comme ZooKeeper.
        • Couchbase ne fait pas de rebalancing automatique. Et il est couplé en général avec moxi, qui est un routing tier écoutant les changements venant des nœuds.
    • Enfin concernant l'accès au routing tier par le client, son adresse ip en changeant que rarement, une configuration de nom via DNS est suffisante pour y accéder.

7 - Transactions

  • Les transactions sont des unités logiques regroupant plusieurs lectures / écritures. Soit elles réussissent, soit elles échouent et alors le client peut réessayer en toute sécurité. Il s'agit d'abstraire tout un pan d'échecs partiels qu'il faut gérer sinon à la main.
  • Presque toutes les BDD relationnelles, et certaines non relationnelles utilisent les transactions pour encapsuler les requêtes. Cependant avec la hype récente du NoSQL, on a un certain nombre de BDD qui arrivent avec l'idée que pour la scalabilité et la high availability, les transactions doivent être abandonnées ou donner des garanties beaucoup plus faibles.
  • ACID signifie Atomicity, Consistency, Isolation and Durability. Malheureusement il y a de l'ambiguïté sur chacun des termes, surtout sur l'isolation.
    • Atomicity aurait pu être appelé abortability, parce qu'il s'agit d'annuler une partie des requêtes d'une même transaction si la partie suivante échoue. Comme ça on peut recommencer la transaction entière sans soucis.
    • Consistency est ici entendu comme étant la cohérence des données du point de vue applicatif. Contrairement aux 3 autres termes, la consistency relève bien de la responsabilité du code applicatif. Il s'agit de règles liées au domaine en question, par exemple les débits et les crédits doivent s'annuler.
    • Isolation consiste à gérer les transactions concurrentes : chaque transaction doit pouvoir s'exécuter sans être parasitée par d'autres transactions en plein milieu. On parle aussi de serializability, pour dire qu'il faut la même garantie que si les transactions étaient exécutées en série les unes à la suite des autres. La plupart des BDD ne fournissent cependant pas ce niveau de garantie.
    • Durability veut dire qu'une fois la transaction commitée, elle ne peut pas disparaître toute seule mais reste dans la BDD. Ca implique par exemple la technique du log write-ahead pour les B-Tree ou LSM-Tree, pour ne pas perdre les données. Cela implique aussi la réplication dans le cas de systèmes distribués.
  • L'atomicité et l'isolation concernent les transactions avec plusieurs écritures (plusieurs objets), mais aussi les “transactions” avec une seule écriture. Si un problème survient en plein milieu de l'écriture, il faut s'assurer que la base de données ne se retrouve pas dans un état inconsistant.
    • On dit parfois qu'on supporte les transaction (et même qu'on est ACID) quand on assure l'intégrité pour une seule écriture, mais c'est une erreur, la transaction désigne principalement le groupe de plusieurs écritures.
    • La garantie pour les écritures sur un seul objet est parfois suffisante, mais dans pas mal de cas il faut une garantie sur plusieurs objets :
      • Dans les BDD relationnelles (ou de graphe), les clés étrangères (ou les edges) doivent être mises à jour en même temps que l'objet change.
      • Dans les BDD de document, les données à mettre à jour sont en général dans le même document, donc pas de besoin de multi-object transaction de ce côté. Cependant les BDD de document encouragent aussi la dénormalisation à la place des jointures, et dans ce cas les données doivent être mises à jour conjointement dans plusieurs endroits pour ne pas que la BDD devienne inconsistante.
      • Quand on a des index secondaires, alors il faut mettre à jour aussi cet index, et ces index sont des objets différents du point de vue de la BDD, donc on doit bien avoir des transaction multi-objets.
    • Concernant l'annulation des transactions, c'est dans cette philosophie qu'est construite la notion d'ACID : si ça échoue on recommence la transaction.
      • Certaines BDD ne sont pas du tout dans cette philosophie : les BDD répliquées en mode leaderless sont plutôt sur du “best effort”. La BDD exécute ce qu'elle peut, et si on est dans un état inconsistant, c'est à l'application de gérer les erreurs.
      • Certains ORM comme celui de Rails et Django ne réessayent pas les transactions automatiquement, alors que c'est là le but même de l'ACIDité de celles-ci.
        • Certains problèmes peuvent quand même survenir quand une transaction est abandonnée :
          • Il se peut qu'elle ait fonctionné mais qu'on ne reçoive pas la réponse.
          • Si l'erreur est due à une surcharge de requêtes, réessayer la transaction n'arrangera pas les choses, au contraire.
          • Il ne faut pas réessayer si l'erreur est de nature permanente (par exemple une violation de contraintes, ie. une transaction qui fait quelque chose d'interdit), mais seulement si l'erreur est de nature temporaire (réseau, crash d'un node, etc.).
          • Si la transaction a d'autres side-effects que sur la BDD (par exemple l'envoi d'un email), alors réessayer juste après peut refaire les side-effects. On parlera des Atomic commit et Two-phase commit plus tard.
          • Si en réessayant à nouveau on échoue quand même, la requête pourrait être complètement perdue.
  • L'isolation au sens strict de transactions sérialisables est quelque chose de coûteux que les BDD ne veulent souvent pas implémenter. On a donc seulement des weak isolation levels qui ne répondent pas à tous les problèmes posés par les transactions concurrentes. Il faut bien comprendre chaque problème et chaque solution proposée pour choisir ceux qu'on a besoin pour notre application.
    • Read commited est le niveau d'isolation le plus basique.
      • Ca garantit :
        • Qu'il n'y aura pas de dirty reads : si au cours d'une transaction non terminée une écriture a été faite, une autre transaction au cours de la lecture ne doit pas pouvoir lire ce qui a été écrit.
        • Qu'il n'y aura pas de dirty writes : si au cours d'une transaction non terminée une écriture a été faite mais pas encore commitée, et au cours d'une autre transaction l'écriture est écrasée, alors il on peut se retrouver avec des données inconsistantes.
      • Read commited est l'isolation par défaut dans de nombreuses bases de données, parmi elles : Oracle 11g, PostgreSQL, SQL Server 2012, MemSQL.
      • Côté implémentation :
        • Pour les dirty reads, l'objet tout entier est bloqué avec un lock par la transaction, jusqu'à ce qu'elle soit commitée ou abandonnée.
        • Pour les dirty rights, on pourrait aussi mettre un lock, mais c'est perdre beaucoup en efficacité parce que certaines requêtes lentes vont empêcher de simples lectures. Alors la plupart du temps 2 valeurs sont conservées : l'ancienne valeur de l'objet qu'on donne aux nouveaux lecteurs, et la nouvelle valeur qui sera la valeur finale quand la transaction en cours sera terminée.
    • Snapshot isolation and repeatable read. Le read committed garantit que sur une même donnée il n'y aura pas des lectures / écritures de transactions différentes, mais ça ne garantit pas que différents objets de la base de données resteront cohérents entre eux au cours d'une même transaction.
      • Problèmes :
        • On peut par exemple lire une donnée, puis le temps qu'on lise la suivante celle-ci a été modifiée, et la combinaison des deux lectures donne quelque chose d'incohérent. En général il suffit de refaire la 1ère lecture et on a quelque chose de cohérent à nouveau.
        • Plus grave : une copie de BDD peut prendre plusieurs heures, et le temps de la copie des changements peuvent être faits, de manière à ce qu'au final on ait copié au fur et à mesure quelque chose d'incohérent. Même chose avec une requête d'analyse énorme qui met beaucoup de temps à lire un grand nombre de données : si elles sont modifiées en cours de route.
      • La snapshot isolation est supportée par PostgreSQL, MySQL avec InnoDB, Oracle, SQL Server et d'autres.
      • Côté implémentation :
        • En général pour les writes on a un write lock qui bloque les autres writes sur un même objet.
        • En revanche les reads n'utilisent pas de locks, et le principe c'est que les writes ne bloquent pas les reads et les reads ne bloquent pas les writes.
        • Chaque transaction va avoir son snapshot de données en fonction des données sur lesquelles il opère, et ces données ne seront pas changées de toute la transaction. On appelle ça le multi-version concurrency control (MVVC).
      • La snapshot isolation est appelée de différentes manières en fonction des BDD :
        • Dans Oracle elle est appelée serializable.
        • Dans MySQL et PostgreSQL c'est appelé repeatable read.
          • Ce terme repeatable read vient du standard SQL qui ne contient pas la notion de snapshot isolation, vu qu'elle n'existait pas à l'époque de System R (sur lequel est basée la norme SQL).
          • Et pour compliquer le tout, IBM DB2 utilise le terme de repeatable read pour désigner la serializability, ce qui fait qu'il n'a plus vraiment de sens.
    • Preventing lost updates. Jusqu'ici on s'est intéressé aux problèmes de lecture dans un contexte d'écritures dans d'autres transactions. Mais il y a également des problèmes survenant lors d'écritures concurrentes entre-elles. Les dirty writes en sont un exemple, et les lost updates un autre.
      • Si deux transactions modifient une même valeur de manière concurrente, la dernière transaction écrasera la valeur écrite dans la première. On dit aussi qu'elle va la clobber.
      • Exemples : un compteur incrémenté deux fois mais qui se retrouve finalement incrémenté de 1, ou encore deux utilisateurs modifiant la même page wiki en envoyant la page entière, le dernier écrasant les modifications de l'autre.
      • Ce problème courant a de nombreuses solutions :
        • Atomic write operations : vu que le problème des lost updates vient du fait qu'on lit d'abord la valeur avant de la mettre à jour, certaines BDD donnent la possibilité de faire une lecture suivie d'un update avec une atomicité garantie.
          • MongoDB fournit aussi la possibilité de faire des modifications locales à un document JSON de manière atomique.
          • Redis permet de modifier par exemple des priority queues de manière atomique.
          • En général les BDD le font en donnant un lock sur l'objet concerné par l'écriture.
        • Explicit locking : on peut, en pleine requête SQL, indiquer qu'on prend un lock manuellement sur le résultat d'une partie de la requête, pour le réutiliser dans une écriture juste après.
          • On peut facilement oublier de le faire ou mal prendre en compte la logique applicative.
        • Automatically detect lost updates : de nombreuses BDD permettent de vérifier la présence de lost updates, et en cas de détection d'annuler la requête et de la retenter juste après.
          • L'avantage aussi c'est qu'on peut le faire avec la même fonctionnalité que le snapshot isolation. PostgreSQL, Oracle et SQL Server le font de cette manière. MySQL / InnoDB en revanche ne supportent pas cette fonctionnalité.
        • Compare-and-set : certaines bases de données qui ne fournissent pas de transactions permettent des opérations compare-and-set qui consistent à exécuter un changement seulement si la donnée n'a pas été modifiée depuis la dernière fois qu'on l'a lue, ce qui permet normalement d'éviter les lost updates.
        • Dans le cas des BDD avec réplication : quand on a de la réplication les locks ne servent à rien, et le compare-and-set non plus. La meilleure solution est d'exécuter les deux requêtes et de garder une copie des deux résultats, puis de faire appel à du code applicatif ou d'utiliser des structures spéciales de fusion pour résoudre le conflit.
          • Riak 2.0 fournit des structures qui permettent d'éviter les lost updates à travers les réplicas.
          • Malheureusement la plupart des BDD ont par défaut une stratégie last write wins (LWW) qui est provoque des lost updates.
    • Write skews and phantoms : on généralise ici le cas des dirty writes et des lost updates dans la mesure où on va écrire sur des objets différents. Chaque requête concurrente lit les données, puis écrit dans un objet différent, mais comme ils le font indépendamment, le code applicatif ne se rend pas compte qu'ils cassent une contrainte applicative qui devait être garantie par le code applicatif. On appelle ça des write skew.
      • Exemple : il faut au moins un docteur on-call, il en reste deux et les deux décident de cliquer sur le bouton pour se désister. Les deux transactions se font en parallèle et modifient des objets différents liés au profil de chaque docteur.
      • Les solutions sont moins nombreuses :
        • Les BDD ne fournissent pas de moyen de mettre des contraintes sur des objets différents. On peut en revanche utiliser du code custom avec les triggers ou les materialized views si c'est supporté.
        • On peut locker les objets concernés par notre logique métier à la main au moment de faire la requête.
          • Cette solution marche si on a déjà les objets dont on veut que la valeur ne change pas. Mais si dans notre cas la condition c'est qu'une entrée avec une certaine caractéristique n'existe pas pour pouvoir faire quelque chose (par ex insérer un nom d'utilisateur s'il n'est pas déjà pris), alors on ne peut pas locker à la main une absence d'objet.
            • Dans ce cas où le write skew est causé par une écriture dans une transaction, qui change le résultat d'une recherche dans une autre transaction, le phénomène est appelé un phantom.
              • Une solution (peu élégante) peut consister à matérialiser les phantoms en créant une table spéciale avec un champ pour chaque élément possible, et demander au code applicatif de faire un lock manuel sur l'élément matérialisé correspondant à chaque write. Dans la plupart des cas, il vaut cependant mieux privilégier la serializability.
        • Malheureusement la snapshot isolation ne suffit pas, il faut une vraie serializability dont on va parler un peu plus loin.
  • Serializability : il y a un niveau au-dessus de tous les autres, qui permet de garantir que les transactions vont s'exécuter avec le même niveau de garantie vis-à-vis des race conditions que s'ils étaient exécutés les uns à la suite des autres, sans parallélisme du tout. Il y a 3 techniques pour l'implémenter dans un contexte non distribué :
    • Actual serial execution : on va exécuter les transactions vraiment les uns à la suite des autres, sur un seul thread.
      • Cette option est envisagée maintenant alors qu'elle était rejetée auparavant parce que la RAM est peu chère et on peut mettre l'essentiel de la BDD dedans, ce qui permet de rendre les transactions très rapides. Et aussi parce que les transactions OLTP sont courtes et impliquent peu de requêtes, alors que les OLAP sont certes longues mais sont read-only donc peuvent se faire hors de l'execution loop.
      • Cette approche est utilisée dans VoltDB / H-Store, Redis et Datomic.
      • Pour que ce soit possible sur un seul thread, il faut qu'il ne soit pas bloqué pendant qu'on demande à l'utilisateur la suite en plein milieu de la transaction. Il faut donc collecter les données qu'il faut pour toute la transaction, et faire la transaction entière en une fois. Pour ce faire, on utilise les stored procedures.
        • Ces procédures permettent d'exécuter du code écrit dans un langage spécifique : pour Oracle PL/SQL, pour SQL Server T-SQL, pour PostgreSQL PL/pgSQL, mais ces langages sont vieux, peu testables, et n'ont pas beaucoup de fonctionnalités.
        • Des BDD modernes permettent cependant d'utiliser des langages modernes pour les stored procedures : VoltDB utilise Java et Groovy, Datomic utilise Java et Clojure, Redis utilise Lua.
      • Pour la réplication, VoltDB permet d'exécuter les stored procedures sur chaque machine. Il faut alors que ces procédures soient déterministes.
      • Dans le cas où on veut scaler en écriture on a besoin de partitionnement. On peut alors créer autant de partitions que de coeurs de processeur sur la machine, et assigner un thread par partition. Chaque partition exécutera bien les transactions de manière séquentielle.
        • Attention par contre aux requêtes qui ont besoin d'effectuer des opérations à travers plusieurs partitions (à peu près tout sauf les données key/value), ça provoque des ralentissement de plusieurs ordres de grandeur.
      • Donc les contraintes pour utiliser l'exécution en série :
        • Chaque transaction doit être petite et rapide.
        • La BDD doit entrer en RAM. Une partie peu utilisée de la BDD peut rester sur disque, mais si on doit aller la chercher dans le thread unique c'est chaud au niveau perf. Une solution pourrait être d'abandonner la transaction, mettre la donnée dont on a besoin en RAM, et la retenter.
        • La charge en écriture doit être assez faible pour être traitée par une machine, ou alors il faut un partitionnement sans requêtes qui s'exécutent sur plusieurs partitions.
    • Two-Phase Locking (2PL) : c'est l'algorithme qui a été utilisé pendant 30 ans. Il s'agit de mettre un lock sur la donnée dès lors qu'on est en présence d'une transaction qui fait un write, même vis-à-vis de transactions qui ne font que des reads. En revanche s'il n'y a que des transactions qui font des reads, pas besoin de lock.
      • Comparé au snapshot isolation où les writes ne bloquaient pas les reads, et les reads ne bloquaient pas les writes, ici les writes bloquent aussi les reads.
      • 2PL est utilisé dans MySQL (InnoDB), SQL Server et DB2.
      • Fonctionnement : il y a les shared locks et les exclusive locks. A chaque fois qu'un read est fait sur un objet, la transaction prend un shared lock, qui permet de la faire attendre au cas où l'exclusive lock serait pris. Si une transaction veut faire un write, alors elle prend l'exclusive lock dès qu'elle peut, et tout le monde doit attendre pour accéder à cet objet que sa transaction entière soit terminée (d'où le 2-phase : on prend le lock, puis on termine le reste de la transaction de manière exclusive).
        • Pour être vraiment comme des transactions sérialisées, il faut aussi résoudre le problème des phantoms (un write qui modifie le résultat d'une recherche). On le fait en créant des locks sur des prédicats : si une transaction a besoin de faire une query pour chercher quelque chose, alors elle déclare un shared lock sur un prédicat, et si un write modifie le résultat correspondant à ce prédicat, alors ils se bloqueront mutuellement.
          • Le lock sur des prédicats étant très mauvais d'un point de vue performance, on approxime souvent les prédicats sous forme de lock d'index, en s'assurant qu'on lock éventuellement plus d'objets, et pas moins pour respecter la sérialisabilité.
      • Le souci de cette méthode c'est la performance, en partie du fait de nombreux locks, mais surtout du fait que n'importe quelle transaction peut faire attendre toutes les autres. Donc on a un flow assez imprédictible, et des high percentiles mauvais.
        • Les deadlocks sont détectés et résolus en annulant l'une des transactions, mais s'ils sont nombreux, ça fait d'autant moins de performance.
    • Serializable Snapshot Isolation (SSI) : il s'agit d'un algorithme très prometteur qui fournit la sérialisabilité, et en même temps n'a que très peu de différence de performance avec la snapshot isolation.
      • La SSI est à la fois utilisée par les BDD single node (PostgreSQL depuis la version 9.1) et distribuées (FoundationDB).
      • Fonctionnement : contrairement à l'idée de faire des locks pour protéger la transaction d'un conflit éventuel, qui est une approche dite pessimiste, ici on adopte une approche optimiste et on réalise toutes les transactions dans un snapshot à part. Au moment du commit on vérifie qu'il n'y a pas eu de conflits. S'ils ont eu lieu, on annule la transaction et on laisse l'application recommencer.
        • Il y a une difficulté vis-à-vis du fait de détecter si une transaction avec lecture initiale suivie d'une écriture devient invalide parce que la donnée lue est modifiée par une autre transaction. Il y a 2 solutions pour régler ça :
          • Détecter les lectures faites sur le MVCC (multi version concurrency control) qui ne sont plus à jour au moment où la transaction veut être commitée. Si on détecte, on annule la transaction.
          • Détecter les writes qui affectent les reads d'une autre transaction en plaçant une balise sur l'index concerné pour indiquer que plusieurs transactions utilisent la donnée. Au moment de commiter, la BDD vérifie qu'il n'y a pas de conflit par rapport au write fait par la transaction qui avait été marquée. Si oui on annule la dernière qui veut commiter. Le marquage peut être enlevé quand la situation de concurrence est résolue.
      • Au niveau de la performance, plus la BDD est précise sur quelle transaction doit être annulée, et plus ça lui prend du temps. D'un autre côté si elle en annule trop ça fait plus de transactions annulées.
      • Comparé au 2PL on a quelque chose de plus performant mais aussi de plus prédictible, vu que les requêtes n'ont pas à attendre qu'une longue requête ait terminé. Et si on a une forte charge de lectures c'est parfait aussi puisqu'elles ne sont jamais bloquées.
      • Comparé à l'exécution vraiment en série, on n'est pas limité au CPU d'une seule machine, FoundationDB distribue la détection des conflits sur plusieurs machines.
      • Globalement, vu qu'une transaction peut vite voir ses prémisses invalidées par d'autres, pour qu'on n'ait pas beaucoup d'annulation de transactions, il faut que celles-ci soient assez courtes et rapides. Mais d'un autre côté, 2PL et l'exécution sériale ne font pas mieux avec les transactions longues.

8 - The trouble with distributed systems

  • Les fautes partielles :
    • Le souci avec les systèmes distribués c'est qu'ils peuvent agir de manière non déterministe, et qu'une partie du système peut être en échec alors que le reste fonctionne. C'est une chose dont on n'a pas l'habitude dans un seul ordinateur.
    • Les superordinateurs choisissent en général d'écrire des checkpoints en DD, et d'arrêter tout le système pour réparer le composant problématique en cas de panne, pour ensuite reprendre là où ça en était à partir du checkpoint.
      • Les systèmes distribués de type “cloud” ou “web” sont à l'opposé :
        • ils sont trop gros pour tolérer d'éteindre à chaque panne, et ils ne peuvent de toute façon pas tolérer d'arrêter le service
        • ils utilisent du matériel bon marché pour scaler
        • ils sont répartis à travers le globe, utilisant le réseau internet qui est très peu fiable comparé à un réseau local.
    • Il faut que la gestion des problèmes matériels fasse partie du design de notre système.
  • Le réseau :
    • Le réseau internet (IP) est construit de manière à être peu fiable de par sa nature asynchrone. Un paquet peut à tout moment être perdu, corrompu, mettre beaucoup plus de temps à arriver etc. pour diverses raisons, parce qu'il passe par des dizaines de nœuds divers et variés qui peuvent être surchargés, débranchés, mal configurés etc.
      • On a des protocoles comme TCP construits par dessus pour corriger ça et renvoyer les paquets perdus ou corrompus.
      • Quand on envoie un paquet, on ne sait pas s'il a été reçu ou pas. Au mieux on peut demander au destinataire de répondre, mais s'il ne répond pas on ne sait pas ce qu'il s'est passé. Tout ce qu'on peut faire c'est avoir un timeout, et considérer l'échec après le timeout.
    • C'est le cas d'internet qui est peu fiable, mais le réseau ethernet local est également asynchrone. Donc les messages échangés entre les ordinateurs d'un même datacenter sont aussi prompts aux corruptions et pertes.
      • Une étude a trouvé qu'il y a 12 fautes réseau par mois dans un datacenter moyen.
      • Ajouter de la redondance ne règle pas autant de problèmes qu'on le croit puisqu'il y a aussi les erreurs humaines des ops qui sont nombreuses
    • La détection des machines en état de faute est difficile, mais il y a des moyens :
      • Si le processus applicatif est mort mais que l'OS tourne, la machine répondra peut-être par un message TCP indiquant qu'elle refuse les connexions.
      • Dans le même cas, la machine peut aussi avertir les autres nœuds que son processus applicatif est mort. HBase fait ça.
      • Dans le cas spécifique d'un datacenter, on peut avoir accès aux switches réseaux pour avoir certaines informations sur l'état connu de certaines machines qui ne répondent plus depuis un certain temps.
      • De même avec les routeurs qui peuvent immédiatement répondre que telle ou telle machine est injoignable si on les interroge.
    • La question de la valeur du timeout est une question particulièrement épineuse et pas simple. Une des manières est de tester en environnement réel et d'ajuster en fonction des performances.
      • Cet ajustement peut être automatique, Akka et Cassandra font ça.
    • La congestion du réseau est souvent causée par des problèmes de queuing diverses :
      • au niveau des switchs
      • au niveau des machines si tous les CPU sont occupés
      • TCP qui fait du queuing pour éviter la corruption de paquets, et qui retente l'envoie du paquet de manière transparente (ce qui prend du temps)
    • Ne pourrait-on pas rendre la communication fiable du point de vue matériel ?
      • Pour ce faire, il faudrait qu'elle soit synchrone. C'est le cas du réseau téléphonique à commutation de circuit, qui alloue une ligne permettant d'envoyer une quantité fixe de données de manière régulière. Les divers switch et autres éléments réseaux qui établissent cette communication allouent cette quantité pour que le transfert puisse se faire.
      • Dans le cas des communications autres que stream audio / vidéo, on ne sait pas à l'avance quelle quantité de données on voudra, ni quand on voudra faire le transfert. La commutation par paquets permet de ne rien envoyer quand il n'y a pas besoin, et d'envoyer des paquets de taille variable quand c'est nécessaire. Le prix c'est que le réseau n'est pas en train de nous allouer de la place en permanence, et qu'il y a du queuing.
      • C'est donc bien un choix d'allocation dynamique et non pas de réservation statique des ressources réseaux qui fait qu'on utilise toutes nos ressources disponibles mais avec des délais variables. On fait ce genre de choix aussi pour l'allocation dynamique des CPU vis à vis des threads.
  • Les clocks : les clocks des ordinateurs sont globalement peu fiables, et d'autant moins dans un contexte d'ordinateurs distribués.
    • Il y a 2 types de clocks sur un ordinateur :
      • Les time-of-the-day clocks : ils renvoient le temps courant, en général sous forme d'entier depuis l'epoch (1er janvier 1970).
        • Vu qu'ils sont synchronisés par NTP (network time protocol), on peut régulièrement avoir des sauts dans le temps, et donc pour mesurer des durées c'est pas le top.
      • Les monotonic clocks : ils renvoient une valeur arbitraire, mais garantissent qu'après un certain temps, la valeur renvoyée sera l'ancienne + le temps écoulé
    • A propos de la précision :
      • Google suppose que les clocks de ses machines se décalent de l'équivalent de 17 secondes pour un clock resynchronisé une fois par jour.
      • Le protocole de mise à jour des clocks NTP ne peut pas être plus précis que le temps de latence d'envoi/réception des messages (une expérimentation a montré un minimum de 35 ms pour une synchronisation via internet).. Et en cas de congestion du réseau c'est pire.
      • Dans les machines virtuelles le CPU est partagé, donc on peut se retrouver avec des sauts bizarres dans le clock à cause de ça.
      • En cas de besoin, on peut mettre en place des infrastructures de haute précision qui se mettent à jour par GPS, mais c'est coûteux. C'est ce qui est fait sur les machines de trading à haute fréquence.
      • En fait, il faudrait voir le clock plutôt comme un intervalle que comme un temps. Malheureusement la plupart des API ne le présentent pas comme ça.
        • Une exception est constituée par l'API TrueTime de Google Spanner, qui renvoie un groupe de 2 valeurs : [earliest, latest].
          • Dans le cas particulier de Google, en partant du principe que les intervalles de confiance sont fiables, si deux intervalles pour deux requêtes ne se chevauchent pas, alors on est sûrs que la requête avec l'intervalle plus récent a eu lieu après l'autre. Google utilise ça pour faire de la snapshot isolation dans un environnement distribué, mais pour ça il équipe chaque datacenter d'une réception GPS ou d'une horloge atomique, sans quoi les intervalles seraient trop grands. En dehors de Google cette solution basée sur le temps n'est pour le moment pas viable.
    • Contrairement à un CPU ou une carte réseau, quand un clock est défectueux la machine peut quand même donner l'impression que tout va bien, et faire des erreurs qui se voient beaucoup plus difficilement.
      • C'est en particulier problématique si on se sert des clocks pour faire des timestamps pour vérifier quelle transaction a eu lieu la 1ère dans un système distribué. Et c'est encore plus problématique avec du LWW (last write wins) : si on noeud a son clock qui retarde, tous ses messages finiront par être rejetés en faveur de ceux des autres nœuds parce que considérés comme anciens.
      • Plutôt que les clocks physiques, il faut utiliser des clocks logiques, c'est-à-dire des techniques pour détecter l'ordre des choses plutôt que le moment où elles ont eu lieu.
    • Un thread peut se mettre en pause pendant un temps indéterminé pour des raisons très variées : le garbage collector du langage, la machine virtuelle, l'OS qui a besoin de le mettre en pause pour faire autre chose etc. Dans un système distribué “shared nothing” il n'y a pas de mémoire partagée, donc il faut partir du principe qu'un nœud peut se retrouver arrêté pendant que le monde autour de lui aura continué.
      • Il existe des systèmes appelés temps réel (real time, ou hard real time pour bien insister sur l'aspect contrainte de temps à respecter absolument). Ces systèmes sont pensés et testés sous tous les angles pour respecter un certain nombre de contraintes de temps de réponse. On les utilise principalement dans les machines où le temps est crucial (par exemple le déclenchement d'un airbag).
      • Pour le problème spécifique du garbage collector, certains systèmes demandent à leur nœud de prévenir quand il y a un besoin de garbage collection, et au besoin redirigent le trafic vers d'autres nœuds en attendant que ce soit fait. Ça permet de réduire pas mal les problèmes de pause non voulue de l'application.
  • Savoir, vérité et mensonge :
    • Dans des conditions aussi difficiles que les systèmes distribués où on ne peut rien savoir de certain sauf à travers les messages qu'on reçoit ou ne reçoit pas, on peut quand même créer des systèmes qui fonctionnent : il est possible d'avoir quelque chose de fiable construit sur des bases offrant peu de garanties, à conditions que le modèle de système qu'on a choisi convienne.
    • La vérité dans un contexte distribué est déterminée par la majorité. Pour éviter la dépendance à un noeud particulier, et étant donné qu'un noeud, quel qu'il soit, ne peut pas faire confiance à sa propre horloge vu qu'il peut entrer en pause à tout moment sans le savoir, on décide de mettre en place des quorums pour qu'une majorité de noeuds décident par exemple si un noeud est mort ou non.
      • Il faut bien s'assurer que and on noeud pense être doté d'une responsabilité (il est le leader, il a le lock sur un objet etc.), il se fie quand même à ce que disent la majorité des noeuds : s' ils lui disent qu'il n'a plus la responsabilité en question, alors il faut qu'il accepte de se comporter comme tel, sous peine d'inconsistances dans le système.
      • Pour garantir qu'un lock soit bien respecté, on peut utiliser un lock service qui fournit un token incrémental à chaque lock. Si le nœud a son temps alloué qui a expiré, et qu'il essaye d'écrire alors qu'un autre a déjà écrit à sa place, son token sera rejeté par le lock service. On parle de fencing token.
        • ZooKeeper permet de fournir ce genre de fencing token s'il est utilisé comme lock service.
    • Jusqu'ici on est parti du principe que les noeuds peuvent de plus répondre, ne pas savoir qu'ils n'ont plus une certaine responsabilité, ou échouer. Mais qu'ils restent “honnêtes” au sens où ils ne vont pas dire qu'ils ont reçu un message alors qu'ils ne l'ont pas reçu, ou encore falsifier un fencing token. De tels cas de corruption s'appellent une Byzantine fault.
      • Ça vient du Byzantine Generals Problem où on imagine dans la ville antique de Byzance, des généraux de guerre essayent de se mettre d'accord, et communiquant par messager, mais où certains généraux mentent sans se faire découvrir.
      • On imagine donc que certains nœuds peuvent être complètement corrompus jusqu'à ne plus suivre le protocole attendu du tout, par exemple dans le cas d'un logiciel dans un contexte aérospatial soumis à des radiations.
      • Ou alors se mettent carrément à tricher intentionnellement, soit à cause d'un piratage, soit plus classiquement un contexte de communication inter-organisations, où les organisations ne se font pas confiance.
        • C'est le cas par exemple pour la blockchain où les participants ne se font pas confiance puisque n'importe lequel pourrait essayer de tricher.
      • Dans notre cas habituel de serveurs web, on part du principe que le client final derrière son navigateur pourrait être malicieux, mais sinon les serveurs de l'organisation sont fiables. Et on ne met pas en place de mécanismes contre les problèmes de fautes byzantines, parce que c'est trop compliqué.
      • La plupart des algorithmes contre les fautes byzantines comptent sur le fait que la majorité des nœuds ne vont pas être infectés par le problème, et donc pourront garder le contrôle contre la minorité du réseau corrompue. Donc ça peut être utile dans un contexte d'application peer-to-peer, mais si on charge notre version du logiciel dans tous les nœuds, ça ne nous protégera pas des bugs. Et de même si un hacker prend le contrôle d'un nœud, on imagine qu'il pourra aussi prendre le contrôle des autres nœuds.
      • On peut néanmoins se prémunir contre des formes modérées de mensonges avec quelques astuces :
        • Faire un checksum des paquets pour vérifier qu'ils n'aient pas été corrompus. TCP/UDP le font mais parfois laissent passer.
        • Vérifier la validité de toutes les données entrées par l'utilisateur.
        • Dans le cas de la mise à jour depuis des serveurs NTP, faire des requêtes auprès de plusieurs serveurs, pour que ceux qui n'ont pas une bonne valeur soient rejetés.
    • Notre système doit prendre en compte les problèmes matériels qu'on a décrits, mais il ne doit pas non plus être complètement dépendant du matériel exact sur lequel il tourne pour pouvoir changer le matériel. On va donc créer une abstraction qui est le system model.
      • Concernant les considérations liées au temps, on en a 3 :
        • Synchronous model : on part du principe que les erreurs réseau, de clock ou les pauses de processus sont limités à certaines valeurs définies; En pratique la réalité ne colle pas à de modèle.
        • Partially synchronous model : on part du principe que le système se comporte de manière synchrone la plupart du temps, sauf parfois où il déborde. Ce modèle correspond beaucoup mieux à nos systèmes web distribués.
        • Asynchronous model : on est beaucoup plus restrictif puisqu'on considère qu'aucune notion de temps ne peut être fiable. Et donc on ne peut pas utiliser de timeouts non plus.
      • Concernant les considérations liées aux échecs de noeuds, il y en a aussi 3 :
        • Crash-stop faults : on considère que si un nœud fait une faute, on l'arrête et c'en est fini de lui.
        • Crash-recovery faults : les nœuds peuvent être en faute, puis revenir en état correct un peu plus tard. C'est ce modèle qui nous est en général le plus utile pour nos systèmes web.
        • Byzantine (arbitrary) faults : les nœuds peuvent faire absolument n'importe quoi.
      • Pour définir qu'un algorithme d'un system model distribué est correct, il peut avoir deux types de propriétés :
        • safety : il s'agit d'une propriété qui dit que rien de mal ne doit se passer. Par exemple, la uniqueness de quelque chose. Si une telle propriété est rompue, c'est parce que chose a été violée et qu'il y a eu un dommage non réparable.
        • liveness : il s'agit d'une propriété qui dit qu'une chose attendue doit arriver. Par exemple l'availability, le fait de recevoir une réponse. Si une telle propriété est rompue, c'est que ce qui était attendu n'a pas eu lieu, mais pourrait avoir lieu plus tard
      • Il est courant de demander à ce que les caractéristiques de safety soient respectées dans tous les cas, même si tous les nœuds crashent, un mauvais résultat ne doit pas être retourné. Pour les caractéristiques de liveness, on peut demander à ce qu'elles soient respectées seulement dans certains cas, par exemple si un nombre suffisant de nœuds est encore en vie.
      • Enfin, il faut bien garder en tête qu'un system model n'est qu'un modèle. Dans la réalité, on sera amené à rencontrer des erreurs non prévues. Et à l'inverse, sans raisonnement théorique, on pourrait avoir des erreurs dans nos systèmes pendant longtemps sans s'en rendre compte. Les deux sont aussi importants l'un que l'autre.
        • C'est la différence entre le computer science (théorique), et le software engineering (pratique).

9 - Consistency and Consensus

  • Pour rendre un système tolérant aux fautes, il faut introduire des abstractions. C'est ce qu'on a fait avec les transactions par exemple en partant du principe qu'une transaction est atomique. Une autre abstraction intéressante est le consensus : faire en sorte que les nœuds se mettent d'accord.
  • La consistency est une question importante à laquelle on peut apporter différents niveaux de garantie. Comme avec l'isolation où il s'agissait de traiter la concurrence entre deux transactions, avec la consistance il s'agit de coordonner l'état des réplicas vis-à-vis des délais (replication lag) et des fautes.
  • La linearizability consiste en une abstraction qui donne l'illusion que le replication lag n'existe pas, qu'il n'y a en fait qu'une seule copie des données : dès qu'une copie a été faite, le système doit se comporter comme si cette donnée la plus récente était lisible depuis partout.
    • Une des conséquences c'est qu'il faut que quand une lecture a été faite avec une valeur, ce soit cette valeur qui soit retournée par tous les réplicas à partir de ce moment. Si une écriture a lieu entre temps ça peut être cette nouvelle valeur écrite, mais certainement pas une valeur plus ancienne.
      • On doit pouvoir éviter le cas où une personne recharge la page et voit que le match a été gagné par telle équipe, et juste après une autre personne affiche la page, et voit que le match est toujours en cours.
      • En revanche, il n'y a pas de contraintes de délais : si l'écriture prend du temps c'est pas grave. Et si deux transactions sont concurrentes et que l'une arrive avant l'autre c'est pas grave non plus.
    • Linearizability vs serializability : la serializability est une notion d'isolation pour pouvoir garantir la manipulation de plusieurs objets au sein d'une même transaction, sans être gêné par les autres transactions. La linearizability consiste à renvoyer systématiquement le résultat le plus récent à chaque lecture une fois que celui-ci a été lu au moins une fois.
      • Le 2-phase locking et l'actual serialization garantissent aussi la linearizability. En revanche la Serializable snapshot isolation ne la garantit pas puisqu'elle va créer des snapshots pour les transactions, et ne pas inclure les writes récents dans ces snapshots (ce qui peut facilement résulter à ce que certaines transactions aient un write et d'autres pas).
    • Parmi les applications de la linearizability :
      • L'élection d'un nouveau leader est un problème où dès que le lock a été pris, il faut que personne d'autre ne puisse le prendre.
        • Apache ZooKeeper et etcd sont souvent utilisés comme système de lock pour implémenter l'élection de leader.
          • Apache Curator ajoute des choses par-dessus ZooKeeper.
      • Le fait de garantir qu'un nom d'utilisateur ne sera pas pris deux fois, ou encore qu'un compte en banque ne va pas en dessous de 0.
      • Dans le cas où on a 2 canaux de communication, l'un des canaux peut être plus rapide que l'autre : par exemple si on écrit une image, et qu'on enqueue un message pour qu'une version thumbnail de l'image soit générée. Si le traitement du message dans la queue est plus rapide que le temps qu'on met à écrire l'image entière, la thumbnail risque d'être faite à partir d'un fichier partiel. Il faut donc s'assurer de l'ordre de ce qui est fait dans ces 2 canaux.
    • La linearizability parmi les systèmes de réplication connus :
      • Single-leader replication : ça pourrait être linearizable si la BDD n'utilise pas de snapshot isolation. Mais il reste le problème de savoir qui est le leader, et dans le cas de réplication asynchrone on peut perdre des données au failover.
      • Consensus algorithms : ces algorithmes permettent d'implémenter la linearizability en répondant aux problèmes soulevés dans la single-based replication. On va y revenir.
        • C'est comme ça que fonctionnent ZooKeeper et etcd.
      • Multi-leader replication : ces systèmes ne sont pas linearizable puisqu'il y a des écritures concurrentes qui sont résolues après coup.
      • Leaderless replication : certains affirment qu'en respectant la règle du quorum consistency, on peut obtenir une linearizability sur des BDD Dynamo-style. Mais ce n'est en général pas vrai.
        • Riak ne fait pas de read repair à cause du manque de performance de cette technique, et il la faudrait pour la linearizability.
        • Cassandra fait le read repair, mais il perd la linearizability à cause de son algo last write wins qui cause des pertes de données.
    • Si on part de l'exemple de la multi-leader replication, on constate que c'est pratique parce que si la connexion est rompue entre deux datacenters, les deux peuvent continuer indépendamment, et se resynchroniser dès que la connexion est rétablie. On a alors une grande availability du système, mais on ne respectera pas la linearizability. A l'inverse si on reste en single-leader, le datacenter déconnecté du datacenter leader se verra inopérant jusqu'à rétablissement du réseau. Mais on garde la linearizability.
      • Le CAP theorem décrit cette problématique et a permis en son temps d'ouvrir la discussion, mais il est fondamentalement inutile de nos jours.
    • Dans la pratique, de nombreuses BDD n'implémentent pas la linearizability parce que ça coûte trop cher en performance. Il n'y a malheureusement pas d'algorithme qui permette d'avoir de la linearizability sans ce problème de performance qui est d'autant plus grand qu'il y a beaucoup de délais dans le réseau.
  • Garanties d'ordre d'exécution :
    • La causal consistency (causalité) est au cœur des problématiques des systèmes distribués faisant fonctionner des applications qui ont du sens.
      • Respecter la causalité n'implique pas forcément un total order (ordonnancement total) de tous les éléments, mais uniquement de ceux liés entre eux par une relation cause / conséquence.
        • La linearizability quant à elle, implique un total order. Elle est donc une contrainte plus forte que la causal consistency.
        • C'est trop récent à l'époque du livre pour être dans des systèmes en production, mais il y a de la recherche sur des techniques permettant de détecter la causalité sans total order. Par exemple une généralisation des version vectors.
    • La causal consistency coûte quand même cher s'il faut traquer toutes les transactions et leurs relations. On peut sinon utiliser des sequence numbers pour créer un clock logique permettant de définir un ordre total.
      • Sur une configuration single-leader, il suffit d'incrémenter un compteur à chaque opération au niveau du leader.
      • Dans le cas où il y a plusieurs leaders, on a d'autres solutions:
        • Générer des sequence numbers différents pour chaque nœud (par exemple pair pour l'un, impairs pour l'autre).
        • Utiliser un clock physique.
        • Allouer des plages à chaque nœud, par exemple 0 à 1000 pour l'un, 1000 à 2000 pour l'autre etc.
        • Malheureusement certains nœuds peuvent aller plus vite que d'autres, et ces techniques ne garantissent pas la causalité dans le système.
      • La causalité peut être assurée dans un environnement multi-leader grâce aux Lamport timestamps. Il s'agit d'une idée de Leslie Lamport dans un des papiers les plus cités des systèmes distribués.
        • Le principe est d'avoir un compteur normal par nœud, et pour le rendre unique on l'associe à un chiffre représentant le nœud lui-même. Et l'astuce de la technique consiste à ce que chaque nœud et chaque client garde en mémoire la valeur la plus élevée de compteur qu'il connaisse. Et quand il a connaissance de la valeur d'un autre compteur plus élevé que celui qu'il connaissait au détour d'une opération, il met immédiatement à jour le compteur du nœud sur lequel il fera la prochaine opération avec cette valeur-là.
        • Cette technique permet de respecter la causalité, mais aussi un total ordering.
        • Malheureusement ça ne règle pas tous nos problèmes : même avec un total order, on ne peut pas savoir sur le moment si un nom d'utilisateur unique est en passe d'être pris par un autre nœud ou non pour savoir sur le moment s'il faut l'autoriser soi-même ou non. Avec le temps et les opérations, on finira par avoir un ordonnancement total, mais pour le moment non.
          • C'est l'objet du total order broadcast.
    • Le total order broadcast nécessite qu'aucun message ne soit perdu, et que tous les messages soient délivrés à tous les nœuds dans le même ordre.
      • La connexion peut être interrompue, mais les algorithmes de total order broadcast doivent réessayer et rétablir l'ordre des messages dans tous les nœuds quand le réseau est rétabli.
      • ZooKeeper et etcd implémentent le total order broadcast.
      • A noter aussi que le total order broadcast maintient l'ordre tel qu'il est au moment de l'émission des messages, donc c'est plus fort que le timestamp ordering.
      • On peut voir ça comme un log de messages transmis à tous les nœuds dans le bon ordre.
      • On peut ainsi implémenter la linearizability à partir d'un système respectant le total order broadcast.
        • Pour l'écriture :
          • On ajoute un message au log disant qu'on voudrait écrire
          • On lit le log et on attend que notre message nous parvienne
          • Si le premier message concernant ce sur quoi on voulait écrire est le nôtre, alors on peut valider l'écriture dans le log.
        • Pour la lecture :
          • On peut faire pareil qu'avec l'écriture : ajouter un message indiquant qu'on veut lire, attendre de le recevoir, puis faire la lecture en fonction de l'ordre indiqué dans le log.
            • C'est comme ça que ça marche dans les quorum reads, dans etcd.
          • On peut demander à avoir tous les messages de log liés à une lecture puis faire la lecture à partir de là.
            • C'est comme ça que fonctionne la fonction sync() de ZooKeeper.
          • On peut lire à partir d'un réplica synchrone avec le leader (en cas de single-leader), dont on est sûr qu'il a les données les plus récentes.
      • A l'inverse, on peut aussi implémenter un système total order broadcast à partir d'un système linéarisable : il suffit d'avoir un compteur linéarisable qu'on attache à chaque message envoyé via total order broadcast.
    • On peut enfin noter qu'à la fois la linearizability et le total order broadcast sont tous deux équivalents au consensus.
  • Le consensus consiste en la possibilité pour les nœuds de se mettre d'accord sur quelque chose (on pense par exemple à l'élection de leader, ou à l'atomic commit problem où il faut choisir entre garder ou non une transaction présente sur certains nœuds), alors même que des noeuds peuvent être en faute à tout moment.
    • C'est un sujet très subtil et complexe.
    • Le FLP result est un résultat théorique montrant que le consensus est impossible dans un system model asynchrone. Dans la pratique, à l'aide de timeouts (même s'ils peuvent être parfois faussement positifs), on arrive à atteindre le consensus.
    • Quand une transaction est écrite en BDD, il est hors de question de la retirer par la suite parce qu'elle a pu être prise en compte par d'autres transactions. Il faut donc bien réfléchir avant d'entrer définitivement l'écriture en BDD.
    • Le two-phase commit (2PC) est un algorithme de consensus implémenté dans certaines BDD.
      • 2PC n'est pas très bon, des algorithmes plus modernes existent chez ZooKeeper (Zab) et etcd (Raft).
      • Attention à ne pas confondre 2PC avec 2PL (2 phase lock) qui permet l'isolation pour la sérialisation, le mieux est d'ignorer le rapprochement de leur nom.
      • Fonctionnement :
        • on a besoin d'un nouveau composant : le coordinator.
        • Lors d'une transaction, après les lectures / écritures, quand on veut inscrire vraiment tout ça en BDD, le coordinator va procéder en 2 étapes :
          • demander successivement à chaque nœud si il est prêt à faire un commit et attendre leur réponse.
          • si oui, faire le commit, sinon annuler la transaction.
        • L'idée c'est que lors de la 1ère phase, quand le coordinator demande si les nœuds sont prêts, en fait il leur demande aussi de tout préparer pour que même en cas de crash rien ne soit perdu de leur côté. La seule chose qui leur resterait à faire alors serait de valider les données déjà mises en forme pour aller dans la BDD.
        • Lorsque le coordinator prend sa décision finale de faire s'exécuter ou d'annuler la transaction en phase 2, alors il l'écrit localement et passe un point de non-retour. A partir de là il réessayera en permanence de faire finaliser la transaction auprès de tout nœud qui deviendrait indisponible à partir de ce moment-là.
        • Dans le cas où le coordinator crash juste après avoir demandé aux noeuds de se préparer fait que les noeuds doivent rester en attente. Ils ne peuvent pas unilatéralement prendre de décision de valider ou annuler une transaction chacun de leur côté. La solution est d'attendre que le coordinator revienne, lise ce qu'il avait décidé sur son fichier de log, et envoie les messages qui conviennent.
          • Il existe un autre algorithme appelé 3-phase commit (3PC) qui résout le problème de l'aspect bloquant lié à l'attente du commit du coordinator, mais il implique des temps de réseaux bornés. Or nos réseaux habituels sont imprévisibles. Pour cette raison, c'est le 2PC qui continue d'être utilisé.
          • Le souci de ce cas c'est surtout le lock au niveau de la BDD, souvient sur les entrées concernées par la transaction. Si le coordinator ne revient jamais ou que les logs sont perdus, alors on peut se retrouver face à des locks orphelins, et un administrateur humain devra manuellement résoudre ces conflits, puisque les locks sont censés survivre même à un redémarrage de la BDD.
    • En pratique, les transactions distribuées sont souvent décriées parce qu'elles coûteraient trop par rapport à ce qu'elles apporteraient.
      • Les transactions distribuées utilisant MySQL sont connues pour être 10 fois plus lentes que les mêmes transactions sur un seul nœud.
      • Il y a deux types de transactions distribuées : celles qui sont implémentées par une même BDD qui tourne sur plusieurs nœuds, et celles qui consistent à faire communiquer des technologies hétérogènes d'un nœud à un autre. Les dernières sont bien plus compliquées.
      • Les transactions hétérogènes peuvent faire communiquer par exemple une BDD et un message broker, et ne commiter que si tout a marché, et annuler tout sinon.
        • XA (eXtended Architecture) est justement un protocole qui permet d'implémenter le 2PC dans des technologies hétérogènes. Il s'agit d'une API en C qui se connecte aux programmes qui s'exécutent sur une machine.
          • Il est supporté par de nombreuses BDD : PostgreSQL, MySQL, DB2, SQL Server, Oracle.
          • Et par plein de message brokers : ActiveMQ, HornetQ, MSMQ, IBM MQ.
          • XA a cependant des limitations, par exemple il ne permet pas de détecter les deadlocks, et ne supporte pas le SSI (serializable snapshot isolation).
      • Il faut noter quand même que le coordinateur est souvent un single point of failure vu qu'il contient lui-même des données persistantes cruciales pour le fonctionnement du système. Mais étonnamment les possibilités de le rendre réplicable sont en général rudimentaires.
      • 2PC a quand même un point problématique aussi, c'est qu'il a tendance à amplifier les failures, puisque dès qu'un nœud ne répond pas on va annuler la transaction. C'est pas très “fault tolerant” tout ça.
    • Le consensus tolérant les fautes :
      • On peut citer 4 propriétés définissant le consensus :
        • 3 de safety :
          • Uniform agreement : tous les nœuds doivent arriver au même choix.
          • Integrity : aucun nœud ne décide deux fois.
          • Validity : le choix décidé est valide.
        • Et une de liveness :
          • Termination : les nœuds ne se retrouvent pas bloqués, même en cas de crash de certains d'entre eux. Ils évoluent vers la terminaison du processus de choix.
            • 2PC ne remplit pas cette condition puisque le coordinator peut bloquer le système en cas de faute.
      • Les algorithmes de consensus tolérants aux fautes sont difficiles à implémenter (donc on ne va pas les implémenter nous-mêmes mais utiliser des outils qui les implémentent).
        • Ce sont les suivants :
          • Viewstamped replication (VSR)
          • Paxos
          • Raft
          • Zab
        • Ces algorithmes sont de type total order broadcast.
        • A chaque tour les nœuds décident du prochain message à traiter, et décident par consensus.
        • Pour le remplacement des leaders les nœuds utilisent des timeouts, et lancent une élection avec un quorum. Et c'est seulement quand le nœud a bien reçu le message de la majorité qu'il sait qu'il est bien le leader.
        • Ces algorithmes sont encore un sujet de recherche, et ont parfois des edge cases problématiques qui basculent le leader entre 2 nœuds, ou qui forcent en permanence le leader à renoncer.
    • Services de coordination :
      • Les services comme ZooKeeper sont rarement utilisés directement par les développeurs. On va plutôt les utiliser à travers d'autres services comme HBase, Hadoop YARN, OpenStack Nova et Kafka.
      • Ce sont en gros des stores de clé-valeur qui tiennent en RAM.
      • ZooKeeper a notamment ces caractéristiques :
        • Linearizable atomic operations : à l'aide d'un lock, une seule opération parmi les opérations concurrentes peut réussir.
        • Total ordering of operations : les fencing tokens permettent de préserver l'ordre des transactions.
        • Failure detection : les nœuds ZooKeeper et les autres nœuds s'envoient des messages régulièrement, et en cas de timeout déclarent le nœud échoué.
        • Change notifications : les clients (les autres nœuds) peuvent s'abonner à des changements spécifiques des autres nœuds à travers ZooKeeper, ce qui évite de faire des requêtes pour voir où ça en est.
      • ZooKeeper est pratique pour des informations qui changent toutes les minutes ou heures comme l'association d'une adresse ip à un leader.
        • Si on veut répliquer l'état d'une application qui peut nécessiter des milliers ou millions de changements par seconde, on peut utiliser des outils comme Apache BookKeeper.
      • ZooKeeper fait partie des membership services, issu d'une longue recherche depuis les années 80. En couplant le consensus avec la détection de fautes, ils permettent d'arriver à une certaine connaissance de qui composent les membres du réseau.
  • Intégrer des systèmes disparates ensemble est l'une des choses les plus importantes à faire dans une application non triviale.
  • Les données sont souvent classées en 2 catégories qu'il est de bon ton d'expliciter :
    • Les systems of record qui sont les données de référence.
    • Les derived data systems qui sont en général des données dénormalisées, par exemple stockées dans un cache.

10 - Batch Processing

  • Il existe 3 types de systèmes :
    • Services (online systems) : un client envoie un message et reçoit une réponse. En général, le temps de réponse et la disponibilité (availability) sont très importants.
    • Batch processing systems (offline systems) : des tâches de fond, souvent exécutées périodiquement, durant plusieurs minutes voire plusieurs jours. La performance se mesure par la quantité de données traitées.
    • Stream processing systems (near-real-time systems) : il s'agit d'une forme particulière de batch processing. On ne répond pas à une requête d'un client humain, mais on réagit à un événement assez rapidement après qu'il ait eu lieu.
  • Le batch processing avec les outils Unix :
    • L'outil sort d'Unix va automatiquement prendre en charge des données plus grandes qu'il n'y a de mémoire vive, mettre ça en disque pour faire les opérations, et paralléliser au niveau CPU.
    • La philosophie unix est très proche de l'agile et du devops. On casse les gros problèmes en petits, on fait de petits programmes qui font une chose et la font bien. On fait des itérations courtes.
    • Une des clés de la puissance des outils unix est l'interface uniforme, permettant de les composer ensemble. De nos jours c'est plutôt l'exception que la norme parmi les programmes.
    • On a également une séparation entre la logique et le câblage des données grâce à stdin et stdout.
    • Les outils unix sont très pratiques pour l'expérimentation : les entrées sont immuables, et on peut envoyer la sortie vers un less par exemple.
    • Mais le plus souci c'est que les outils unix ne marchent que sur une machine, pas sur des architectures distribuées.
  • MapReduce est un modèle assez bas niveau de batch processing, connu pour être l'algorithme qui permet à Google d'être aussi scalable.
    • Il ressemble aux outils unix mais sur des architectures distribuées.
    • Il prend des inputs, et envoie le résultat dans des outputs.
    • Les inputs ne sont normalement pas modifiés et il n'y a pas de side-effects autre que les outputs.
    • Les fichiers d'output sont écrits de manière séquentielle.
    • Alors que les outils unix écrivent dans stdout, MapReduce écrit dans un système de fichiers distribués.
      • Hadoop utilise HDFS (Hadoop distributed File System), qui est une implémentation open source de Google File System.
      • Il en existe d'autres comme GlusterFS, Quantcast File System (QFS).
      • D'autres services sont similaires : Amazon S3, Azure Blob Storage, OpenStack Shift.
      • Fonctionnement de HDFS :
        • HDFS est basé sur une approche shared-nothing, c'est-à-dire qu'il lui suffit d'ordinateurs connectés par un réseau ip classique.
        • Un démon tourne sur chaque nœud et expose les fichiers qui sont sur ce nœud. Et un serveur central appelé NameNode contient des références vers ces fichiers.
        • Il y a de la réplication entre les nœuds.
        • De cette manière HDFS est capable de faire fonctionner des dizaines de milliers de machines et des petabytes de données.
    • Le fonctionnement se fait en 4 étapes :
      • 1- On lit des fichiers et on les structure sous forme d'entrées.
        • C'est le parser qui s'en charge.
      • 2- on appelle la fonction mapper pour extraire des clés-valeurs
        • Il s'agit ici d'une fonction où on peut ajouter du code à nous. La fonction est appelée une fois par entrée et permet d'extraire de la manière souhaitée les clés-valeurs.
      • 3- on trie les clés-valeurs par clés
        • C'est fait automatiquement.
      • 4- on appelle la fonction reducer pour faire notre action sur les clés-valeurs
        • Là encore on peut ajouter du code à nous. On a en paramètre toutes les valeurs associées à une clé et on peut en faire ce qu'on veut en sortie.
    • On peut aussi enchaîner plusieurs MapReduce, l'un préparant les données en entrée pour l'autre.
    • MapReduce permet aussi de paralléliser les opérations de manière transparente pour le code. Comme il y a de nombreuses entrées à traiter, chacune peut s'exécuter localement sur la machine du réplica où elle est. Cela permet aussi d'éviter les transferts réseau en localisant les calculs.
    • Concernant le code custom des fonctions mapper et reducer, dans Hadoop elles sont écrites en Java, alors que dans MongoDB et CouchDB elles sont écrites en Javascript.
    • Contrairement aux outils Unix, MapReduce ne permet pas de chaîner directement ses jobs. Il faut plutôt écrire le résultat d'un job dans un dossier, puis donner ce dossier comme entrée au MapReduce suivant. C'est du moins comme ça que ça se passe dans Hadoop.
      • Du coup tout un tas d'outils permettent de coordonner les jobs MapReduce dans Hadoop : Oozie, Azkaban, Luigi, Airflow, Pinball.
      • D'autres outils haut niveau autour de Hadoop permettent également de gérer ce genre de choses : Pig, Hive, Cascading, Crunch, FlumeJava.
    • A propos des reduce-side joins avec MapReduce :
      • On ne va envisager les jointures que sur des tables entières pour notre cas qui concerne les batchs, typiquement quand on traite des BDD destinées à l'analyse des données.
      • Par exemple : si on a d'un côté des événements avec un user id, et de l'autre côté la table des users avec certaines de leurs caractéristiques. On va vouloir corréler les deux pour ne sélectionner que les faits d'un certain type d'utilisateurs.
      • Pour des raisons de performance, on va opter pour le plus de localité possible, et donc on ne va pas faire des accès random en traitant chaque entrée une par une là où elle est. On va plutôt copier la table des users dans le même filesystem HDFS que la table des faits, puis on va lire les deux conjointement.
      • La technique des sort-merge joins permet à plusieurs mappers de trier des données par la même clé (par exemple l'id de l'utilisateur pour des événements dont il est l'objet, et pour des données personnelles sur l'utilisateur), puis à un reducer de récupérer ces données et de les merger ensemble pour faire l'action qu'on voulait avec cette jointure.
        • Une fois que les mappers ont fait leur travail, chaque clé agit comme une adresse au sens où les valeurs d'une même clé vont être envoyées au même nœud pour que le reducer soit exécuté avec ces valeurs-là. Il y a bien une localité des données pour l'exécution du traitement.
        • D'une certaine manière on a séparé l'obtention des données du traitement des données, ce qui contraste avec la plupart des applications où on fait des requêtes en BDD en plein milieu du code.
      • Dans certains cas on peut se retrouver avec des hot keys par exemple des données liées aux followers de célébrités. Ceci peut donner trop de charge à un nœud de reducer, et les autres devront alors l'attendre pour que l'opération de MapReduce soit terminée.
        • Pour éviter ça on va détecter les hot keys et les traiter différemment des autres clés. On va les séparer dans plusieurs reducers différents sur plusieurs nœuds, et ensuite on fusionnera le résultat final.
          • Pig fait d'abord une opération pour déterminer les hot keys, et ensuite fait le traitement de la manière décrite.
          • Crunch a besoin qu'on lui dise explicitement les hot keys.
          • Hive a aussi besoin que les hot keys soient spécifiés explicitement dans une table de metadata séparée.
    • A propos des map-side joins :
      • Les reduce-side joins sont pratiques parce que les mappers lisent les données quelles qu'elles soient, préparent, trient et donnent ça aux reducers. Mais tout ceci coûte en terme de copies au niveau du DD.
        • Si nous avons des informations sur la structure des données, nous pouvons faire des map-side joins, où il s'agit simplement de tout faire dans les mappers et se débarrasser des reducers.
      • Un cas où c'est utile est quand on doit faire une jointure entre un grand dataset et un petit dataset, suffisamment petit pour que ça puisse être chargé dans la RAM de chaque mapper. Chaque mapper aura alors à disposition l'ensemble du petit dataset pour chercher les entrées qui l'intéressent.
        • On appelle cet algorithme le broadcast hash join.
        • Cette méthode est supportée par Pig, Hive, Cascading et Crunch, ainsi que la data warehouse Impala.
      • Dans le cas où on a deux tables partitionnées de la même manière et qu'on veut faire une jointure dessus, on peut faire le map-side join sur chacune des partitions, ce qui permet de ne charger en mémoire qu'une faible quantité de données.
      • Si les deux datasets sont en plus triés selon la même clé, alors on n'a pas besoin que l'une des deux entre en mémoire. Les mappers pourront chercher les données qui les intéressent du fait qu'elles sont triées de la même manière.
      • Le choix d'un map-side join ou d'un reduce-side join a un impact sur les données résultant du MapReduce : avec le map-side les données seront partitionnées de la même manière qu'elles l'étaient à l'input, alors qu'avec le reduce-side, les données seront partitionnées selon la clé de la jointure.
    • Le batch se rapprocherait plus des OLAP que des OLTP dans la mesure où il scanne une grande quantité de données, mais le résultat d'un batch sera une forme de structure et non pas un rapport à destination de data analysts.
      • Un exemple de batch est l'utilisation initiale de MapReduce par Google pour faire des indexes pour son moteur de recherche. Encore aujourd'hui MapReduce est un bon moyen de créer des indexes pour Lucene / Solr.
      • Un autre exemple est de construire des BDD key-value pour du machine learning, ou pour des systèmes de recommandation.
        • On pourrait penser que la bonne solution serait d'orienter la sortie du MapReduce vers notre BDD, entrée par entrée, mais c'est une mauvaise idée, à la fois pour des raisons de performance (localité des données, pas d'utilisation réseau, parallélisation des tâches) et d'atomicité du batch job.
        • La bonne solution consiste plutôt à créer une toute nouvelle BDD sur le filesystem distribué, de la même manière qu'on crée le fichier d'index.
          • Plusieurs systèmes de BDD supportent le fait de créer des fichiers de BDD à partir d'opérations MapReduce : Voldemort, Terrapin, ElephantDB, HBase.
          • Ces fichiers sont écrits une fois et demeurent ensuite read-only.
          • Les systèmes de BDD qui les supportent vont servir les anciennes données, commencer à copier ce fichier depuis le filesystem distribué vers le disque local, et dès que c'est fait switcher vers le fait de servir ces données-là.
    • MapReduce suit la philosophie Unix :
      • On peut rejouer une opération MapReduce autant de fois qu'on veut sans dommages pour les données d'entrée.
        • Si les données sont corrompues pour une raison éphémère, on retente l'opération.
        • Si c'est un bug logiciel, on le résout, et on refait la même opération encore.
      • On a une séparation de la logique, et du câblage pour décider où vont les données.
      • Par contre là où les outils unix font beaucoup de parsing parce que le format est le texte, Hadoop et compagnie peuvent utiliser Avro et Parquet pour permettre une évolutivité des schémas.
    • Comparaison entre Hadoop et les BDD distribuées :
      • Les BDD distribuées implémentant le massively parallel processing (MPP) avaient déjà la capacité de faire des jointures distribuées en parallèle depuis 10 ans quand MapReduce est sorti. La différence c'est qu'elles obligent les données à respecter un schéma prédéfini.
      • Par contraste, le modèle MapReduce a permis de collecter n'importe quelles données, y compris du texte, des images etc. et de les mettre tels quels, transférant alors le problème de l'interprétation de ces données au consommateur.
        • Ça s'appelle le sushi principle : raw data is better. Et ça permet par exemple de consommer la même donnée différemment selon les contextes.
        • On peut par exemple collecter les données, et dans une étape séparée utiliser un MapReduce pour réorganiser ces données de manière à les transformer en data warehouse.
      • Les BDD MPP sont efficaces pour le cas d'utilisation qu'elles prévoient : la manipulation des données via des requêtes SQL. En outre, ça fournit un bon niveau d'abstraction pour ne pas avoir à écrire de code.
        • D'un autre côté, tout ne peut pas être traité avec des requêtes SQL. Si on a des utilisations particulières comme du machine learning, des systèmes de recommandation, de recherche dans du texte etc. alors on a probablement besoin d'exécuter du code custom sur ces données. C'est ce que permet MapReduce.
        • Si on a MapReduce, on peut construire un modèle SQL par dessus. C'est ce qu'a fait Hive.
      • La versatilité permise par les raw data dans du Hadoop permettent d'implémenter du SQL, du MapReduce, mais aussi d'autres modèles encore.
        • On a des BDD OLTP comme HBase
        • On a des BDD analytiques comme Impala
        • Les deux utilisent HDFS mais pas MapReduce.
      • Deux autres différences :
        • La manière de gérer les fautes n'est pas la même : les systèmes MPP annulent la requête en cas de faute, alors que MapReduce va annuler une partie du job, peut être le mapper ou le reducer, et réessayer pour le terminer.
        • La gestion de la mémoire n'est pas la même : les systèmes MPP vont avoir tendance à stocker beaucoup en mémoire vive, alors que MapReduce va plutôt écrire sur disque dès que possible.
        • Ceci est en partie dû au fait que MapReduce a été fait par Google dans un contexte où les jobs de grande priorité et de faible priorité tournent sur les mêmes machines. En moyenne un job batch a 5% de chances d'être arrêté parce que ses ressources sont préemptées par un processus plus prioritaire. C'est aussi pour cette raison qu'on écrit sur disque dès que possible et qu'on tolère beaucoup les fautes.
          • Sans ce genre de contraintes de préemption, MapReduce pourrait se révéler moins pertinent dans sa manière de fonctionner.
  • Malgré le succès de MapReduce dans les années 2000, il y a d'autres modèles intéressants.
    • MapReduce, bien que simple à comprendre, n'est pas simple à mettre en œuvre. Par exemple, le moindre algorithme de jointure a besoin d'être refait from scratch.
    • Il existe un ensemble d'outils construits par-dessus MapReduce, et qui fournissent d'autres abstractions (Pig, Hive, Cascading, Crunch).
    • Il existe aussi des modèles complètement différents de MapReduce, et qui permettent d'obtenir de bien meilleures performances pour certaines tâches.
    • Contrairement aux programmes Unix, MapReduce fait de la matérialisation des états intermédiaires, c'est-à-dire que la sortie d'un MapReduce doit être complètement écrite avant de pouvoir être consommée par un autre processus. A contrario les programmes Unix mettent en place un buffer sous le forme du pipe qui permet au programme suivant de démarrer en consommant la sortie du précédent bout par bout au fur et à mesure.
      • Ceci a plusieurs désavantages :
        • Le fait de devoir attendre qu'un job MapReduce soit complètement terminé avant d'entamer le suivant est source de lenteur.
        • Souvent, le mapper ne sert qu'à lire le code déjà formaté correctement et est donc inutile. On pourrait alors chaîner plusieurs reducers.
        • Le fait que les états intermédiaires matérialisés soient sur le filesytem distribué veut dire qu'ils sont aussi répliqués, ce qui est plutôt overkill pour l'usage qu'on en fait;
      • Pour répondre à ces problèmes, des dataflow engines ont été développés.
        • Parmi les plus connus il y a Spark, Tez et Flink.
          • Tez est relativement petit, alors que Spark et Flink sont des frameworks plus gros, avec leurs propres couches réseau, scheduler, API.
        • Ils permettent :
          • de ne pas nécessairement faire l'étape de tri, ce qui permet de faire des économies quand l'ordre des entrées n'importe pas.
          • de chaîner les operators (qui remplacent les mappers et reducers) dans l'ordre souhaité, ce qui permet aussi d'éviter les mappers inutiles.
          • des optimisations locales, sans faire appel au réseau, et sans écrire dans le filesystem distribué HDFS quand ce n'est pas nécessaire. On ne matérialise donc pas forcément les états intermédiaires.
          • de commencer la prochaine opération dès que des données sont disponibles, et sans attendre que la précédente soit terminée.
        • On peut les utiliser pour faire les mêmes opérations qu'avec MapReduce, et comme les operators sont une généralisation des mappers et reducers, on peut switcher de MapReduce vers Spark ou Tez dans Pig, Hive ou Cascading.
      • Alors qu'avec MapReduce on avait une bonne tolérance aux fautes, avec Spark, Flink et Tez on doit trouver d'autres astuces :
        • Si la machine qui faisait le calcul est perdue, on trouve d'autres données qui permettent de reconstruire la donnée perdue : la liste des opérations appliquées, et un état précédent, ou au pire la donnée originale qui est sur HDFS.
        • Concernant le problème du déterminisme, si une opération était non déterministe et que la donnée a été transmise à un autre acteur alors qu'on a une faute, alors il faut tuer l'acteur en question. Et de manière générale il faut éviter les opérations non déterministes.
    • On peut également utiliser les batch pour des données sous forme de graphs.
      • PageRank est un exemple connu de système sous forme de graph.
      • Pour les parcourir et y faire des opérations, un MapReduce ne suffit pas puisqu'il ne peut faire qu'une lecture/écriture. Mais on peut répéter ce genre d'opérations sous forme itérative, tant qu'on n'a pas atteint le but recherché.
      • Cependant MapReduce n'est pas très efficace pour itérer plusieurs fois avec de petits changements.
        • On a alors un modèle appelé bulk synchronous parallel (BSP), aussi connu sous le nom de Pregel model, popularisé par un papier de Google.
          • Il est implémenté par Apache Giraph, Spark GraphX API, Flink Gelly API.
          • C'est la même chose qu'avec MapReduce sauf que les données sont conservées en mémoire, et en cas de faible changement, il n'y a que peu de choses à recréer.
          • Il est résistant aux fautes, en vérifiant l'état de tous les vertices régulièrement, et en l'écrivant sur disque dur.
          • Le calcul est parallélisé, et ça cause beaucoup de communication réseau. Dans la mesure du possible, si les données peuvent tenir en RAM sur un seul nœud, ou même sur son DD, il vaut mieux tenter l'approche non distribuée qui sera plus rapide, sinon le Pregel model est inévitable.
    • A mesure que le temps passe, des couches sont construites par dessus MapReduce, permettant d'avoir des abstractions.
      • Les jointures peuvent ainsi être faites par des opérateurs relationnels, permettant à l'outil de décider de la manière de l'implémenter. C'est supporté par Hive, Spark et Flink.
      • Grâce à ces diverses abstractions, les batch processings se rapprochent des BDD distribuées d'un point de vue performance, tout en permettant quand c'est nécessaire, d'exécuter du code arbitrairement pour plus de flexibilité.

11 - Stream Processing

  • L'idée derrière les streams c'est de faire la même chose que les batchs, mais de manière beaucoup plus récurrente, et jusqu'à la plus petite unité possible : plutôt que de faire le traitement une fois par jour on fait le traitement dès qu'on a des données nouvelles.
  • Les données dans le stream processing sont des events. Ils sont mis à disposition par un producer, à destination de consumers. Ils sont groupés dans un stream d'events.
  • Comment transmettre les event streams :
    • On peut imaginer un mécanisme de polling où le producer met à disposition et les consumers vérifient régulièrement s'il n'y a pas de nouveaux events. Mais ça fait beaucoup de messages à envoyer si on veut être réactif. Il vaut mieux que les consumers soient notifiés à chaque event.
      • En général les BDD supportent mal cette technique. On a bien les triggers qui permettent d'exécuter du code à chaque requête, mais ça reste assez limité. Les BDD ne sont pas conçues pour ça.
    • La bonne solution est d'utiliser un messaging system.
      • Pour différencier ces systèmes, il faut regarder deux points :
        • Que se passe-t-il si le producer crée plus d'events que les consumers ne peuvent consommer ?
          • Soit les consumers sautent ces messages.
          • Soit les messages sont mis dans un buffer qui grossit, et dans ce cas que se passe-t-il si ça continue de grossir jusqu'à dépasser la RAM ?
          • Soit les consumers empêchent le producer de produire tant qu'ils n'ont pas fini les events déjà produits.
        • Que se passe-t-il si le système est down ou que des nœuds crashent ? est-ce qu'on perd des events, ou est-ce qu'ils sont persistés / dupliqués ?
      • Une première possibilité est la communication directe entre producer et consumers :
        • Des librairies de messaging brokerless comme ZeroMQ et nanomsg utilisent TCP/IP pour communiquer.
        • UDP multicast est un protocole qui permet d'envoyer des events sans garantie de réception.
        • StatsD et Brubeck utilisent UDP pour envoyer des métriques en tolérant des pertes.
        • Le consumer peut exposer une API REST ou RPC appelée par le producer. C'est l'idée des webhooks. Dans le cas où les consumers sont HS, il se peut simplement qu'ils ratent l'event.
      • Une autre solution est l'utilisation de message brokers (ou message queues).
        • Ce sont en fait des BDD, soit in memory, soit avec une forme de persistance, qui mettent en relation les producers et consumers en général de manière asynchrone.
          • Ils tolèrent donc les crashs côté consumer, puisque le message pourra être traité plus tard.
        • Par rapport aux BDD :
          • Ils ont des similarités, et peuvent même participer à des protocoles 2PC utilisant XA.
          • Mais il y a des différences :
            • Les BDD gardent les données, alors que les brokers les effacent quand ça a été traité.
            • Les brokers partent du principe que le nombre de messages à avoir en mémoire est faible. S'il grossit les performances peuvent se dégrader.
      • Lorsqu'il y a plusieurs consumers, on peut trouver deux stratégies pour leur envoyer les events :
        • Load balancing : si les messages coûtent cher à traiter, on donne chaque message à un consommateur.
          • Les protocoles d'encapsulation AMQP et JMS supportent tous deux cette pratique.
        • Fan-out : Chaque consumer reçoit le message et peut le traiter indépendamment des autres.
          • Là encore AMQP et JMS supportent cette pratique.
      • Pour que le broker sache quand il faut enlever le message de la queue et éviter de l'enlever en cas de crash du consumer, le consumer qui a traité le message doit faire un acknowledgement. Sinon le message reste et devra être traité.
        • Ces crashs peuvent causer un traitement des messages dans un ordre différent de celui d'arrivée. Si on veut éviter ça, on peut faire une queue par consumer.
    • Les brokers traditionnels se distinguent des BDD ou des batches par le fait que les données sont détruites une fois traitées. Mais on peut très bien combiner la faible latence de traitement des messages (streaming) avec de la persistance durable : on a alors les log-based message brokers.
      • Il s'agit d'écrire les events dans un fichier de log, comme on le ferait pour les LSM-Tree, ou les write-ahead logs. Les consumers peuvent alors traiter le fichier séquentiellement, et une fois à la fin être notifiés à chaque nouveau message.
      • Pour pouvoir scaler avec ce modèle au-delà de ce que peut supporter la lecture d'un seul disque, on peut utiliser le partitionnement : les messages sont partitionnés sur différentes machines représentant des producers, et des consumers viennent traiter les messages sur chaque partition.
        • Au sein de chaque partition, on peut avoir un identifiant séquentiel indiquant l'ordre. Par contre, ça ne marche pas à travers les partitions.
        • Grâce au partitionnement, ce type de log-based brokers, malgré le fait d'écrire sur disque, arrivent à traiter plusieurs millions de messages par seconde.
      • Ce type de broker est implémenté par Apache Kafka, Amazon Kinesis Streams et Twitter's Distributed Log. Google Cloud Pub/Sub est architecturé de cette façon, mais expose une JMS-style API.
      • Les log-based brokers supportent le fan-out messaging puisque les logs sont conservés et peuvent être lus un grand nombre de fois.
      • Pour ce qui est du load-balancing messaging, c'est mis en place à l'aide des partitions, qui sont assignés à des consumers spécifiques.
        • Il y a des désavantages :
          • On n'a qu'un consumer par partition.
          • Les messages lents d'une même partition vont ralentir les autres messages de cette partition.
      • Quand utiliser les brokers classiques vs log-based :
        • Quand le processing des messages peut être coûteux, et qu'on a envie de paralléliser message par message (et quand l'ordre des messages n'est pas très important), on peut utiliser les brokers de type JMS / AMQP.
        • Quand en revanche les messages sont rapides à traiter, et que l'ordre importe, alors les log-based brokers sont pertinents.
          • Vu que l'ordre est respecté seulement au sein des partitions, on peut très bien choisir comme clé de partitionnement la chose dont on veut que les événement liés gardent le bon ordre. Par exemple l'id d'un utilisateur.
      • Étant donné que l'ordre est respecté au sein de chaque partition, on n'a plus besoin d'acknowledgement quand le traitement est fait pour chaque event. On sait que ce sera fait dans l'ordre et on peut regarder régulièrement le log offset de chaque consumer.
        • Si un consumer échoue, un autre reprendra au dernier log offset connu. Et si des messages avaient été traités mais dont le log offset n'était pas connu, ils seront traités deux fois (il va falloir régler ce problème).
      • A propos de l'espace disque :
        • A force d'écrire des logs sur le DD, il finit par être plein, et il faut alors supprimer des données ou les bouger vers un espace d'archivage.
        • Cela veut donc dire que si on consumer est vraiment trop lent, il pourrait finir par ne plus avoir accès aux messages non lus qui ont été déplacés.
          • Il faut quand même relativiser ça : un DD typique fait 6To, et en écrivant séquentiellement à la vitesse max on écrit en moyenne à 150 Mo/s. Ce qui fait 11 heures pour remplir le disque dur. Et sachant qu'on n'écrit pas en permanence à la vitesse max, en général des events de plusieurs jours vont pouvoir être stockés sur une même machine productrice.
          • Si un consumer est trop en retard, on peut aussi lever une alerte pour qu'un être humain gère. Vu les délais, il aura normalement le temps de régler la situation.
        • On peut noter aussi que pour les log-based brokers, vu qu'ils écrivent toujours sur DD, le temps de traitement reste à peu près constant, alors que pour les brokers plus classiques, si on dépasse la RAM et qu'on doit écrire sur DD, les performances se dégradent.
      • On peut remarquer que les log-based brokers sont plus proches des batches que les brokers classiques. Les données anciennes étant conservées, on peut les rejouer à loisir pour faire des tâches dessus.
  • Les streams et les bases de données :
    • Les principes des streams peuvent aussi être utiles pour les BDD.
      • Par exemple, le replication log envoyé par le leader n'est rien d'autre qu'un stream.
      • On peut aussi considérer que chaque opération d'écriture en BDD est un événement, et qu'on peut reconstruire la BDD à partir du log d'events déterministes.
    • On se retrouve souvent avec des copies des données sous différents formats pour différents usages (cache, data warehouse etc.). Mais comment garder ces données synchronisées ?
      • Une solution est d'utiliser les batches. Mais c'est lent, et on n'aura pas de données à jour rapidement.
      • Une autre solution serait d'écrire en même temps dans la BDD principale et dans ces autres copies. Mais dans des systèmes distribués il peut survenir des inconsistances entre ces copies.
        • Pour régler ce problème, on pourrait transformer les copies en suiveuses de la BDD principales comme avec le modèle leader / follower.
        • Malheureusement pendant longtemps les logs des messages allant dans la BDD ont été considérés comme des API privées. Récemment on a un intérêt vers le fait de les exploiter comme des streams qu'on appelle change data capture (CDC).
        • La solution est d'utiliser un log-based broker pour transporter les events d'écriture de la BDD (leader) vers les datasets qui sont des followers (search index, data warehouse etc.).
          • C'est utilisé par Databus de LinkedIn, Wormhole de Facebook et Sherpa de Yahoo.
          • Bottled Water le fait pour PostgreSQL en lisant son write-ahead log.
          • Maxwell et Debezium le font pour MySQL.
          • Mongodriver le fait pour MongoDB.
          • GoldenGate le fait pour Oracle.
          • Kafka Connect Framework offre des connecteurs CDC pour divers BDD.
          • RethinkDB, Firebase, CouchDB, MongoDB et VoltDB permettent aussi d'avoir un mécanisme pour exporter le stream des données hors de la BDD.
          • En général, cette solution est utilisée dans un mode de réplication asynchrone.
        • Certains outils permettent de commencer un dataset suiveur avec un snapshot initial des données, plutôt que de récupérer la totalité des logs pour reconstruire la BDD.
        • Certains outils comme Apache Kafka permettent aussi de récupérer les logs compactés, au sens de la compaction des LSM-Tree : seules les logs représentant la dernière version des entrées sont gardées. Si une entrée est supprimée à un moment, toutes les logs précédentes de cette entrée peuvent être supprimées aussi par la compaction par exemple.
    • Event sourcing : c'est une idée développée par la communauté domain-driven design (DDD).
      • Cela consiste à stocker tous les changements d'état d'une application sous forme de logs de change events
      • La différence entre le change data capture de la BDD et l'event sourcing c'est que le change data capture permet d'ajouter / enlever / modifier des choses dans la BDD et d'en faire un log, alors que l'event sourcing décourage ou interdit la modification, mais consiste plutôt à accumuler des events qui représentent des choses qui se produisent plutôt que de simples changements d'état qui s'annuleraient entre eux.
        • La conséquence est qu'on ne peut pas vraiment faire de compaction pour les events de l'event sourcing, parce qu'ils ne s'annulent pas entre eux à proprement parler. Il faut garder ces events immuables.
      • L'event sourcing est un modèle très puissant pour représenter clairement ce qui se passe dans l'application, et permet aussi d'avoir des facilités pour débugger.
      • Il existe des BDD spéciales pour l'event sourcing comme Event Store, mais en réalité n'importe quelle BDD ou message broker serait adapté.
      • L'event sourcing sépare bien les events des commands. Quand une requête arrive de l'utilisateur c'est d'abord une command. Elle doit être traitée et validée, et c'est seulement quand on est sûr qu'elle l'est qu'elle devient un event immuable. Elle est alors transmise à divers systèmes consommateurs et ne peut pas être supprimée, mais seulement changée par un autre event d'annulation par exemple.
    • Les streams et les états vis à vis de l'immuabilité :
      • On peut voir la BDD comme étant un sous ensemble, ou une version cache la plus récente des données que sont les logs d'events. Avec le mécanisme de compaction des SSTables c'est encore plus évident puisqu'on a les logs, et on vient enlever ce qui est “inutile” pour obtenir la BDD qui est l'état le plus actuel des données.
      • Un des avantages à avoir les logs des changements immuables comme source de vérité principale à partir de laquelle on peut construire diverses formes de dataset est que même si on fait une opération malheureuse qui corrompt les données, si c'est juste sous forme de log il suffira de revenir en arrière dans les logs et c'est réglé. Avec une vraie BDD si on a corrompu les données on risque de ne pas pouvoir défaire.
      • On peut dériver diverses formes de données à partir des logs :
        • Par exemple, Druid ingère les données de Kafka, de même pour Pistachio qui utilise Kafka comme un commit log, et Kafka Connect peut exporter les données de Kafka vers diverses BDD ou indexes.
      • Stocker les données est facile si on n'a pas à se préoccuper de le faire dans un format qui permettra une lecture optimisée en fonction de notre contexte. On peut donc séparer l'écriture de la lecture, en créant de nouveaux dataset dérivés quand on a besoin des données pour faire quelque chose de spécifique.
        • Cette idée de séparer les données d'écriture et de lecture est connue sous le nom CQRS (Command Query Responsibility Segregation).
        • Dans cette approche la question de “faut-il vraiment dénormaliser ?” ne se pose plus vraiment : il est logique de dénormaliser pour optimiser en lecture, vu que de toute façon les données seront présentes sous une forme plus canonique dans la version écrite.
      • Avantages et inconvénients de l'event sourcing et du change data capture :
        • Un des inconvénients est que si l'écriture se fait de manière asynchrone pour gagner du temps (ce qui est souvent le cas), on risque de ne pas avoir la garantie de read after your writes par exemple. Pour remédier à ça on pourrait rendre la copie synchrone, ou utiliser des transactions distribuées, ou un total order broadcast.
        • Un des avantages est que ça peut faciliter la concurrency control : à chaque fois qu'une requête a besoin de modifier plusieurs objets, on peut très bien écrire un event dans le log qui implique l'ensemble de ces objets. Et donc on aurait des opérations atomiques écrites en une fois.
      • A propos de l'immuabilité :
        • Elle est utile si les données ne changent pas tant que ça, mais si elles changent beaucoup on risque de se retrouver avec des logs énormes pour pas beaucoup de données.
        • On a aussi des contraintes légales qui imposent parfois de supprimer certaines données.
        • On peut alors réécrire l'historique pour enlever certaines données. Datomic appelle ça l'excision.
          • Il faut savoir aussi qu'étant donné les diverses copies de dataset, backups et autres, c'est assez difficile de complètement supprimer les données.
  • Traitement des streams.
    • On peut faire 3 choses avec un stream :
      • L'écrire en BDD ou autre forme de persistance.
      • Le donner directement à l'utilisateur en lui affichant.
      • Le modifier pour fabriquer un nouveau stream à travers un operator comme avec les batchs, dont le résultat ira à nouveau dans une persistance ou chez l'utilisateur.
    • Tout ceci est assez similaire à ce qui se passe avec les batchs.
      • La différence c'est que le stream ne se finit pas, et donc on ne peut pas faire de sort ou de jointures sort-merge comme avec les batchs.
      • La tolérance aux erreurs aussi n'est pas la même : on peut difficilement rejouer un stream qui tourne depuis des années comme on rejouerait un batch qui vient d'échouer.
    • A propos des usages du streaming :
      • On l'utilise pour du monitoring quand on veut être averti de choses particulières, par exemple avec la détection de fraudes, le statut des machines d'une usine etc.
      • Les complex event processing (CEP) permettent de déclarer des patterns à trouver (souvent avec du SQL), et créent des complex events à chaque fois que ça match, il s'agit de trouver une combinaison d'events.
        • C'est implémenté dans Esper, IBM InfoSphere Streams, Apama, TIBCO StreamBase, SQLstream.
      • Les stream analytics qui ressemblent aux CEP mais sont plus orientés vers le fait de trouver des résultats agrégés à partir des données streamées. Par exemple calculer une moyenne, une statistique.
        • On utilise souvent des fenêtres de données pour faire les calculs dessus.
        • On utilise parfois des algorithmes probabilistes comme les Bloom Filters pour savoir si un élément est dans un set et d'autres. Ces algorithmes produisent des résultats approximatifs mais utilisent moins de mémoire.
        • Parmi les outils on a Apache Storm, Spark Streaming, Flink, Concord, Samza et Kafka Streams. Et parmi les outils hostés on a Google Cloud Dataflow et Azure Stream Analytics.
      • Les dataset dérivées des logs comme dans l'event sourcing peuvent être vus comme des materialized views, dans ce cas il faut prendre en compte l'ensemble des logs et pas juste une fenêtre.
        • Samza et Kafka Streams font ça.
      • On peut faire aussi un peu pareil que les CEP mais en recherchant un seul event qui match un critère de recherche. Alors que d'habitude on doit indexer avant de faire une recherche, là il s'agit de rechercher en plein streaming.
        • La feature percolator d'Elasticsearch permet de faire ça.
    • La notion de temps dans la gestion des stream processing :
      • Alors que dans les batch processing ce qui compte c'est éventuellement le timestamp des events analysés, et pas le temps pendant le quel le batch s'exécute (ce qui rend la réexécution du batch transparente d'ailleurs), dans le cadre du stream processing le temps pendant lequel le processing s'exécute peut être pris en compte, par exemple pour faire des fenêtres.
        • Attention cependant aux lags : il est possible que lors du stream processing un event soit processé bien après avoir été émis. Et dans ce cas on peut se retrouver avec des events traités dans un ordre qui n'est pas le bon vis-à-vis de leur émission.
      • Quand on stream avec des fenêtres de temps contenant des events pour y faire des opérations, on ne peut jamais être sûr que tous les events de la fenêtre sont arrivés : ils ont peut être été retardés (qu'on appelle straggler)
        • Dans ce cas, soit on dit tant pis et on annule les events retardataires, en levant éventuellement une alerte s'il y en a trop.
        • Soit on publie plus tard un correctif avec les events retardataires.
      • Quand on veut prendre en compte le temps, le temps de la création de l'event est souvent plus précis (par exemple un event peut être créé offline par un mobile, et envoyé seulement quand il est connecté), mais aussi moins fiable vu que la machine n'est pas sous notre contrôle contrairement au serveur.
        • Une des solutions est de relever (1) le temps de l'event indiqué par le client, (2) le temps de l'envoi de l'event indiqué par le client, et (3) le temps de la réception de l'event par le serveur. De cette manière on peut comparer les horloges du client et du serveur vu que le (2) et le (3) doivent être très proches.
      • Il y a plusieurs types de fenêtres temporelles :
        • Tumbling window : Les fenêtres sont fixes, et chaque event appartient à une fenêtre.
        • Hopping window : Les fenêtres font la même taille mais se chevauchent, certains events qui sont entre les deux sont dans les deux fenêtres.
        • Sliding window : Les fenêtres font la même taille mais se déplacent dans le temps, et donc les events les plus anciens sont exclus au fur et à mesure, remplacés par des events plus récents.
        • Session window : Les fenêtres n'ont pas la même taille, elles regroupent des events proches dans le temps où un même utilisateur a été actif.
    • Les streams étant une généralisation des batchs, on a ici le même besoin des jointures.
      • On peut dénombrer 3 types :
        • Le stream-stream join (window join) consiste à joindre deux ensemble streams d'events ensemble. Par exemple dans le cadre d'une recherche, joindre les events de recherches faites aux events clics qui s'en sont suivis (ou à l'absence de clics après timeout).
        • Le stream-table join (stream enrichement) consiste à “enrichir” les events issus d'un stream avec le contenu d'une BDD. Par exemple les actions d'un utilisateur enrichis (complétés ou triés) avec des infos issus de son profil.
          • Pour ce faire il nous faut une copie de la BDD sur le disque local de préférence, et si suffisamment petit on peut même la mettre en RAM. C'est très similaire aux Map-side joins des batchs.
          • Vu que les données de la table risquent d'être mises à jour, on peut utiliser le change data capture pour récolter les mises à jour de la table régulièrement.
        • Le table-table join (materialized view maintenance) consiste à matérialiser une requête de jointure entre deux tables, à chaque fois qu'il y a un changement dans ces deux tables qui risque d'affecter le résultat de cette jointure.
          • On peut prendre l'exemple de twitter qui, en même temps qu'il stocke les tweets et followers, construit une timeline en cache au fur et à mesure.
      • On remarque que dans la plupart des cas, le temps est important, et que deux événements, ou un événement et une mise à jour en BDD pourraient arriver avant ou après l'autre (du fait du partitionnement). Ceci rend la jointure non déterministe (si on la refait on risque d'avoir un résultat différent).
        • Dans les data warehouses ce problème s'appelle _slowly changing dimension (SCD) _et la solution à ça peut être d'ajouter un identifiant qui est changé à chaque event. Mais la conséquence c'est qu'on ne peut plus faire de compaction.
    • A propos des fautes dans le cadre des streams :
      • L'avantage avec les batchs c'était le fait de pouvoir réexécuter en cas d'erreur, et d'avoir au final le job exécuté comme s'il l'avait été une seule fois.
        • Une des solutions est le microbatching : on fait des petites fenêtres de données (souvent d'1 seconde) et on les traite comme des batchs.
          • Spark Streaming fait ça.
          • Une variante consiste à faire des checkpoints réguliers sur DD, et en cas de crash on recommence à partir du checkpoint.
            • Flink fait ça.
          • Attention cependant au moment où on fait autre chose avec ces données, comme écrire en BDD ou envoyer un email. Dans ce cas, il s'agit de side effects qui pourront être réexécutés en cas de réexécution du microbatch.
            • Pour régler ce problème, il faut tout préparer, et exécuter tout ce qui est validation des opérations, side-effects et autres en une seule fois et de manière atomique.
              • C'est un peu de la même manière que le 2PC (two phase commit) des transactions distribuées.
              • C'est utilisé par Google Cloud Dataflow, VoltDB et Apache Kafka.
            • Une autre solution pour ce problème est de créer de l'idempotence, c'est-à-dire faire en sorte qu'une chose faite plusieurs fois donne le même résultat.
              • On peut le faire par exemple en retenant un offset qui fera en sorte de ne rien faire si on tente de refaire l'opération.
              • Attention au fait que cela implique qu'il faut rejouer les messages dans le même ordre (un broker log-based permet ça), de manière déterministe, et sans concurrence.
      • On peut aussi vouloir que des states (par exemple compteurs, moyennes etc.) soient reconstruites en cas de faute.
        • Dans certains cas ça peut être fait à partir des events, par exemple parce qu'il s'agit d'un état qui porte sur peu d'entre eux.
        • Sinon une solution peut être de les sauvegarder régulièrement quelque part pour aller les chercher en cas de besoin.
          • Flink capture régulièrement ces valeurs et les écrit sur du HDFS.
          • Samza et Kafka Streams répliquent les changements des states vers un stockage persistant avec compaction.
          • VoltDB réplique les states en faisant le processing des messages sur plusieurs nœuds.
          • Il faut voir que la sauvegarde en local avec accès au disque ou la sauvegarde via le réseau peuvent chacun être plus ou moins performants en fonction des cas.

12 - The Future of Data Systems

  • Chaque outil a ses avantages et inconvénients, et il n'y a pas d'outils parfaits.
    • Certaines personnes disent que tel ou tel type d'outil n'a aucune utilité, mais ça reflète surtout le fait qu'eux ne l'utilisent pas, et qu'ils ne voient pas plus loin que le bout de leur nez.
  • Il convient souvent de combiner plusieurs outils pour plusieurs usages :
    • Parmi ceux-ci on peut trouver :
      • Une BDD relationnelle pour la persistance de données structurées. (ex : PostgreSQL)
      • Un index de recherche pour une recherche performante, mais qui est moins bon sur la persistance des données (ex : Elasticsearch)
      • Un système d'analyse du type data warehouse ou batch / stream processing.
        • Parmi les batchs / streams on pourrait vouloir alimenter un système de machine learning, de classification, de ranking, de recommandations, de notification basée sur le changement de données.
      • Un cache ou des données dénormalisées issues des données initiales.
    • Par exemple, on peut avoir une BDD et un search index, avec les données écrites d'abord dans la BDD, puis propagées dans le search index via change data capture (CDC).
      • Si on décide qu'on veut écrire à la fois dans la BDD, et dans le search index, alors on risque d'avoir des latences qui causent des différences d'ordre d'écriture entre les deux.
        • Une solution à ça c'est d'utiliser un système d'entonnoir qui force l'ordre, dans l'idée d'un total order broadcast.
    • Que choisir entre les données dérivées (CDC, event sourcing) et les transactions distribuées (2PC) pour faire communiquer plusieurs outils entre eux ?
      • Selon l'auteur, XA, le protocole qui permet de faire communiquer les outils via les transactions distribuées a une mauvaise tolérance aux fautes et une faible performance. Et en l'absence d'un autre protocole aussi répandu (ce qui ne risque pas d'arriver rapidement), il est plus pertinent d'opter pour les datasets dérivés.
        • Cependant, les transactions distribuées supportent la linearizability et donc permettent par exemple le “read your own writes”, alors que les données dérivées sont en général asynchrones et donc n'apportent pas ces garanties. Cette eventual consistency est à mettre dans la balance.
          • Plus tard on parlera d'un moyen de contourner ce problème.
    • Attention au fait de vouloir du total ordering :
      • Pour avoir du total ordering il faut que les données passent par une seule machine (par exemple single leader). Sinon on peut créer des partitions mais on aura des ambiguïtés entre partitions.
      • Dans le cas de plusieurs datacenters on a en général besoin de 2 leaders => on n'aura donc pas de total ordering.
      • Quand on fait du micro-service, il est courant de déployer le code sur des machines avec chacune son stockage et sans que ce stockage soit donc partagé => on se retrouve là aussi donc à ne pas respecter le total ordering.
      • Pour être clair : le total ordering implique le total order broadcast, qui est équivalent au consensus. Et la plupart des algorithmes de consensus ne sont pas faits pour marcher si le throughput dépasse les données que peut gérer un seul serveur. Le fait de pouvoir gérer un tel throughput avec des datacenters distribués dans le monde est un sujet de recherche.
    • A propos de l'ordre causal :
      • Pour les événements qui touchent le même objet, celui-ci étant sur la même partition on peut ordonner ces actions et respecter la causalité.
      • En revanche, pour les événements qui portent sur plusieurs objets il n'y a pas de solution facile. Quelques pistes :
        • Les clocks logiques peuvent aider.
        • Si on log des events pendant pour les lectures, alors les autres évents peuvent les utiliser pour identifier le moment où un événement ne s'était pas produit et créer un ordre comme ça.
        • Les structures de résolution automatique de conflit (fusion des objets par exemple) peuvent aussi aider.
    • A propos des batches et streams :
      • Une des raisons pour lesquelles il est pratique d'avoir des dataset dérivés par batch/stream plutôt que transactions distribuées est qu'on peut fauter quelque part et ne pas tout annuler, mais seulement retenter la construction du batch/stream.
      • Un des avantages des batchs/streams c'est qu'avec les datasets dérivés, on peut changer le schéma de nos données pour un dataset, et reprocesser le tout, ou continuer pour le stream. On n'a pas à faire d'opérations destructives pour faire évoluer notre code.
        • On peut d'ailleurs faire les changements graduellement, blocs de données par bloc de données.
      • La lambda architecture consiste à avoir un batch et un stream qui vont processer la même chose pour avoir la donnée immédiatement, mais avoir un process mieux tolérant aux erreurs plus tard. Le stream fait une approximation, alors que le batch fait un calcul plus précis régulièrement.
        • Il y a cependant plusieurs problèmes :
          • Maintenir la logique dans le batch et le stream est difficile.
          • Il faut merger les deux régulièrement, et ça peut être difficile si les opérations appliquées sont difficiles.
          • Reprocesser toutes les données avec le batch est très coûteux, donc on peut à la place reprocesser une seule heure de données et y ajouter le stream. Cependant, rendre le batch incrémental le fragilise.
        • Mais plus récemment on a d'autres solutions pour rendre la lambda architecture plus utilisable grâce à certaines features qui sont de plus en plus supportés par les outils.
  • A propos de BDD :
    • Les BDD et les filesystem font la même tâche.
    • Mais ils ont certaines différences : les filesystem Unix offrent une API bas niveau pour traiter avec les fichiers, alors que les BDD offrent une API plus haut niveau avec SQL.
      • D'une certaine manière certaines BDD NoSQL tentent d'ajouter la philosophie Unix aux BDD.
    • Les BDD et les batchs / streams ont des choses en commun :
      • Par exemple, la construction de search indexs dans les batchs/streams sont un peu la même chose que la construction d'index secondaires.
      • Et du coup on en arrive à la conclusion qu'en fait les batchs/streams ne sont que la continuation d'une même base de données transformée pour l'adapter aux besoins, distribuée sur d'autres machines et administrée éventuellement par d'autres équipes.
      • L'auteur spécule que les données seront organisées en deux grands axes, qui sont en fait deux faces de la même pièce :
        • Federated databases (unifying reads) : il s'agit de fournir une API de lecture pour accéder à toutes les données existantes du système, tout en laissant les applications spécialisées accéder directement aux datasets spécifiques dont elles ont besoin. L'idée est donc de connecter toutes les données ensemble.
          • PostgreSQL et son foreign data wrapper permet de faire ça.
        • Unbundled databases (unifying writes) : il s'agit de traiter les écritures pour qu'on puisse écrire dans n'importe quelle version des données, et qu'elles soient quand même synchronisées avec le reste. Alors que les BDD supportent les indexes secondaires, ici on a différents datasets interconnectés et donc on doit en quelque sorte maintenir nos indexes à la main.
          • On est en plein dans la tradition Unix où des petits outils font une chose bien, et peuvent s'interconnecter.
          • Alors que la fédération des données n'est pas trop difficile, maintenir les données synchronisées est plus compliqué à faire.
            • Pour accomplir ces données synchronisées on recourt traditionnellement aux transactions distribuées, mais selon l'auteur c'est la mauvaise approche. L'approche sous forme de données dérivées depuis un event log asynchrone, et l'utilisation de l'idempotence est bien plus solide.
            • Une des raisons déjà évoquée est que faire communiquer des systèmes de données hétérogènes via un mauvais protocole marche moins bien que via une meilleure abstraction avec des logs d'event et de l'idempotence.
            • Le gros plus de l'approche avec les event logs est le couplage faible entre les composants :
              • La nature asynchrone de cette approche permet de tolérer bien mieux les fautes (par exemple, un consommateur fautif va rattraper son retard plus tard via les messages accumulés) alors qu'avec les transactions distribuées synchrones par nature, les fautes ont tendance à être amplifiées.
              • Au niveau des équipes, chacune peut se spécialiser dans un type de dataset pour un usage, et le faire indépendamment des autres.
          • Entre utiliser un système de BDD intégré et un système composé de datasets dérivés, le choix des datasets dérivés n'est pas forcément systématique. Ça peut être une forme d'optimisation prématurée, et d'ailleurs si un système de BDD répond à nos besoins, autant l'utiliser lui seul.
          • Ce qui manque dans l'histoire c'est une manière simple et haut niveau d'interconnecter ces systèmes, par exemple “declare mysql | elasticsearch” comme équivalent de “CREATE INDEX” dans une BDD.
            • Il y a des recherches à ce sujet mais pour le moment rien de tel, on doit faire beaucoup de choses à la main.
    • Pour continuer sur l'idée de l'unbundling databases, et des applications autour du dataflow :
      • On peut trouver des ressemblances avec le concept d'unbundling des BDD et des langages de dataflow comme Oz, Juttle, les langages fonctionnels réactifs comme Erlang, et les langages de programmation logique comme Bloom.
      • L'idée de l'unbundling est aussi présente dans les tableurs quand ils mettent à jour toute une colonne dès qu'une donnée est écrite. Il faut juste faire la même chose mais dans un contexte distribué, et avec des technologies disparates.
      • On a différentes formes de données dérivées, mais en gros dès que la dérivation est spécifique à notre métier, il faut écrire du code applicatif pour gérer ce dataset-là. Et les BDD ont en général du mal à permettre ça. Il y a bien les triggers / stored procedures, mais ça reste une feature secondaire.
      • Il est devenu un pattern courant et une bonne pratique de séparer le code applicatif du state (ie. la persistance), en ayant des serveurs stateless qui accèdent à une BDD commune.
        • Les développeurs fonctionnels disent qu'ils sont pour “la séparation de l'église et de l'état”.
        • Cependant, de même que dans la plupart des langages il n'y a pas de système de souscription (sauf à le faire avec le pattern observer), dans les BDD il n'y en a pas non plus sauf récemment avec les CDC par exemple.
      • Vu qu'on veut sortir la logique de mise à jour automatique par exemple d'un index dans la BDD hors de celle-ci, on peut partir du principe que la donnée n'est pas une chose passive utilisée par l'application, mais que les changements dans un dataset peuvent déclencher du code applicatif pour créer un autre dataset.
        • A cet effet, on peut utiliser des log message brokers (et non pas des message brokers traditionnels qui servent à exécuter des jobs de manière asynchrone).
          • L'ordre des messages est souvent important pour maintenir des datasets dérivés.
          • On doit être tolérant aux fautes et ne pas perdre de messages, sous peine d'inconsistance.
          • Les message brokers permettent au code applicatif de s'exécuter sous forme d'operators, ce qui est pratique.
      • Le stream processing et les services :
        • L'architecture sous forme de services est plutôt à la mode, son avantage principal est de permettre une forme de scalabilité dans l'entreprise, en permettant à plusieurs équipes de déployer séparément.
        • Il y a cependant une différence entre les services qui vont envoyer un message pour recevoir une réponse du service qui a les données, et le stream processing qui va construire et maintenir à jour un dataset local à la machine qui a le code applicatif, qui n'aura plus de requête réseau à faire => la méthode avec le stream processing est donc plus performante.
    • Lecture des données dérivées :
      • Les données dérivées sont construites et mises à jour en observant la donnée initiale et la faisant passer à travers des operators, tout ceci pendant la phase d'écriture. On a ensuite du code exécuté qui lit ces données dérivées et qui répond à une requête d'un client, pendant la phase de lecture donc.
        • Ce point de rencontre représente en quelque sorte le point d'équilibre entre la quantité de travail qu'on souhaite faire à l'écriture, et la quantité de travail qu'on souhaite faire à la lecture.
        • On peut déplacer ce point de rencontre pour faire plus de travail à l'écriture, ou plus à la lecture.
          • Par exemple pour la recherche, on peut très bien ne pas créer de search index, et tout faire à la lecture.
          • Ou alors on peut non seulement créer un search index à l'écriture, mais aussi créer tous les résultats de recherche possibles, comme ça à la lecture on n'aura plus qu'à lire un cache (aussi appelé materialized view).
            • Si créer l'ensemble des résultats de recherche serait sans doute excessif, on peut très bien imaginer mettre en cache les résultats des recherches les plus fréquentes.
          • On voit qu'on retrouve aussi notre exemple de twitter qui avait choisi de mettre en cache toutes les timelines, sauf pour les célébrités où il faisait la recherche en BDD.
        • Autre exemple de lecture de données dérivées : les applications web sur mobile qui gagnent de plus de capacité d'autonomie, y compris offline, peuvent stocker une forme de dataset dérivé au sein même du mobile, permettant au code sur le client d'en faire quelque chose offline.
          • Les outils frontend comme le langage Elm et le framework React / Redux permettent de souscrire à des events de l'utilisateur, en mode event sourcing.
          • Il serait tout à fait naturel de faire la même chose dans la relation client / serveur : permettre au client de faire une requête, puis de réceptionner non pas une réponse mais un stream de messages réguliers.
      • Les log message brokers passent en général leur contenu à une forme ou une autre de BDD spécialisée, mais il y a aussi une certaine persistance des events eux-mêmes (les logs) dans le message broker. En général seuls les events d'écriture y sont consignés, ce qui est raisonnable mais n'est pas la seule manière de faire possible.
        • Il est possible qu'en fonction du besoin applicatif, on ait aussi intérêt à consigner les events de lecture. Ça permet notamment de faire un stream-table join entre les lectures et les données existantes.
        • C'est utile en particulier dans le cas où on a plusieurs partitions qu'il faut traverser pour obtenir notre résultat.
          • La feature de distributed RPC de Storm implémente cette fonctionnalité.
        • Ça prend bien sûr plus de place donc il faut y réfléchir.
        • Un des avantages est que ça permet aussi de régler le problème de causalité vis-à-vis d'écritures sur des objets différents.
  • A la recherche des données correctes :
    • On a tendance à avoir un mouvement vers une plus grande performance, availability et scalability, avec une consistency qui est parfois délaissée.
      • Exemple : la réplication leaderless.
      • On peut aussi noter le rapport hasardeux à l'isolation et l'implémentation de faibles niveaux d'isolation dans beaucoup de BDD.
    • On peut répondre à certaines problématiques de corruption de données à l'aide de la serializability et des atomic commits, mais c'est vraiment coûteux et ça marche surtout sur un seul datacenter, avec des limites de scalabilité.
      • Il y a aussi les transactions qui permettent de régler certains problèmes, mais ce n'est pas la seule solution.
    • N'oublions pas non plus les erreurs et bugs applicatifs qui peuvent endommager les données de manière définitive, même en présence de serializability…
    • Pour lutter contre ces problèmes voici quelques solutions :
      • L'immutabilité des données (du genre event sourcing et autres) permet d'être sûr que même en écrivant des données corrompues, on pourra toujours les annuler pour retrouver l'état d'avant.
      • Rendre les opérations idempotentes pour qu'elles ne puissent être exécutées qu'une seule fois au plus est une forme de protection contre la corruption de données.
      • De manière générale, il est intéressant d'implémenter des mécanismes end-to-end qui vont suivre la requête de bout en bout. Par exemple TCP fournit ce genre de garanties à son niveau, mais une connexion TCP peut sauter et on peut en établir une autre pour refaire la même transaction, on a alors besoin de quelque chose qui suit notre transaction.
        • Malheureusement, implémenter de tels mécanismes end-to-end au niveau applicatif n'est pas simple. Pour l'auteur, il faudrait qu'on trouve la bonne abstraction pour rendre ça facile, mais il y a de la recherche à faire.
    • Appliquer des contraintes :
      • La contrainte de uniqueness dans un système distribué nécessite le consensus, et donc une forme de fonctionnement synchrone. Si les writes se faisaient de manière asynchrone, alors on ne saurait pas immédiatement si on peut écrire en respectant cette contrainte ou pas, et on aurait le conflit plus tard.
        • Un bon moyen pour garantir cette contrainte est de partitionner en fonction de la clé qui doit avoir la contrainte d'unicité. Mais là aussi bien sûr on ne pourra pas écrire dans la BDD de manière asynchrone.
      • Pour les contraintes au sein des log-based message brokers il s'agit aussi de faire en sorte que les requêtes avec possibilité de conflit soient dans la même partition, et de vérifier séquentiellement, message par message, que la requête respecte bien la contrainte vis-à-vis de la BDD locale.
        • Dans le cas où les entrées qui sont l'objet de contraintes sont localisées dans des partitions différentes ça se complique un peu.
          • On peut utiliser un atomic commit (par exemple 2PC).
          • Mais on peut aussi faire sans (exemple de débit / crédit d'un compte) :
            • On attribue un id à la requête.
            • Le stream processor crée 2 messages : un pour décrémenter le compte qui a un débit, et un autre pour incrémenter l'autre compte (qui sont chacun sur leur partition).
            • Les processors suivants consomment les messages, appliquent le débit ou le crédit, et dédupliquent en fonction de l'id de la transaction initiale.
            • Vu qu'on est dans un log based broker avec l'ordre des messages préservés et de la persistance, en cas de crash de l'un des consommateurs, il redémarre et réapplique les messages non processés dans l'ordre prévu.
            • Nous avons donc réussi à réaliser une transaction multi-partition sans utiliser de protocole de type atomic commit, en cassant la transaction en plusieurs morceaux s'exécutant chacun sur leur partition, et en ayant un mécanisme end-to-end (ici l'id) assurant l'intégrité du tout (le fait qu'un bout ne sera pas exécuté 2 fois).
    • A propos de l'intégrité et de la relation au temps :
      • Le terme consistency englobe en réalité deux enjeux :
        • La timeliness qui consiste à s'assurer que l'observateur voit une donnée à jour. C'est tout l'objet du terme eventual consistency quand la timeliness n'est pas respectée.
        • L'integrity qui consiste à préserver les données d'une corruption permanente des données, y compris dérivées. Pour la régler il faudrait réparer et non pas juste attendre ou réessayer.
        • Si le non-respect de la timeliness est embêtant, le non-respect de l'integrity peut être catastrophique.
      • Alors que dans les transactions ACID la timeliness et l'integrity sont confondues, on vient de voir que dans le stream processing on peut les décorréler, et arriver à garantir l'integrity tout en ayant un fonctionnement asynchrone et donc ne garantissant pas la timeliness. Et le tout sans utiliser les transactions distribuées coûteuses;
        • Selon l'auteur, cette technique est particulièrement prometteuse.
      • On peut aussi se demander si toutes les applications ont vraiment besoin d'un respect intransigeant de la timeliness, et donc d'un respect de la linearizability (dès qu'une écriture est faite, elle impacte les lecteurs) :
        • On peut très bien faire une compensating transaction dans le cas où on a accepté une transaction côté client mais qu'il se révèle qu'elle ne respecte pas les contraintes.
        • D'ailleurs un processus d'excuse et de compensation peut être pertinent dans de nombreux cas, par exemple pour la réservation, souvent on propose plus de places que disponibles en partant du principe qu'il y aura des désistements. Et dans le cas où on a mal prévu, il faut pouvoir avoir un processus de compensation.
      • On peut créer un système qui pour l'essentiel évite la coordination :
        • 1 - on préserve l'intégrité des données dans les systèmes dérivés sans atomic commit, linearizability, ou coordination synchrone entre partitions.
        • 2 - la plupart des applications peuvent se passer de contraintes temporelles fortes pour la timeliness.
        • Selon l'auteur, ce type de système sans coordination a beaucoup d'avantages. On peut très bien utiliser la coordination synchrone pour certaines opérations importantes qui ne permettent pas de retour en arrière, et garder le reste sans cette coordination.
        • Finalement on peut voir la chose de cette manière : avoir de fortes garanties synchrones du type transactions distribuées réduit le nombre d'excuses qu'il faudra faire pour les données inconsistantes présentées, mais ne pas les utiliser réduit le nombre d'excuses qu'il faudra faire pour toutes les indisponibilités du système dues à la faible performance induite par la coordination.
    • Vis-à-vis des erreurs matérielles et logicielles :
      • Il y a des corruptions probables contre lesquelles notre système model prévoit des parades, et des corruptions contre lesquelles non, comme par exemple faire confiance aux opérations du CPU. Pourtant tout peut arriver avec plus ou moins de probabilité.
      • Il ne faut pas oublier non plus que les BDD ne sont que des logiciels qui peuvent avoir des bugs, et pour nos codes applicatifs c'est encore pire.
      • La corruption des données finit par arriver qu'on le veuille ou non. Il faut une forme d'auto-auditabilité. Il faut vérifier régulièrement que nos données sont bien là et intègres, de même que nos backups.
      • L'approche représentée par l'event sourcing permet d'auditer plus facilement les données.
        • Et si on a bien fait la séparation entre les données sources et dérivées c'est encore plus clair.
        • On peut faire un checksum sur le log d'events pour le vérifier, et on peut rejouer les batchs pour recréer des données dérivées propres.
      • Une bonne pratique dans la vérification des données est de le faire sur des flows end-to-end. Cela permet d'inclure tout le hardware et le software dans ce qui est vraiment vérifié.
      • Les techniques cryptographiques de vérification de l'intégrité introduites par la blockchain est un mécanisme très intéressant pour l'avenir de l'intégrité des données.