Reading notes
Effective Kafka

Effective Kafka

1 - Event Streaming Fundamentals

  • Les systèmes distribués sont plus complexes que les systèmes non distribués, ils déplacent une partie de la complexité du local vers le global.
    • La raison pour laquelle on les utilise c'est qu'ils permettent de décomposer le système en plus petits problèmes qu'on va pouvoir diviser entre plusieurs équipes.
    • La complexité globale peut être réduite par certaines techniques, par exemple les messages asynchrones.
    • On y trouve des échecs partiels, intermittents, ou même byzantins (les nœuds envoient des informations fausses).
    • Le problème le plus important est sans doute celui de la consistance.
  • L'event-driven architecture (EDA) consiste à avoir des emitters envoyant des notifications d'event à des consumers.
    • Les emitters n'ont aucune connaissance des consumers. Et de même les consumers n'ont pas connaissance des emitters.
    • Les notifications d'event sont immutables, que ce soit côté emitter ou consumer.
    • L'EDA est la manière la plus découplée de faire communiquer des composants entre eux.
      • Le seul couplage sera dans le contenu des messages qui transitent.
      • Imaginons un système d'e-commerce, avec une plateforme BI et un CRM. Il leur suffira de consommer les events d'achat et d'y réagir en toute indépendance.
        • Parmi les autres possibilités qu'on aurait pour l'exemple e-commerce :
          • On peut les mettre dans un monolith (non-modulaire), mais la complexité risque d'augmenter à mesure que le modèle global est enrichi.
          • On peut utiliser des patterns d'intégration : des messages synchrones envoyés par le composant e-commerce ou par les deux autres. Dans ce cas on se rapproche du distributed monolith parce que les composants ne seront pas indépendants.
          • On peut utiliser la data decapsulation, où les composants BI et CRM viennent lire la DB du composant e-commerce. Dans ce cas on se retrouve dans un mode “get rich quick scheme” qui mène toujours à des pleurs. Le couplage est maximal.
        • Cet exemple montre que l'EDA scale de manière linéaire, alors qu'avec les approches plus couplées, la complexité explose quand on scale le nombre de composants.
    • L'EDA est beaucoup plus résilient que les approches couplées : si un composant est en situation d'échec, il a peu de chances d'impacter d'autres composants.
      • Si on reprend l'exemple d'e-commerce :
        • Dans le cas où le composant d'e-commerce est en situation d'échec, les autres composants vont continuer à pouvoir fonctionner, mais simplement ils ne recevront plus de nouveaux events.
        • Dans le cas où par exemple le CRM est en situation d'échec, les events continueront d'arriver, et il pourra toujours rattraper son retard dès qu'il est rétabli.
        • On peut aussi prévoir une mesure pour que si le message broker est en situation d'échec, l'émetteur puisse publier les events localement, pour les mettre dans le message broker plus tard.
      • Dans un système couplé, un composant qui est en échec peut entraîner des correlated failures chez les autres qui en dépendent.
        • On peut aussi avoir des congestive collapses dans le cas où certains composants sont temporairement surchargés, et que les requêtes synchrones mènent à avoir des timeouts, puis à envoyer plus de requêtes.
    • L'EDA a aussi des avantages en termes de consistance.
      • Il favorise l'ownership d'un élément stateful par un composant unique, les autres composants recevant les notifications d'event ne pouvant pas modifier cet état.
      • En dehors du composant owner, les events sont rejouables dans le bon ordre, garantissant une sequential consistency.
    • L'EDA n'est cependant pas adaptée dans certains cas.
      • Elle n'est pas adaptée aux interactions synchrones.
      • Par contre, dans les cas où on peut l'utiliser, elle permet des améliorations significatives des aspects non fonctionnels.
  • L'event streaming est un moyen d'obtenir un stream durable et ordonné d'events immutables, délivrés aux consumers qui ont souscrit.
    • L'event streaming n'est pas nécessaire pour implémenter l'EDA, qui peut d'ailleurs être implémenté dans un monolith (cf. outils comme React qui sont basés sur des events).
    • En revanche l'event streaming est pertinent comme choix face aux solutions concurrentes (comme les message queues) dans le cadre d'EDA distribuées, parce qu'il a été conçu pour ça.
      • L'event streaming supporte nativement l'immutabilité des events.
      • Il supporte la garantie d'ordre des events.
      • Il supporte le fait d'avoir de multiples consumers.

2 - Introducing Apache Kafka

  • Kafka est une plateforme d'event streaming, mais elle comprend aussi un écosystème entier qui permet l'implémentation d'EDAs.
  • L'event streaming est récent comparé aux formes traditionnelles de messaging (MQ-style).
    • Il n'y a pas de standard, mais Kafka est le leader du domaine, et son fonctionnement sert de modèle pour les solutions concurrentes comme Azure Event Hubs et Apache Pulsar.
  • Historiquement, Kafka a été open-sourcé en 2011 par LinkedIn, et confié à la fondation Apache.
    • Il avait été conçu notamment pour gérer les events d'activité des utilisateurs.
    • En 2019, LinkedIn opérait 100 clusters Kafka, pour un total de 100 000 topics et 7 millions de partitions.
    • Aujourd'hui Kafka est utilisé par des géants de la tech, pour des usages comme le real-time analytics, la data ingestion, le log aggregation et le messaging pour l'EDA.
      • Uber par exemple l'utilise pour gérer au total plus de 1000 milliards d'events par jour.
  • Parmi les usages qui permettent l'EDA, Kafka supporte :
    • Publish-subscribe : un emitter publie des events, et plusieurs consumers les consomment sans que ces noeuds se connaissent.
      • C'est notamment utilisé pour des microservices avec un faible couplage.
    • Log aggregation : un ensemble de sources publient des events sous forme de log (soit applicatifs, soit d'infrastructure), qu'on va ensuite agréger au sein du même topic, pour le consommer dans une DB optimisée pour la lecture, comme Elasticsearch ou HBase.
    • Log shipping : il s'agit de streamer des logs depuis une DB master vers un topic où plusieurs DB followers vont consommer et se mettre à jour.
      • Ce pattern permet notamment d'implémenter l'event sourcing.
    • SEDA pipelines : le Stage Event-Driven Architecture est l'implémentation d'une pipeline d'events, où on fait une opération à chaque étape, avant d'émettre un event modifié pour l'étape suivante.
      • C'est typiquement utilisé avec les data warehouses, data lakes, le reporting et autres outils de BI.
      • On peut voir le log aggregation comme une forme de SEDA.
    • CEP : le Complex Event Processing consiste en un composant qui consomme des events de multiples sources, et en extrait l'information pertinente.
      • Il a souvent besoin d'un stockage pour se rappeler les patterns déjà vus et y réagir.
      • Ça peut être par exemple pour le trading algorithmique, l'analyse des menaces de sécurité, l'analyse de fraude en temps réel etc.
    • Event-sourced CQRS : Kafka se place entre la DB master et les DBs de projection, en permettant de les alimenter chacune au travers du concept de consumer groups.
      • La différence avec le log shipping c'est que le log shipping opère plutôt à l'intérieur d'un subdomain, alors que le CQRS peut aussi opérer à travers les subdomains.

3 - Architecture and Core Concepts

  • Kafka est composé de plusieurs types de noeuds :
    • Broker nodes : ce sont les composants principaux de Kafka, ils s'occupent des opérations I/O et de la persistance.
      • Ces nœuds sont des processus Java.
      • Chaque partition est sous la responsabilité d'un nœud master qui peut écrire dedans, les followers en ont une copie et peuvent être lus.
        • Un même nœud peut être master pour certaines partitions, et follower pour d'autres.
        • L'ownership peut passer à un autre nœud en cas de besoin (opération spéciale qui le nécessite ou échec du nœud qui était master de la partition).
        • Concernant l'attribution de l'ownership, ça se fait d'abord en élisant un des nœuds comme cluster controller, puis celui-ci assigne l'ownership des partitions au gré des besoins.
      • Augmenter le nombre de nœuds brokers constitue un moyen de scaler Kafka.
        • On peut améliorer la durability en ayant plusieurs copies de chaque partition (jusqu'à autant que le nombre de nœuds).
        • On peut améliorer l'availability pour les données en lecture.
    • Zookeeper nodes : Zookeeper est un projet open source distinct de Kafka.
      • Ses nœuds sont chargés d'élire le broker qui sera le cluster controller, de garantir qu'il n'y en ait qu'un, et d'en réélire un s'il n'est plus opérationnel.
      • Ils fournissent aussi diverses métadonnées à propos du cluster, par exemple l'état des différents nœuds, des informations de quotas, les access control list etc.
    • Producers : les applications clientes qui écrivent dans les topics.
      • Un producer communique avec Kafka via TCP, avec une connexion par broker node.
    • Consumers : les applications clientes qui lisent des topics.
  • Le fonctionnement de Kafka se base sur des notions d'ordering venant de la théorie des ensembles (set theory).
    • Le total ordering consiste à avoir un ensemble d'éléments dont une seule configuration est possible.
      • On peut l'illustrer avec un set de nombres entiers { 2, 4, 6 }. Si on enlève l'élément 4, puis qu'on le remet, il ne pourra qu'être à la 2ème place, avant le 6 et après le 2.
    • Le partial ordering consiste à avoir un ensemble d'éléments ordonnés selon un critère spécifique, mais dont plusieurs configurations sont possibles pour satisfaire le critère.
      • Par exemple, si on a des entiers qu'on veut ordonner de manière à ce que le diviseur d'un nombre soit toujours après ce nombre, et qu'on a [ 2, 3, 4, 6, 9, 8 ], on peut tout autant les organiser en [ 3, 2, 6, 9, 4, 8 ].
    • La notion de causal order indique qu'on respecte le fait que certains éléments ont une relation happened-before entre eux qui est respectée, quel que soit leur ordre d'arrivée à destination.
      • Cette notion vient de l'étude des systèmes distribués (et non de la théorie des ensembles).
      • Elle est une forme de partial ordering.
      • Elle est la conséquence du fait qu'il n'y ait pas d'horloge commune à l'ensemble des nœuds d'un système distribué, et que les events peuvent arriver dans le mauvais ordre.
  • Les records sont l'unité principale de Kafka. Ils correspondent aux events.
    • Ils sont composés :
      • D'attributs assez classiques : la value qui peut être sous forme binaire, des headers pour donner des métadonnées, la partition associée au record, l'offset par rapport aux autres records de la partition, un timestamp.
        • La combinaison partition + offset permet d'identifier un record de manière unique.
        • L'offset est une valeur entière qui ne peut qu'augmenter, même s'il peut y avoir des gaps entre deux offsets qui se suivent (cf. compaction chapitre 14).
      • D'un champ binaire un peu plus inhabituel qui est la key, et qui est utilisée par Kafka pour associer les records avec une même partition.
    • Kafka est largement utilisé pour traiter des events à l'intérieur d'un bounded context, tout comme les events entre bounded contexts.
    • Il est aussi de plus en plus utilisé en remplacement des brokers traditionnels (RabbitMQ, ActiveMQ, AWS SQS/SNS, Google Cloud Pub/Sub etc.). Dans ce cas, les records ne correspondent pas forcément à des events, et on n'est pas forcément dans de l'EDA.
  • Les partitions sont l'unité de stream principale qui contient les records.
    • Les records d'une même partition sont totally ordered.
    • Les records publiés dans une partition par un même producer seront donc aussi causally ordered (la précédence respectée).
      • En revanche, si plusieurs producers publient dans la même partition sans eux-mêmes se synchroniser entre eux, les records de chaque producer seront causally ordered pour un même producer, mais ne le seront pas entre les producers (ça dépendra de qui l'a emporté pour publier plus vite).
      • Publier dans plusieurs partitions ne règle pas ce problème : les records de chaque producer ne seront pas causally ordered. Si on veut un tel ordre, il faut un seul producer.
  • Les topics sont des unités logiques qui regroupent des partitions.
    • Vu qu'il s'agit d'une union de partitions qui sont chacune totally ordered, les topics peuvent être considérés comme partially ordered.
      • On peut donc écrire dans les records de plusieurs partitions en parallèle, et n'assurer que l'ordre des records dans chaque partition.
    • On peut indiquer à la main la partition vers laquelle on veut publier un record, mais généralement on indique la key, qui sera hashée pour correspondre avec une partition donnée.
      • Dans le cas où on réduit le nombre de partitions, les messages peuvent être détruits.
      • Dans le cas où on augmente le nombre de partitions, on peut perdre l'ordre qu'on voulait conserver avec nos keys, puisque la fonction de hash redirigera vers une autre partition.
      • Même si on a un nombre de partitions supérieur au nombre de keys, il est possible que deux keys mènent vers la même partition.
        • La seule chose qui est garantie, c'est qu'avec la même key, et si le nombre de partitions ne change pas, l'ordre sera respecté.
  • Un consumer peut souscrire à un topic en tant que membre d'un consumer group, et bénéficier d'un mécanisme de load balancing avec d'autres consumers.
    • Le 1er consumer qui souscrit se voit assigner toutes les partitions. Quand un 2ème consumer souscrit au topic, il se voit assigner environ la moitié des partitions qui étaient assignées au 1er. et ainsi de suite.
    • Les consumers ne peuvent que lire les events sans impact sur eux.
      • Une des conséquences c'est qu'on peut en ajouter beaucoup sans stresser le cluster. Et c'est une des différences par rapport aux brokers classiques.
      • Ils maintiennent les offsets de là où ils en sont pour chacune des partitions qu'ils sont en train de lire.
      • Les consumers de différents consumer groups n'ont pas d'impact les uns sur les autres.
    • Kafka s'assure qu'il n'y a qu'un consumer d'un même consumer group qui peut lire dans une même partition.
      • Si un consumer ne lit plus de messages jusqu'à dépasser un timeout, Kafka assignera ses partitions à un autre consumer, considéré comme sain, du même groupe.
  • Pour que Kafka puisse réassigner une partition à un autre consumer en gardant le bon offset, ou redonner le bon offset à un consumer qui se reconnecte après s'être déconnecté, il faut que les consumers communiquent leurs offsets à Kafka.
    • On appelle ce processus committing offsets.
    • On peut avoir un contrôle sur le moment où on va faire ce commit, et donc agir sur la garantie de delivery des messages, c'est-à-dire le fait qu'ils soient intégralement traités.
      • On peut passer d'une stratégie at-most-once à une stratégie at-least-once en faisant le commit après l'exécution de la callback au lieu du moment où le message est pris par le consumer.
      • Par défaut, Kafka va faire un commit toutes les 5 secondes, sauf si un record est toujours en train d‘être exécuté, auquel cas il attendra la prochaine occasion 5 secondes plus tard.
        • On peut régler cette durée de 5 secondes à une autre valeur avec la configuration auto.commit.interval.ms.
        • Ça implique que si le record est exécuté, et que dans les quelques secondes après le cluster bascule la partition sur un autre consumer, on risque de ne pas avoir commité et de réexécuter la callback du record dans le nouveau consumer.
        • Si on veut avoir le contrôle sur le moment exact où on veut faire le commit, on peut désactiver le commit automatique (configuration enable.auto.commit à false), et le faire à la main dans le consumer.
    • Le commit peut se faire via un canal in-memory asynchrone pour ne pas bloquer le consumer, avec la possibilité de fournir une callback qui sera exécutée par Kafka quand le commit aura été pris en compte
      • Ou alors le consumer peut aussi utiliser un appel synchrone pour le commit.
    • Un cas classique est de traiter les records avec une stratégie at-least-once par batch, qu'on appelle poll-process loop :
      • Le consumer garde un buffer de records qu'il prefetch en arrière-plan.
      • Il traite les records un par un (ou parfois en parallèle avec un pool de threads si c'est OK d'un point de vue business).
      • Quand on arrive au dernier record, il fait le commit de l'offset.
      • Puis il prend le batch suivant et recommence.
  • Même si c'est moins courant, il est possible de souscrire un consumer sans qu'il soit membre d'un consumer group.
    • Dans ce cas, il ne bénéficiera pas des divers mécanismes associés aux consumer groups : load balancing, rebalancing en cas d'échec, détection de l'échec par inactivité, persistance de l'offset.
      • Il devra indiquer les couples topic/partition auxquels il souscrit, et devra persister ses propres offsets lui-même dans un store.
    • Il peut y avoir deux cas d'usages :
      • Le besoin d'avoir vraiment le contrôle sur la manière de consommer les messages, en stockant soi-même son offset etc.
        • Mais ce cas d'usage est très rare, et difficile à implémenter correctement.
      • Un consumer éphémère qui est là juste pour monitorer ou débugger un topic, sans avoir besoin de persister d'offsets.
        • C'est ce que fait par exemple l'outil Kafdrop qui permet de visualiser les messages présents dans les partitions via une interface web : à chaque fois il attache un consumer sans groupe.

4 - Installation

  • Il y a 4 méthodes pour installer Kafka (et Zookeeper) :
    • En utilisant les images Docker.
      • Si on choisit une autre méthode que Docker, on aura juste besoin d'avoir d'avoir un JDK d'installé.
      • La méthode Kafka dans Docker est la plus immédiate pour avoir Kafka qui tourne, mais elle est aussi connue pour être difficile à configurer si on veut personnaliser.
    • En utilisant un package manager (yum, apt, homebrew etc.)
    • En clonant le dépôt git et en installant depuis les sources.
    • En téléchargeant des binaires sur le site de Kafka.
      • Il suffit de télécharger un tar.gz et de le désarchiver, pour obtenir les exécutables de Kafka qu'on peut lancer avec notre JDK.
      • Le livre part là-dessus.
  • La configuration de Kafka peut se faire en changeant les fichiers de conf dans le dossier config/.
    • On peut voir les configs prises en compte dans les logs, à chaque fois qu'on démarre Kafka.

5 - Getting Started

  • On a du tooling livré avec Kafka sous forme de scripts shell pour le gérer en CLI.
    • On peut par exemple créer un topic puis y ajouter des records.
    • On peut changer des offsets pour un consumer group.
    • etc.
  • L'auteur déconseille de laisser Kafka créer automatiquement les topics (auto.create.topics.enable à true) pour plusieurs raisons :
    • Les valeurs par défaut de Kafka remontent à sa création, et n'ont pas forcément été pensés pour l'usage qu'il a en général aujourd'hui.
    • Quand on crée un topic, on devrait décider du nombre de partitions en fonction des critères de parallélisation. Donc un nombre par défaut ne va en général pas être satisfaisant.
    • La création de topic à la lecture est encore plus problématique, puisqu'on va avoir des lecteurs qui croient lire quelque chose et qui ne lisent rien.
  • Le lag est la différence entre l'offset qui a été commité par un consumer sur une partition donnée et le high water mark de la partition (c'est-à-dire le dernier record dispo à la consommation).
  • La suppression d'un topic est asynchrone, c'est-à-dire qu'elle sera effectivement réalisée quelque part dans le futur par Kafka, après qu'on l'ait demandée.
    • Pour nos tests d'intégration, il va donc falloir trouver des solutions :
      • 1 - Supprimer le consumer group, les offsets enregistrés, ou mettre les offsets au high water mark (tous les trois ont le même effet).
      • 2 - Tronquer les partitions en avançant le low water mark (le record le plus ancien disponible à la consommation).
      • 3 - Utiliser des noms de topics uniques, et les supprimer au fil de l'eau (si on ne les réutilise pas, le fait qu'ils soient supprimés de manière asynchrone ne pose problème).
        • Cette dernière option est celle recommandée par l'auteur.
  • Supprimer les offsets pour un consumer group et sur un topic donné, fait que la prochaine fois que ces consumers voudront consommer le topic, ils seront par défaut assignés au dernier record.
    • Ou au premier en fonction de l'option auto.offset.reset.
    • Si on supprimer un consumer group, c'est comme si on supprimait ses offsets pour l'ensemble des topics où il avait consommé des records.
  • L'essentiel des classes du client Java se résument à :
    • 1 - L'interface Producer, l'implémentation KafkaProducer, et la représentation du record ProducerRecord.
    • 2 - La même chose côté consumer : Consumer, KafkaConsumer, ConsumerRecord.
    • Et c'est à peu près la même chose pour les autres clients qui s'en inspirent.
  • L'option enable.idempotence à la création du producer permet de garder des séquences pour les couples producer/partition, pour s'assurer qu'un record n'est pas publié deux fois ou dans le mauvais ordre, dans le cas où il y aurait un timeout pendant une publication.
    • L'auteur conseille de l'activer.
  • Il faut bien penser à fermer la connexion, sinon on risque de monopoliser des ressources côté client et serveur.

6 - Design Considerations

  • A propos de la séparation des responsabilités entre producers et consumers.
    • Dans le cas d'un event-oriented broadcast, c'est le producer qui a la responsabilité de la configuration du topic et du format des données publiées.
      • C'est utile pour que les producers ne connaissent pas du tout les consumers, et qu'on reste sur du couplage faible.
      • Le fait qu'on puisse avoir plusieurs consumers aux intérêts différents montre qu'il est plus pertinent que le producer ait la responsabilité des messages.
        • Pour autant, on peut se demander comment faire en sorte que les consumers soient tous satisfaits par le modèle proposé par le producer.
        • On peut mettre en place du topic conditioning, c'est-à-dire compartimenter les problèmes liés à chaque consumer avec une architecture SEDA, contenant pour chaque consumer (ou groupe de consumers aux intérêts communs), un module de conditionnement publiant à son tour dans un topic pour le consumer visé.
          • Cette solution permet de séparer les responsabilités, et laisser le producer avec son modèle, et chaque consumer avec le sien.
    • Pour du peer-to-peer messaging, c'est au contraire le consumer qui a la responsabilité de la configuration du topic et du format de données.
      • Le consumer envoie des commandes au producer, pour que celui-ci lui prépare des données qu'il mettra dans Kafka.
    • Dans tous les cas, les flows doivent être designés avec soin, en prenant en compte les besoins des producers et des consumers.
  • Concernant la question du parallélisme dans le cas où on veut laisser plusieurs consumers consommer depuis plusieurs partitions, il y a des facteurs à prendre en compte pour obtenir quelque chose de performant.
    • L'organisation des partitions d'un topic est consumer-driven, du fait du design de Kafka. Le consumer se pose la question de la bonne clé de partitionnement.
      • En pratique, le consumer doit trouver une entité suffisamment stable pour que son identifiant puisse servir de clé de partitionnement.
      • Par exemple, si on a des tournois de football, avec des events représentant ce qui se passe dans le jeu, on peut prendre le match comme entité stable, et avoir tous les events d'un même match ordonnés dans une même partition.
      • Si on garde l'exemple mais qu'un consumer est intéressé par le déroulement du tournoi, alors il nous faudra garder l'ordre des matchs, et donc choisir comme entité stable le tournoi.
        • Mais on aura alors moins de possibilités de parallélisation puisqu'on ne pourra plus paralléliser les matchs.
        • L'autre possibilité c'est de laisser le consumer qui a besoin de l'ordre des tournois le reconstituer lui-même, avec des infos qu'il a dans les events.
    • Se pose ensuite la question du nombre de partitions du topic.
      • Pour rappel on ne peut pas enlever de partitions sans détruire de messages, et en rajouter fait que la fonction de hash n'envoie plus dans les mêmes partitions qu'avant le rajout (donc il vaut mieux éviter si on veut garder l'ordre des messages).
      • Une solution peut être d'avoir dès le début un nombre suffisamment élevé de partitions par topic, pour ne jamais avoir à les augmenter.
        • Attention cependant, trop de partitions peut causer des problèmes de performance.
        • Confluent recommande un nombre de partitions maximal par broker de 100 x b x r partitions (avec b le nombre de brokers du cluster, et r le facteur de réplication).
        • Si on atteint le nombre maximal de partitions qu'on avait prévu, une technique peut être de créer un nouveau topic avec plus de partitions, et de copier l'ensemble des messages de l'ancien topic vers le nouveau. Ça nécessite un peu d'effort.
    • Le nombre de consumers dans un consumer group doit être au moins aussi grand que le nombre de partitions si on veut profiter du maximum de parallélisme.
      • Par contre, allouer un tel nombre peut aussi mener à du gâchis de ressources, vu que le broker ne fonctionne pas forcément en flux tendu.
      • On peut alors plutôt allouer un nombre variable de consumers au groupe, basé sur l'activité du cluster.
    • Enfin on peut envisager d'avoir du parallélisme à l'intérieur des consumers, en gérant plusieurs threads, pour traiter plusieurs records en même temps.
  • A propos de la question de la delivery des messages.
    • On parle ici de “delivery” au sens où les messages sont traités jusqu'au bout par les consumers, pas juste du fait qu'ils soient disponibles à la lecture (ça, ils le restent de toute façon pour tous les consumers dès lors que la publication a marché).
    • On peut avoir une delivery at-most-once, en faisant le commit dès le début de la lecture.
      • C'est utile dans les cas où la perte occasionnelle de donnée n'est pas grave, et ne laisse pas le système consommateur dans un état inconsistant de manière permanente.
      • Ca peut être aussi parce que faire l'action deux fois pose problème, alors le que le fait de la rater de temps en temps non.
    • On peut avoir une delivery at-least-once, en ne faisant le commit qu'après exécution complète de la callback du consumer.
      • C'est utile dans le cas où la perte de donnée n'est pas acceptable, et où on est prêt à recommencer certains messages pour l'éviter.
      • Par contre on doit être prêt à avoir la callback potentiellement exécutée plusieurs fois.
    • Et enfin, si on veut une delivery exactly-once, on ne peut malheureusement pas compter sur le message broker à lui seul : on doit s'assurer d'avoir un flow idempotent.
      • On pourrait le vouloir pour avoir à la fois la consistance parce que la perte de donnée ou le fait de ne pas faire une action n'est pas acceptable, mais en même temps où le fait de le faire deux fois n'est pas acceptable non plus.
      • Pour réussir ça, on a besoin d'avoir une idempotence de bout en bout, c'est à dire que :
        • La callback du consumer ne doit faire que des changements idempotents. Par exemple un update en DB qui ne change pas l'état de la DB quand il est joué plusieurs fois.
        • Le consumer doit vérifier si les side-effects qu'il fait ont déjà été faits pour ne pas les refaire une 2ème fois. Par exemple, Kafka offre un mécanisme de transaction qui permet de ne publier qu'une fois dans un topic sortant pour un message d'un topic entrant.
        • Dans le cas où on ne peut pas savoir si le side-effect a déjà été fait ou pas, il faut que le side-effect lui-même soit rendu idempotent de bout en bout.

7 - Serialization

  • Le client Java a des serializers de base et une interface à implémenter pour créer des serializers Kafka custom.
    • Pour l'auteur, même si cette approche est idiomatique, il vaut mieux avoir Kafka et tout ce qui y est lié isolé dans une couche de messaging pour que la logique business n'y soit pas liée et soit testable.
      • L'auteur préfère laisser la sérialisation côté logique business, et donc conseille de ne pas utiliser les serializers custom de Kafka dans ce cas.
    • Et de la même manière, les choses spécifiques à Kafka comme le fait de mettre l'ID des customers comme clé, doivent être dans la couche de messaging pour pouvoir être mis en commun entre les use cases.
  • Quand on est en mode commit manuel, on peut appeler la fonction qui fait le commit de manière asynchrone sans l'attendre.
    • Ça aura pour effet d'avoir plus d'offsets pas encore commités mais un throughput plus élevé.
    • On respecte quand même le at-least-one delivery.
  • Dans le cas où on utilise le mécanisme de poll-process loop (où on consomme les messages par batch), le client Java va avoir deux threads : un pour aller chercher plus de records et un autre pour faire le processing des records qui sont déjà là.
    • Il s'agit là d'un mécanisme de pipelining, où la 1ère étape va chercher de la donnée pour la mettre dans le buffer suivant jusqu'à ce que le buffer soit plein, auquel cas elle attend avant de continuer.
    • L'auteur propose une version encore plus parallélisée, en ajoutant une 3ème étape dans la pipeline pour séparer la désérialisation du reste du traitement du message.
      • L'avantage c'est que ça peut augmenter le throughput, mais l'inconvénient c'est une utilisation plus intensive du CPU.
      • Il faut créer un thread à la main, et gérer la communication inter-thread à travers un buffer, avec tous les edge cases liés au parallélisme.
      • Selon l'auteur, cette technique a du sens parce que :
        • L'utilisation de Kafka est souvent associée à des cas d'usages qui ont besoin de performance.
        • Elle ajoute de la complexité, mais qu'on n'a à faire qu'une fois et qu'on peut isoler dans un adapter qu'on réutilise.
      • Côté publisher ça aurait moins de sens vu que la sérialisation est moins coûteuse que la désérialisation.
  • Il peut être pertinent de filtrer des messages au niveau de la couche adapter du consumer Kafka.
    • Par exemple, si le topic contient plus de messages que ce que le use-case qui le consomme peut ou veut désérialiser.
      • Ça peut être parce que le producer publie les messages plusieurs fois, en indiquant la version du schéma dans le header, et qu'on ne veut en lire qu'une version sans avoir à parser les autres.
      • Ça peut aussi être un topic qui contient plusieurs types de messages, dont on ne veut traiter qu'un type.

8 - Bootstrapping and Advertised Listeners

  • Chaque partition a un leader broker, et n follower brokers qui contiennent sa donnée (avec n + 1 étant le replication factor).
  • Pour pouvoir écrire un record, un client publisher doit l'envoyer au broker leader de la partition qui l'intéresse.
    • Le leader transférera aux followers, mais on ne peut pas compter sur un des followers pour transférer d'abord au leader.
    • Ça veut donc dire que le client devra avoir une connexion directe avec quasi tous (ou même tous) les brokers, vu que tous les brokers peuvent être des leaders de partitions et qu'il risque de vouloir en lire plusieurs.
    • Les brokers sont au courant de la topologie du cluster parce qu'ils ont l'info partagée via ZooKeeper.
      • Le client peut donc demander la liste des adresses IP des brokers à n'importe lequel d'entre eux. Et donc, pour peu qu'il ait au moins une adresse de broker valide, il peut réobtenir toutes les autres.
      • Le client est initialement fourni avec une bootstrap list des brokers, et ensuite se débrouille pour la mettre à jour en leur demandant.
    • Cette technique de base de demander la liste des adresses à au moins un broker dont on a l'adresse valide n'est pas super fiable : si le client n'a plus aucune adresse valide parce qu'elles ont toutes changé, il est coincé.
      • Ce que fait la communauté pour répondre à cette problématique c'est d'utiliser des alias DNS, pointant vers les bonnes adresses IP des brokers.
      • La spécification DNS permet même d'indiquer un seul nom qui sera associé à une liste d'adresses IP pointant vers chacun des brokers.
  • Il y a un problème classique de configuration auquel beaucoup de monde se heurte, et qui empêche la connexion du client aux brokers : le client demande la liste des adresses, et le broker lui répond des adresses en localhost.
    • La solution est de configurer les advertised listeners dans le fichier config/server.properties.
    • Les propriétés sont initialement commentées dans le fichier, et donc c'est les valeurs par défaut qui s'appliquent (on peut les retrouver dans la documentation de Kafka (opens in a new tab)).
    • advertised.listeners permet d'indiquer les URI qui seront envoyées aux clients qui demandent la liste des adresses des brokers. C'est ça qu'il faut configurer avec le bon hostname pour résoudre le problème de config.
    • Dans le cas où on a des clients situés dans des environnements réseau différents, on a besoin de leur advertiser des adresses différentes pour les mêmes brokers.
      • C'est le cas par exemple si on a un VPC (virtual private cloud) avec le cluster Kafka et des clients, et d'autres clients situés à l'extérieur et ne pouvant pas accéder aux adresses IP internes au VPC.
      • Dans ce cas, on va pouvoir configurer plusieurs URI (ou plutôt sockets) sur lesquels écoute chaque broker (dans listeners), et plusieurs URI qui sont advertised (dans advertised.listeners).
        • Il faut faire attention à indiquer des ports différents pour chacune des URI si on ne veut pas de problèmes.
  • Les problématiques de bootstrapping se posent largement dans les environnements conteneurisés. La simple utilisation de docker-compose nous amène à avoir l'équivalent d'un VPC interne aux containers lancés par docker-compose, et un mapping de port vers la machine hôte.
    • Exemple de config Kafka dans un docker-compose :
      kafka:
        image: bitnami/kafka:2
        ports:
          - 9092:9092
        environment:
          KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
          ALLOW_PLAINTEXT_LISTENER: "yes"
          KAFKA_LISTENERS: >-
            INTERNAL://:29092,EXTERNAL://:9092
          KAFKA_ADVERTISED_LISTENERS: >-
            INTERNAL://kafka:29092,EXTERNAL://localhost:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >-
            INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
        depends_on:
          - zookeeper
    • On définit ici deux protocoles propres à Kafka (et associés au type PLAINTEXT, donc non sécurisés) : un qu'on appelle INTERNAL pour l'URI depuis le réseau interne des containers docker-compose, et un autre qu'on appelle EXTERNAL pour le réseau de l'hôte.
    • KAFKA_LISTENERS est l'équivalent de listeners dans config/server.properties, c'est-à-dire les sockets sur lesquels le broker écoute.
      • On choisit deux ports différents qui permettent de différencier les connexions internes et externes, et on indique qu'on écoute sur toutes les interfaces possibles (en n'indiquant aucun hostname ni adresse IP).
    • KAFKA_ADVERTISED_LISTENERS est l'équivalent de advertised.listeners, c'est-à-dire les adresses URI communiquées aux clients pour joindre le broker.
      • On indique bien le hostname localhost aux clients du réseau externe, et le hostname kafka aux clients du réseau interne (le nom des containers sert aussi de hostname dans docker-compose).
    • KAFKA_INTER_BROKER_LISTENER_NAME permet d'indiquer quel protocole doit être utilisé pour la communication avec les autres brokers du cluster.
    • depends_on permet d'indiquer l'ordre dans lequel on start les containers dans docker-compose.

9 - Broker Configuration

  • La configuration peut se faire sur 4 entités de Kafka : les brokers, les topics, les clients et les users.
  • Il existe une configuration statique et une configuration dynamique.
    • Historiquement la configuration dynamique a été introduite pour faciliter l'administration de gros clusters, et pour ne plus avoir besoin de restart.
      • La communauté a décidé qu'enlever la configuration statique était trop radical, donc elle a été gardée en fallback.
    • La configuration statique se fait en changeant le fichier config/server.properties et en redémarrant le broker.
    • La configuration dynamique se fait via l'admin API de Kafka, au niveau du broker ou du cluster entier.
      • Elle est stockée dans Zookeeper, mais ne nécessite pas la communication directe avec Zookeeper pour faire des modifications de config.
  • Côté précédence, c'est d'abord la config dynamique par entité qui prend le pas, puis la config dynamique au niveau du cluster, et enfin la config statique.
    • Si rien n'est défini, les valeurs par défaut s'appliquent.
    • Dans le cas de propriétés dépréciées et remplacées par d'autres, les propriétés dépréciées sont prises en compte si elles sont utilisées, et sinon c'est la valeur par défaut des nouvelles propriétés qui est prise en compte.
  • Quelques infos sur les changements de config des brokers.
    • Sur la configuration statique.
      • Toutes les propriétés de config/server.properties sont optionnelles, sauf zookeeper.connect qui contient la liste des adresses des nœuds ZooKeeper.
      • Il est considéré comme une bonne pratique de spécifier la propriété broker.id qui représente l'identifiant du broker. Si on ne le fait pas, ZooKeeper assignera un ID automatiquement à chaque broker (par défaut en commençant par 1001).
        • Pour changer cette propriété, il faut :
          • D'abord arrêter le broker.
          • Faire le changement dans server.properties.
          • Faire le changement dans le fichier meta.properties (qui se trouve dans le dossier de log du broker), ou même supprimer le fichier meta.properties qui sera régénéré.
            • Le dossier de log contient des fichiers essentiels avec l'info des partitions et des records (rien à voir avec du logging, on parle des données de Kafka).
            • Son path est configurée avec l'option log.dirs, par défaut c'est /tmp/kafka-logs.
          • Redémarrer le broker.
    • Sur la configuration dynamique.
      • On peut changer la config via l'outil CLI fourni par Kafka sous forme de script bash : kafka-configs.sh, ou via une librairie cliente tierce qui va se connecter à Kafka.
        • Par exemple pour afficher la liste des configurations dynamiques pour le broker 1001 sur un Kafka qui tourne localement :
          ./kafka-configs.sh
              --bootstrap-server localhost:9092
              --entity-type brokers
              --entity-name 1001
              --describe
      • Il faut faire attention avec les configurations dynamiques, on peut facilement mettre un cluster par terre si on fait une mauvaise manip.
        • Quand on modifie une config pour tout le cluster, c'est une bonne pratique de la modifier d'abord pour un broker, au cas où elle aurait un impact non souhaité qui serait du coup plus limité.
  • A propos de la configuration des topics.
    • Ils peuvent être configurés statiquement via config/server.properties, ou dynamiquement au niveau du cluster (une configuration de topic par broker n'aurait pas de sens).
      • On peut aussi modifier dynamiquement certaines propriétés par topic.

10 - Client Configuration

  • La configuration du client est beaucoup plus sensible, en partie parce qu'elle tombe sous la responsabilité des développeurs applicatifs.

    • En général la configuration des brokers se fait par des personnes spécialistes de l'infra, gérant d'autres éléments d'infrastructure, et connaissant la manière de gérer les risques.
      • On voit aussi de plus un shift vers les versions de serveurs Kafka pré-configurées. Ça ne peut pas être le cas des clients.
  • La plupart des problèmes avec Kafka viennent d'une mauvaise utilisation côté client, parce que les développeurs ne le connaissent pas assez bien.

    • Exemple : il est notoire que Kafka offre des garanties importantes pour ce qui est de la durabilité des records. Mais en réalité ça dépend des paramètres.
      • Il y a déjà la question du stockage, elle-même influencée par le nombre de brokers.
      • Et ensuite il y a des configurations côté client :
        • Le replication factor et quelques autres pour ce qui est de s'assurer que la donnée reste en cas de problème avec certaines machines.
        • Le nombre d'acknowledgements que le broker leader de la partition doit demander avant de considérer le record comme validé, et le fait d'attendre soi-même l'acknowledgement du leader avant de considérer le message comme publié.
    • Les développeurs imaginent aussi que le comportement par défaut de Kafka optimise la garantie d'ordre et de delivery des records. Mais ces valeurs sont issues de l'utilisation initiale de Linkedin qui avait surtout besoin de performance dans son cas d'usage.
  • La 1ère règle de l'optimisation avec Kafka est : ne le faites pas.

    • La plupart du temps, les configurations qui offrent des garanties vis-à-vis des records n'ont pas un si grand impact que ça. On peut attendre d'en avoir vraiment besoin.
  • Pour ce qui est des configurations communes à tous les types de clients (producer, consumer, admin).

    • bootstrap.servers permet de contacter les brokers, mais ensuite le plus important c'est que les brokers envoient les bonnes adresses (cf. le chapitre précédent).
    • client.dns.lookup donne la possibilité d'utiliser des alias DNS liés à plusieurs adresses.
    • client.id permet de définir l'identifiant du client, comme on l'a fait pour le serveur dans le chapitre d'avant. Ça permet la traçabilité, et la gestion de quotas.
    • retries indique le nombre de fois qu'on va recommencer une opération qui se termine par une erreur transiente, c'est-à-dire qui peut potentiellement ne pas se reproduire en réessayant.
      • retry.backoff.ms indique la durée d'attente avant de réessayer.
      • Par défaut on bourrine, en recommençant un nombre infini de fois toutes les 100 ms.
      • L'autre possibilité c'est en gros de limiter les retries, en ayant conscience que du coup on se retrouvera à un moment où un autre à avoir des opérations qui sont en erreur pour des raisons temporaires. Mais on n'aura pas bloqué pendant longtemps.
    • Quand on veut utiliser Kafka dans des tests d'intégration, il faut prendre en compte que le fait de le lancer dans un environnement virtualisé type Docker va ralentir considérablement son démarrage.
      • Le fait que Kafka écoute sur le port ne suffit pas pour qu'il soit prêt à accepter des requêtes. Il peut donc falloir attendre un certain temps au début des tests pour qu'il démarre.
      • Et c'est encore pire avec Docker sur MacOS.
  • Pour ce qui est de la configuration du producer.

    • acks permet d'indiquer le nombre d'acknowledgements qu'on veut attendre de la part du broker leader avant de considérer que le message est publié.

      • 0 indique qu'on ne veut pas attendre du tout.
      • 1 indique qu'on veut attendre que le leader lui-même ait écrit le record dans son log à lui.
        • C'est la valeur par défaut si enable.idempotence est false.
      • -1 permet d'indiquer qu'on veut attendre que le leader mais aussi tous les followers aient écrit le record dans leurs log.
        • C'est la valeur par défaut si enable.idempotence est true.
    • max.in.flight.per.connection indique le nombre de records qu'on veut pouvoir publier (par défaut 5), avant d'avoir à attendre le nombre d'acknowledgements qu'on a indiqué dans acks.

      • Augmenter ce nombre permet de se prémunir contre la lenteur du réseau, vu qu'attendre la confirmation à chaque fois qu'on veut publier nous empêche de publier vite.
      • Par contre, on risque de ne pas publier dans le bon ordre pour les records entre deux acknowledgements.
        • Il suffit qu'un record A ait une erreur transiente qui est retentée puis réussie, mais que le record suivant B ait réussi immédiatement et avant le record A. Ce qui inverse l'ordre de publication de A et B.
        • Pour ne pas avoir le problème il faudrait soit avoir max.in.flight.per.connection à 1 (attendre la confirmation à chaque publication), soit retries à 0 (ne jamais réessayer les erreurs transientes).
        • En réalité il y a une 3ème option qui est d'activer enable.idempotence, où Kafka va utiliser un mécanisme qui remet le bon ordre pour les records qui arrivent avec le mauvais ordre.
    • enable.idempotence.

      • Permet de garantir que :
        • Les records soient publiés au plus une fois (donc dédupliqués).
        • Les records sont publiés dans l'ordre indiqué par le client producer.
        • Les records sont d'abord persistés sur l'ensemble des réplicas avant d'envoyer l'acknowledgement.
      • Il nécessite que (si ces propriétés ne sont pas renseignées, elles seront mises aux bonnes valeurs par défaut, mais il ne faut juste pas de conflit) :
        • max.in.flight.per.connection soit entre 0 et 5.
        • retries soit plus grand que 0.
        • acks soit à -1.
      • Pour ce qui est du problème de duplication, il peut se produire dans le cas où le producer subit un timeout alors que le message a été pris en compte par le serveur, mais avant qu'il ne reçoive l'acknowledgement. Il va donc réessayer d'envoyer le message juste après, ce qui fera un doublon.
      • Le mécanisme passe par l'attribution à chaque message par le producer, d'un ID qui s'incrémente monotoniquement. Et le broker maintient le dernier ID traité pour chaque couple [ producer ID, partition où on publie le record ].
        • Si le record qui arrive est identifié comme étant déjà arrivé, il est ignoré comme duplicata.
        • Si le record qui arrive a un ID plus grand qu'un incrément de 1 par rapport au dernier message traité, alors le message est considéré comme étant dans le mauvais ordre, et le broker répond une erreur indiquant au producer qu'il faut le requeuer.
    • compression.type permet d'indiquer l'algo de compression qui sera utilisé par le producer (détaillé dans le chapitre 12).

      • Parmi les possibilités :
        • none
        • gzip
        • snappy (optimisé pour le throughput, au dépend de la compression)
        • lz4 (optimisé aussi pour le throughput, surtout la décompression)
        • zstd (nouvel algo, qui est censé faire un bon ratio throughput / performance).
    • key.serializer et value.serializer servent à indiquer la sérialisation des clés et valeurs des records (cf. le chapitre 7).

    • partitioner.class permet d'indiquer une classe Java qui va définir une manière différente de la manière par défaut d'associer les records et les partitions.

      • La manière par défaut va, dans l'ordre :
        • 1 - Si la partition est indiquée explicitement dans la publication du record, elle sera utilisée.
        • 2 - Sinon, si on a indiqué une clé, la clé sera hashée pour déterminer la partition.
        • 3 - Sinon, si le batch courant a une partition qui lui est assignée, on utilise cette partition.
        • 4 - Sinon, on assigne une partition au batch et on l'utilise.
          • Le 3 et 4 ont été introduits dans Kafka plus récemment, et permettent, dans le cas où on n'a pas de préférence d'ordre liée à une clé, de n'impliquer qu'un broker pour les records d'un batch. Ça améliore les perfs par 2 ou 3, tout en assurant une distribution entre brokers quand on a un grand nombre de batchs.
      • Le client Java a aussi deux autres classes disponibles :
        • RoundRobinPartitioner permet d'alterner entre les brokers, sans prendre en compte la clé.
        • UniformStickyPartitioner permet de garder les records d'un même batch pour une même partition, sans prendre en compte la clé.
      • On peut aussi donner une classe perso, mais l'auteur conseille d'envisager aussi d'encoder notre ordre custom dans une clé.
    • interceptor.classes permet de définir des classes Java qui vont faire quelque chose de particulier à l'envoi et à l'acknowledgement.

      • Ça peut être utile pour le côté “plugin” réutilisable, parce qu'on est sur de l'AOP (Aspect Oriented Programming).
      • On peut par exemple l'utiliser pour ajouter une couche qui fait du logging, du tracing, de l'analyse de message pour empêcher la fuite de données etc.
      • Attention par contre : les exceptions dans les interceptors ne sont pas propagées.
      • Globalement si on y met quelque chose, il vaut mieux que ce soit du code simple et non bloquant.
    • max.block.ms permet d'indiquer un timeout au processus de publication (par défaut 60 secondes).

    • batch.size permet d'attendre d'avoir une certaine taille de messages (par défaut 16 KB) avant d'envoyer un batch de publication.

      • linger.ms fait la même chose au niveau du temps (par défaut 0 ms) en ajoutant un temps minimal à attendre avant d'envoyer un autre batch.
      • L'intérêt est de faire moins de requêtes au serveur et donc d'augmenter le throughput.
    • request.timeout permet d'indiquer un timeout vis-à-vis de la réponse du broker pour faire l'acknowledgement (par défaut 30 secondes), avant de réessayer ou d'indiquer la publication comme échouée.

    • delivery.timeout permet d'indiquer un temps global pour une requête de publication, qui englobe l'envoi, les retries, et la réponse du serveur.

      • Par défaut, c'est 120 secondes.
      • Il doit être supérieur aux autres timeouts réunis.
    • transaction.id et transaction.timeout.ms permettent de gérer le comportement des transactions (cf. le chapitre 18).

  • Pour ce qui est de la configuration du consumer.

    • key.serializer et value.serializer servent à indiquer la désérialisation des clés et valeurs des records (cf. le chapitre 7).
    • interceptor.classes permet de faire la même chose que côté consumer, en traitant les records par batch.
    • Une des choses les plus importantes à régler, c'est la taille de ce qu'on va aller chercher en une requête. Ça se configure en plusieurs propriétés.
      • Plus on prendra de données, et plus le throughput sera grand, mais moins on aura un bon délai de propagation de bout en bout d'un record.
      • La propriété timeout donnée à Consumer.poll() permet de limiter son temps d'exécution.
      • fetch.min.bytes (par défaut 1) permet de demander au broker d'attendre d'avoir au moins un minimum de données à envoyer avant de répondre.
        • En réalité, le broker doit quand même envoyer une requête même s'il n'a pas assez de données, dans le cas où il dépasse un timeout fixé par fetch.max.wait.ms (par défaut 500 ms).
      • fetch.max.bytes (par défaut 50 MB) indique au broker à partir de quelle taille il doit arrêter d'ajouter des données.
        • Vu qu'un record à lui seul (et donc à fortiori un batch) peut de toute façon dépasser cette taille, la limite n'est qu'indicative.
        • La même propriété limite existe pour la taille des partitions : max.partition.fetch.bytes (par défaut 1 MB).
          • Cette propriété permet de limiter l'impact des partitions “gourmandes”, en laissant de la place aux partitions qui ont moins de données.
        • Intéressant à savoir : les brokers ne font en général pas de traitement sur les batchs. Les batchs sont envoyés par les producers, stockés tels quels, et envoyés tels quels aux consumers. C'est un choix de design de Kafka pour garantir une grande performance.
      • max.poll.records (par défaut 500) permet de limiter le nombre de records retournés par Consumer.poll().
        • Contrairement aux autres propriétés, celle-ci n'impacte pas le broker. C'est le client qui reçoit le même nombre de records par batch, et il va lui-même limiter ceux qu'il rend disponible. Il bufferise les autres pour les rendre disponibles à l'appel suivant.
        • Elle est là pour éviter que le client n'ait à traiter trop de records, et ne puisse pas appeler à nouveau poll() avant max.poll.interval.ms.
    • group.id permet d'indiquer le groupe d'un consumer. Si on ne le fournit pas, il deviendra sans groupe, et ne pourra pas bénéficier des mécanismes de d'assignation automatique de partition, détection des échecs, ni faire de commits au serveur pour sauvegarder son offset.
    • group.instance.id consiste à indiquer un identifiant à un consumer, unique dans son groupe, rendant le consumer static. L'effet est que si le consumer n'est plus là, sa partition n'est pas réassignée, mais reste en attente de son retour.
      • C'est pour éviter les rebalancing trop fréquents dans un contextes de manque d'availability transient.
      • Pour en savoir plus : chapitre 15.
    • La détection d'échecs est contrôlée par la combinaison de heartbeat.interval.ms, session.timeout.ms et max.poll.interval.ms.
      • Ce sujet fait partie des sujets délicats, source de nombreux problèmes.
      • heartbeat.interval.ms (par défaut 3 secondes) contrôle la fréquence à laquelle le consumer envoie des heartbeats.
      • Le broker coordinator du groupe de son côté vérifie que le consumer n'envoie pas son prochain heartbeat après le délai de session.timeout.ms (par défaut 10 secondes). Sinon il l'expulse et réassigne ses partitions dans le groupe.
      • max.poll.interval.ms (par défaut 5 minutes) est le délai maximal pour qu'un consumer rappelle poll(). S'il ne l'a pas fait, il va lui-même arrêter d'envoyer des heartbeats et demander à quitter le groupe.
        • Si le consumer est statique, il arrête les heartbeats mais ne demande pas à quitter le groupe. Il sera évincé par le broker s'il dépasse la session.timeout.interval sans avoir réémis de heartbeats.
        • Le but de ce comportement est d'éviter les situations où plusieurs consumers traitent les mêmes messages.
    • auto.reset.offset permet d'indiquer ce qui se passe quand un consumer n'a pas d'offsets pour la partition qu'il consomme.
      • Les options sont : earliest pour partir du low water mark, latest pour partir du high water mark, et none pour renvoyer une exception.
      • Les offsets sont stockés par le group coordinator dans un topic nommé __consumer_offsets. Ce topic a un temps de rétention comme n'importe quel topic (par défaut 7 jours).
      • L'offset peut ne pas exister si :
        • 1 - C'est le début de la formation du groupe et que la partition n'a pas encore été lue par lui.
        • 2 - Quand rien n'a été consommé sur cette partition par le groupe (et donc aucun offset n'a été commité dans __consumer_offsets) depuis plus longtemps que le délai de rétention de __consumer_offsets.
        • 3 - Quand on a un offset qui pointe vers un record qui est dans un topic où le délai de rétention est plus faible, et a été dépassé. Donc l'offset pointe vers le vide.
    • enable.auto.commit permet d'indiquer si le commit automatique est activé pour un consumer. Il s'agit d'envoyer un commit jusqu'au dernier record traité par le dernier l'appel à poll(), pour mettre à jour son offset.
      • Par défaut le client commit toutes les 5 secondes (temps réglable avec auto.commit.interval.ms).
      • Si ça marchait vraiment comme ça (tel que le dit la doc), le client mettrait à jour son offset au dernier record reçu dans le batch envoyé par le dernier appel à poll(), alors même qu'il n'a pas forcément terminé de traiter le batch.
        • En réalité, l'implémentation règle le problème en envoyant le commit dans le même thread que celui qui traite les records, et seulement après que le batch ait été traité.
        • Mais ce comportement n'est pas garanti vu que la doc ne dit pas ça, Kafka pourrait à tout moment mettre à jour le comportement pour faire le commit dans un autre thread toutes les 5 secondes.
        • Pour éviter les problèmes, l'auteur conseille de faire le commit à la main.
    • isolation.level permet d'indiquer le type de comportement d'une transaction vis-à-vis du consumer.
      • La valeur read_uncommitted va renvoyer tous les records sans prendre en compte les transactions.
      • La valeur read_committed va renvoyer les records qui ne font pas partie des transactions, et ceux qui font partie de transactions validées, mais pas ceux qui font partie de transactions qui ne sont pas encore validées.
        • Pour garantir l'ordre, tous les records qui doivent se trouver après les records qui sont dans des transactions non validées, seront aussi bloqués le temps de la transaction.

11 - Robust Configuration

  • Kafka fait le choix d'émettre un warning dans le cas où on donne un mauvais nom de propriété de configuration.
    • Pour éviter les typos, on peut utiliser les constantes pour donner les valeurs.
    • NDLR : en TypeScript les clients sont typés.
  • Si la propriété vient d'un fichier de config qui n'est pas du code, il n'y aura pas de check à la compilation.
    • Dans ce cas, il nous faut vérifier le contenu au runtime.
    • L'auteur propose de faire une classe de validation, qui propose des méthodes de type fluent chaining.
      final var config = new TypesafeProducerConfig()
        .withBootstrapServers("localhost:9092")
        .withKeySerializerClass(StringSerializer.class)
        .withValueSerializerClass(StringSerializer.class)
        .withCustomEntry(
          ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1
        );

12 - Batching and Compression

  • Les batchs sont traités par Kafka comme un processus de bout en bout : le producer envoie les records par batchs, ils sont stockés comme tels, puis envoyés au consumer sous le même format.
    • Ca permet de recourir à la zero-copy optimization, où les données sont copiées depuis le réseau vers le disque, puis à nouveau vers le réseau, sans que le CPU n'ai eu à intervenir pour transformer la donnée.
      • Dans le cas où TLS serait activé, la zero-copy optimization ne serait plus vraiment possible puisqu'il faudrait au moins déchiffrer ce qu'envoie le producer et chiffrer ensuite pour envoyer au consumer, ce qui utilise du CPU proportionnellement à la donnée.
  • Ce processus de création de batchs arrive quand il y a beaucoup de records à traiter successivement : Kafka va batcher les records qui sont en attente d'être envoyés (en limitant la taille des batchs à batch.size). Quand le client veut publier au compte goutte, il ne fait pas de batchs.
    • linger.ms peut permettre d'avoir plus souvent des batchs : pendant ce temps qu'on attend, des records peuvent s'accumuler pour être batchés.
    • Kafka compte beaucoup sur du fine tuning fait par des admins pour la situation précise dans lequel il est utilisé.
  • Le batching a encore plus d'intérêt quand on utilise la compression.
    • Il n'est pas inhabituel d'obtenir des ratios de compression entre x5 et x7 sur du JSON.
    • L'essentiel de la performance de compression est obtenue déjà avec de petits batchs.
    • La compression est réalisée par le producer, et la décompression dans le consumer, donc ça a l'avantage de ne pas mettre de charge sur le serveur.
      • Le serveur offre aussi la possibilité de modifier la compression de son côté si on le veut vraiment : avec la propriété compression.type côté broker, qui a par défaut la valeur producer, et peut prendre une valeur de type de compression (gzip, snappy etc.).
    • L'auteur recommande de toujours activer la compression pour les records textuels et binaires (sauf si on sait qu'ils ont une très grande entropie, c'est-à-dire que leur contenu est très variable et difficilement prévisible, donc difficilement compressible).
    • Côté algo, il conseille les heuristiques suivantes :
      • Si on a des clients legacy (avec une version inférieure à 2.1.0) :
        • De base LZ4.
        • Si le réseau est identifié comme un bottleneck : Gzip.
      • Si on a des clients récents :
        • De base LZ4.
        • Si le réseau est identifié comme un bottleneck : ZStandard.
      • Bien sûr, si on a un vrai besoin de fine tuner la performance, il faut faire des benchmarks avec chacun des algos dans notre contexte spécifique.

13 - Replication and Acknowledgements

  • Le système de réplication fonctionne par sequential consistency : un leader par partition envoie la donnée aux followers.
  • Plus le replication factor est élevé, et plus l'acknowledgement des records peut être ralenti à cause du fait qu'il faut attendre le follower le plus lent.
    • Pour répondre à ce problème, chaque leader maintient dans ZooKeeper une liste des In-Sync Replicas (ISR), c'est-à-dire les followers qui ne dépassent pas un retard temporel spécifique vis-à-vis des records du leader.
      • On peut régler un nombre minimal de followers dans l'ISR avec min.insync.replicas (par défaut 1, mais l'auteur conseille au moins 2, pour toujours avoir au moins une autre copie à jour).
        • En dessous de ce nombre se trouvant dans l'ISR, le leader arrête d'accepter la publication de records et attend qu'un nombre suffisant de followers redeviennent éligibles à l'ISR.
      • Le temps maximal de lag à partir duquel un follower est exclu de l'ISR est configuré avec replica.lag.time.max.ms (par défaut 10 secondes).
      • C'est les followers de l'ISR dont on attendra la confirmation pour une durabilité maximale, et non pas celle de l'ensemble des followers.
      • Le producer ne peut que dire s'il veut attendre l'acknowledgement de tous les followers (de l'ISR), du leader seulement, ou de personne. Il ne peut pas influer sur qui se trouve ou non dans l'ISR.
  • Seuls les réplicas de l'ISR sont éligibles pour devenir leaders de partition.
    • Sauf si on a mis la propriété unclean.leader.election à true.
  • Quelle que soit l'approche choisie, elle aura des désavantages plus ou moins grands :
    • Avec un faible min.insync.replicas on risque de ne plus avoir de réplicas à jour pour prendre la main au moment où le leader est en échec.
    • Avec un min.insync.replicas élevé proche ou égal au replication factor, on risque d'avoir des réplicas lents qu'on est obligés d'attendre.
    • Avec un plus grand replication factor (la propriété default.replication.factor), et potentiellement plus de brokers, on risque quand même d'être lent parce qu'on a plus de réplicas à mettre à jour.
  • On peut augmenter le replication factor de topics existants, mais ça nécessite de créer un fichier de config de réassignement sous forme JSON, avec l'ordre des réplicas qu'on préfèrerait pour chaque partition (pour le choix des nouveaux leaders d'une manière qui les répartit entre brokers).
    • Pour nous aider avec cette config, il y a l'outil kafka-reassign-tool sur GitHub.
    • La création d'un réplica supplémentaire demande à copier les partitions pour lesquelles on augmente le replication factor, donc ça peut prendre du temps et occuper le cluster.
  • Pour décommissionner un broker, il faut d'abord le vider de son rôle de leader pour toutes les partitions où il l'est.
    • On peut pour ça utiliser la même technique avec le fichier de config de réassignement, en indiquant pour toutes les partitions où il est leader, les IDs d'autres brokers.
  • Concernant l'acknowledgement.
    • Quand un producer choisit de ne pas en recevoir (acks = 0), il n'a plus de garantie de durabilité sur ce qu'il envoie (bien que la réplication se fasse comme d'habitude côté serveur), et il n'est plus non plus informé de l'offset des records qu'il publie (par retour de la méthode send() par exemple).
      • Ca peut par exemple être utile dans un cas de traitement de données de température qu'on affiche en direct : la perte de quelques données n'est pas très grave.
    • Quand un producer choisit d'en recevoir un quand seulement le leader a validé le record (acks = 1), en réalité il n'y a pas beaucoup plus de garantie qu'avec acks = 0.
      • Le leader peut échouer à effectivement écrire le record (il répond avant que l'écriture soit complète), ou il peut lui-même être en situation d'échec juste après l'acknowledgement, et avant d'avoir envoyé le record aux autres réplicas.
      • En fait ça revient à se demander si la machine du leader est considérée comme plus fiable que celle du producer pour ce qui est de décider si un record est publié ou pas.
      • De manière générale ce mode est surtout utile dans les cas où la perte de quelques données est tolérable, mais où le client a besoin de connaître l'offset du record qu'il vient d'écrire.
    • Quand un producer choisit de recevoir tous les acknowledgements (acks = -1 ou all), il a la garantie de durabilité maximale.
  • L'auteur conseille comme heuristique par défaut d'adopter -1 ou all pour la valeur de acks (au lieu de 1 par défaut), et au moins 2 pour min.insync.replicas (au lieu de 1 par défaut) avec un replication factor d'au moins 3.
    • Si on est dans des cas où la perte de données est tolérable, alors on pourra diminuer ces contraintes.

14 - Data Retention

  • Les données de chaque partition sont par défaut dans des dossiers de la forme /tmp/kafka-logs/getting-started-0/.
    • Le dossier contient un fichier nommé leader-epoch-checkpoint, qui contient toutes les réassignation de leader pour la partition. De cette manière, chaque réplica peut ignorer les messages d'un collègue broker qui se prendrait pour le leader de la partition sans l'être.
    • Le contenu des records se trouve dans fichiers nommés selon le 1er offset du record qu'ils ont, avec l'extension .log.
    • Chaque fichier de log a un index nommé de la même manière mais avec une extension .index. Il contient un map entre les offsets des records (ou des batchs) du fichier de log, et l'offset physique dans le fichier de log pour aller les lire.
    • On a enfin un autre fichier nommé pareil mais avec l'extension .timeindex, et qui contient un map entre des timestamps des records et l'offset physique dans le fichier de log.
  • Kafka a des propriétés configurables, liées à la taille des fichiers de log et à leur ancienneté, pour contrôler le moment où on switch au fichier suivant pour écrire.
    • Par exemple log.segment.bytes (par défaut 1 GB), log.roll.hours (par défaut 1 semaine).
    • On peut aussi configurer un temps aléatoire de décalage du switch, pour que l'ensemble des partitions ne changent pas de fichier de log en même temps.
  • Les fichiers d'index ont une place pré-allouée, dont la taille est contrôlable par une propriété.
    • On peut de la même manière activer la pré-allocation des fichiers de log, pour gagner en performance sur certains filesystems.
  • Le script kafka-dump-log.sh dans les outils d'admin de Kafka permet de lire le contenu des fichiers qui composent les logs.
  • Il existe des cleanup policies qui sont de deux types : supprimer les anciens records, ou faire de la compaction pour gagner en place.
    • log.cleanup.policy permet de contrôler le type de policy, cross-topic ou pour un topic spécifique.
      • Par défaut la valeur est delete, l'autre valeur étant compact. On peut spécifier les deux en même temps, en les séparant par une virgule.
    • Le cleanup ne s'applique qu'aux fichiers de log inactifs, c'est-à-dire les fichiers de log dont on a déjà switché vers un autre fichier.
    • Quand la policy est delete.
      • Un background process va régulièrement (toutes les log.retention.check.interval.ms, par défaut 5 minutes) vérifier pour chaque fichier de log inactif s'il est sujet à être supprimé ou non, en fonction des règles de rétention configurées (par exemple log.retention.bytes (non configuré par défaut), log.retention.hours (par défaut 1 semaine)).
      • Avec les valeurs par défaut, un fichier de log sera supprimé au bout d'1 semaine. Par contre, il sera supprimé d'un coup. Donc si on n'avait qu'un seul fichier qui n'avait pas atteint la taille d'1 GB pour switch de fichier avant les 1 semaine, on va perdre tous les records d'un coup, et écrire les nouveaux dans un nouveau fichier.
        • Si on veut une plus grande granularité, on peut configurer de plus petites valeurs pour pour le switch de fichier de log actif (log.segment.bytes ou log.roll.hours).
    • Quand la policy est compact.
      • La compaction est utile par exemple dans le cas où on a des events de type ECST (l'auteur ne mentionne pas le terme).
        • Normalement il faut une logique en deux temps : hydrater notre app downstream avec les données de l'app upstream, puis laisser l'app upstream publier ses changements sur Kafka.
        • Pour éviter d'avoir ce fonctionnement en deux temps, la compaction permet de publier dès le début les ECST dans Kafka, et de ne pas avoir besoin de l'autre mode puisque Kafka gardera toujours au moins le record le plus récent pour chaque entité.
        • Par contre ça ne marche qu'avec les events qui ont la totalité de la donnée de l'entité et qui donc “déprécient” les events précédents pour cette entité. Ça ne marche pas avec les events qui indiquent seulement les champs qui ont changé dans l'entité.
      • La compaction consiste à transformer Kafka en snapshot, où on ne garde que les données les plus récentes pour chaque entité, qu'on différencie par la key associée au record.
        • La lecture de l'ensemble du topic prendra donc un temps proportionnel au nombre de keys différents dont il existe des records.
      • D'un point de vue technique, la compaction est faite par des threads en arrière plan.
        • Côté config :
          • Leur nombre est contrôlé par log.cleaner.threads, par défaut 1.
          • log.cleaner.min.cleanable.ratio (par défaut 0.5) indique le ratio de log “sale“ à partir duquel il sera éligible à être compacté.
          • log.cleaner.min.compaction.lag.ms (par défaut 0) permet d'indiquer un temps minimal avant qu'un record ne puisse faire l'objet de compaction. Sachant que ça ne peut pas concerner le fichier de log actif, mais seulement ceux où il y a déjà eu un switch de fichier.
          • log.cleaner.min.compaction.lag.ms (par défaut infini) permet d'indiquer un temps maximal à partir duquel le log sera quand même compacté, même s'il ne satisfaisait pas le ratio de “saleté”.
          • log.cleaner.delete.retention.ms (par défaut 24 heures) indique la durée de vie des tombstones.
          • On peut aussi définir ces configs par topic (sauf pour le nombre de threads de compaction).
        • Pour calculer le ratio de “saleté”, Kafka maintient un cleaner point correspondant au point jusqu'où la compaction a déjà été faite, pour chaque fichier de log.
          • Le ratio consiste à diviser le nombre de records pas encore traités par le nombre de records existants dans la partie déjà traitée.
        • La compaction laisse les records dans le même ordre, et ne change pas leur offset. Elle va juste éliminer des records.
        • Les tombstones sont créés par les producers pour indiquer à Kafka que les entités d'une key particulière ne sont plus utiles.
          • Ce sont simplement des records, avec une valeur nulle, et la key pour laquelle on veut faire la suppression.
          • La raison pour laquelle ils restent un temps minimal (par défaut 24h) est de s'assurer que les consumers ont eu le temps d'avoir l'info de suppression du record, pour éviter qu'ils gardent l'entité en base alors qu'elle n'est plus censée exister.
    • On peut aussi combiner compaction et deletion.
      • Cette possibilité est utile dans des cas particuliers où les events perdent rapidement leur intérêt.
        • On peut alors potentiellement avoir une compaction plus agressive vu qu'on limite la taille des records en supprimant les plus anciens.
      • Un exemple peut être le topic __consumer_offsets qui compacte pour que le group coordinator puisse rapidement reconstruire l'état des consumers, et supprime les anciens offsets pour les groupes qui n'ont pas été actifs depuis longtemps pour éviter de trop grossir.

15 - Group Membership and Partition Assignment

  • Les consumer groups permettent de faire du load balancing au niveau de la consommation.
    • Kafka garantit qu'il y aura au plus un consumer d'un même groupe par partition.
      • “au plus” pour prendre en compte le cas où aucun consumer ne serait disponible dans le groupe.
    • L'assignation des consumers se passe en deux temps :
      • 1 - La phase group membership.
        • Il s'agit d'identifier les consumers d'un groupe, et d'élire un group leader parmi eux, pour que celui-ci décide des assignations partition / consumer.
          • 1 - Les consumers envoient un message au broker qui est coordinator pour ce group, pour s'identifier comme membres de ce groupe.
            • Ils savent qui est leur coordinator parce que son id leur est renvoyé par un des brokers, qui lui même peut le savoir par un mécanisme déterministe de hachage entre le group id et une des partitions : le broker leader de cette partition devient le coordinator du group.
          • 2 - Le coordinator attend un certain temps avant de répondre, pour que tous les consumers aient pu s'identifier comme membres du groupe, et pour éviter les nombreux rebalancings au début.
            • Le délai est appliqué quand le groupe est vide.
            • Le délai est contrôlable avec group.initial.rebalance.delay.ms (par défaut 3 secondes).
            • C'est typiquement inutile dans les scénarios où il n'y a qu'un consumer, comme dans des tests d'intégration par exemple où on peut le mettre à 0.
          • 3 - Il renvoie une réponse à chacun, contenant les IDs des consumers du groupe et l'ID du consumer leader.
      • 2 - La phase state synchronisation.
        • 1 - Le group leader va faire l'assignation des partitions aux consumers dont il a reçu la liste, et envoyer ça au coordinator.
        • 2 - Le coordinator à son tour renvoie les assignations à chaque consumer.
    • A chaque fois qu'un consumer rejoint un groupe existant, le coordinator oblige les autres consumers à se réidentifier, et se voir potentiellement réassigner des partitions (on appelle ça le rebalancing).
      • Pendant le rebalancing, les consumers se verront refuser toutes leurs opérations (y compris heartbeats) par une réponse REBALANCE_IN_PROGRESS.
      • A chaque rebalancing, le coordinator va assigner à chaque consumer un id qui est incrémenté monotoniquement. Donc un consumer zombie qui aurait oublié de se réidentifier serait rejeté la prochaine fois qu'il voudrait consommer.
      • Le client met à disposition la possibilité d'enregistrer des callbacks sur les events d'un rebalancing :
        • onPartitionsRevoked() est appelé dès que la consommation doit s'arrêter pour que le rebalancing puisse avoir lieu.
        • onPartitionsAssigned() indique au client les éventuelles nouvelles partitions qui lui ont été assignées.
        • onPartitionLost() indique d'éventuelles partitions perdues par le consumer.
          • Ça peut se produire si le consumer n'avait pas émis de heartbeats et était considéré en échec.
      • Le rebalancing par défaut (eager rebalancing) se fait en une étape.
        • Il implique donc que les consumers doivent à chaque fois partir du principe que l'ensemble des assignations de partition sont potentiellement révoquées et cleaner les messages en cours de traitement.
        • L'incremental cooperative rebalancing permet d'éviter ça en plaçant les assignations à la fin, en utilisant éventuellement plusieurs étapes :
          • Une seule étape s'il n'y a que de nouvelles assignations de partitions.
          • S'il y a aussi des révocations : une première étape de révocations, et une deuxième étape d'assignations.
        • Pour que l'incremental cooperative rebalancing soit plus efficace, et contrebalance le fait qu'il nécessite plus d'appels réseau, il faut que la stratégie d'assignation de partition soit sticky (c'est-à-dire qu'on essaye au maximum de garder les assignations qui existent pendant le rebalancing).
  • Dans les systèmes distribués, il y a deux propriétés importantes : la liveness qui est le fait qu'un système continue d'opérer et de progresser dans ses tâches, et la safety qui est le fait que les invariants du système soient préservés.
    • Kafka satisfait la liveness par :
      • Les checks réguliers d'availability des consumers par le système de heartbeats à envoyer avant un timeout.
      • La vérification que les consumers progressent, en s'assurant qu'ils appellent régulièrement poll() avant de dépasser un timeout.
    • Plus les valeurs des deux timeouts sont petites et plus le client détectera vite les échecs, mais au prix de plus de consommation de ressources et de plus de faux positifs.
      • Globalement l'auteur trouve ces valeurs par défaut raisonnables dans la plupart des cas.
    • Dans le cas où on doit retenter une requête vers un composant externe (DB, broker etc) qui échoue plusieurs fois, on risque d'échouer nous-mêmes à respecter le timeout prouvant qu'on progresse (max.poll.interval.ms), on alors 5 possibilités :
      • 1 - Mettre une très grande valeur à max.poll.interval.ms, pour “désactiver” le timeout.
        • Il s'agit d'un cas où on veut que l'ordre soit absolument respecté, et que les actions pour chaque record soient absolument réalisées, au prix d'une potentielle attente jusqu'à ce que la ressource externe réponde correctement.
        • Le problème c'est qu'on ne prend pas en compte qu'on pourrait avoir un problème en interne, notamment des bugs dans le consumer lui-même, et que notre timeout nous protégeait aussi de ça.
      • 2 - Mettre une valeur raisonnable pour le timeout. Dans ce cas, tant que le service externe est down, le consumer va recommencer jusqu'au timeout, et être rebalancé (exclu puis réintégré).
        • C'est le même comportement que le 1- où on veut faire les records dans l'ordre coûte que coûte, mais là on règle les éventuels problèmes de consumer bloqué.
        • Il y a par contre un risque de perdre en performance à force d'enchaîner les rebalancings.
      • 3 - Détecter nous mêmes dans le consumer le fait qu'on va bientôt dépasser le timeout, et se déconnecter après avoir nettoyé ses tâches en cours, pour se reconnecter tout de suite après.
        • En fait, vu qu'on se déconnecte/reconnecte, on va entraîner un rebalancing de fait.
        • Le petit avantage par rapport à la 2- c'est qu'on va pouvoir faire des checks supplémentaires localement sur le fait de ne processer le record qu'une fois.
      • 4 - Mettre en place une deadline par record, et si la deadline est dépassée, considérer qu'il a été traité en passant au suivant, mais le republier dans le topic pour qu'il soit retraité plus tard.
        • Cette solution implique que l'ordre de traitement des records n'est pas essentiel.
        • On pourrait aussi avoir un temps maximal ou un nombre de retries maximal dans le record, indiquant combien de temps ou de fois il faut continuer à essayer de le republier avant que ça ne serve plus à rien, dans le cas où il devient obsolète avec le temps.
        • Les consumer groups étant indépendants et pouvant lire dans un même topic, requeuer un message derrière le topic juste parce qu'un consumer group n'a pas pu le traiter à temps n'est pas vraiment ce qu'on veut.
          • Au lieu de ça, on peut avoir une topologie de type fanout, c'est-à-dire un topic qui publie dans un fanout group, qui lui-même publie dans un topic par consumer group. Et dans ce deuxième niveau on pourra requeuer un message non géré par un consumer groupe spécifique.
      • 5 - Mettre en place une deadline comme dans le 4-, mais sans requeuer le record du tout.
        • Il peut être intéressant, tout comme pour le 4-, de penser à mettre les records non traités dans une dead letter queue pour pouvoir investiguer la lenteur plus tard.
    • Malgré les garanties apportées par Kafka, il est possible que deux consumers traitent le même record.
      • Ça arrive dans le cas suivant :
        • 1 - Un consumer met beaucoup de temps à traiter un record, et dépasse le timeout pour appeler poll().
        • 2 - Il se fait exclure parce que son thread responsable des heartbeats n'en émet plus et demande même explicitement à être révoqué.
        • 3 - Le coordinator révoque le consumer et fait un rebalancing pour assigner sa partition à un autre consumer.
        • 4 - Le nouveau consumer commence à traiter les records non commités.
        • 5 - pendant ce temps, le consumer révoqué continue de traiter son record en cours, sans savoir qu'il a été arrêté.
      • Pour éviter ça, Kafka ne propose pas grand chose. L'auteur propose 3 approches à faire soi-même :
        • 1 - Se débrouiller pour que le consumer ne dépasse jamais le timeout, et que sinon on gère les conséquences à la main pour ne pas avoir de rebalancing.
        • 2 - Utiliser un distributed lock manager (DLM) pour protéger les sections critiques d'être traitées en même temps par deux consumers.
          • La protection agit sur le fait de traiter en même temps, pas le fait de traiter plusieurs fois en général.
            • Ceci dit, on peut du coup vérifier qu'on n'a pas déjà traité la section critique du record avant de la traiter à nouveau.
            • Attention à ne pas être tenté de faire l'optimisation de faire déconnecter/reconnecter le consumer dans le cas où on remarque que le record a déjà été traité : le consumer qui l'a traité n'a peut-être pas encore commité, et donc on risquerait de le retraiter à nouveau.
          • L'impact du DLM sur le throughput et la latence peuvent être minimisés en regroupant les records du buffer, par exemple par partition, et en faisant le lock avant et après le traitement de chacun de ces lots.
          • A la place du DLM on pourrait aussi avoir n'importe quel store persistant, comme Redis ou une DB.
        • 3 - Utiliser un process qui vérifie régulièrement le process qui tourne pour consommer les records, pour s'assurer qu'il consomme bien régulièrement.
          • S'il est bloqué depuis un certain temps, le process vérificateur le restart avant que le timeout côté Kafka soit déclenché.
    • L'idée des static members va très bien avec le fait d'avoir un système de health check externe à Kafka : par exemple Kubernetes qui s'assurerait de détecter les consumers en échec, et de les arrêter puis restarter.
      • Côté liveness, ça peut permettre d'éviter des rebalancings de la part de Kafka, et donc d'avoir un throughput plus important, au prix de certaines partitions spécifiques qui n'avancent plus pendant un temps plus long qu'en mode non static.
      • Côté safety, Kubernetes peut jouer le rôle d'orchestrateur pour s'assurer que les partitions ne sont pas traitées par plusieurs consumers en même temps.
  • La raison pour laquelle l'assignation des partitions se passe dans un consumer leader, c'est de permettre le changement de stratégie d'assignation pour chaque consumer, plutôt que quelque chose de commun en tant que config Kafka.
    • Il existe 4 assignors disponibles pour choisir ces stratégies :
      • Le range assignor est l'assignor par défaut, il consiste à classer les partitions pour un même topic et les consumers dans l'ordre du plus petit au plus grand, et ensuite d'attribuer des groupes de partitions de part égales aux consumers successifs.
        • Et on recommence la même chose pour chaque topic.
        • Si le nombre de partitions n'est pas divisible par le nombre de consumers, les premiers consumers se verront attribuer une partition de plus.
        • Son désavantage c'est qu'il assigne les partitions équitablement par topic, mais si on prend en compte l'ensemble des partitions existantes dont les consumers doivent s'occuper, on peut tomber sur une répartition assez inégale.
          • Ça se produit en particulier quand le nombre de partition par topic est plus petit que le nombre de consumers : les premiers consumers reçoivent une partition de chaque topic, alors que les derniers n'en reçoivent pas.
      • Le round robin assignor consiste à rassembler les partitions de tous les topics, puis de les attribuer un par un, dans l'ordre, aux consumers, en rebouclant sur la liste de consumers s'il y a plus de partitions que de consumers.
        • La répartition est bien meilleure que pour le range assignor, puisqu'elle est cross-topic.
      • Le sticky assignor consiste à assigner de manière à peu près équilibrée, mais surtout s'évertue à préserver le plus possible les assignations déjà faites, quand il faut faire une réassignation.
        • La répartition est de la même qualité que pour le round robin assignor, mais celui-ci minimise le nombre de partitions changées de main pendant un rebalancing.
      • Le cooperative sticky assignor consiste à faire la même chose que le sticky assignor, mais en utilisant le cooperative rebalancing protocol qui permet de réduire les pauses de rebalancing.
        • Les consumers ne sont plus obligés de se préparer à la révocation de toutes leurs partitions à chaque rebalancing. Ils savent lesquelles seront révoquées, et ensuite lesquelles leur seront assignées.
    • Pour changer la stratégie d'assignation, on ne peut pas simplement mettre à jour la propriété de config qui le fait (partition.assignment.strategy).
      • Le premier consumer qui sortirait du groupe pour y revenir avec la nouvelle stratégie provoquerait un problème d'inconsistance de stratégie au niveau de ce groupe.
      • On assigne d'abord l'ancienne stratégie et la nouvelle, puis on supprime l'ancienne. Au final on aura eu 2 bounces.

16 - Security

  • Kafka n'est pas configuré par défaut pour fonctionner de manière sécurisée.
    • Par défaut, n'importe quel client peut se connecter, y compris à ZooKeeper.
    • Les connexions ne sont pas chiffrées.
    • Même une fois l'authentification mise en place, les autorisations sont maximales.
  • La première sécurité est le blocage au niveau réseau avec un firewall.
    • L'auteur propose une topologie réseau en 4 blocs séparés par des firewalls :
      • Le bloc ZooKeeper, accédé uniquement par les brokers.
      • Le bloc Kafka brokers, accédé par les clients.
      • Le bloc clients : consumers, publishers, admin clients.
      • Le bloc externe qui passe par internet, et peut contenir un site distant, ou encore des télétravailleurs accédant via VPN.
  • Ensuite il faut activer le chiffrement TLS supporté par Kafka.
    • Chaque broker a besoin d'une clé RSA et d'un certificat CA, correspondant à son hostname.
    • La configuration SSL/TLS peut se faire dans server.properties.
      • Il faut notamment utiliser le protocole existant SSL sur un port différent de PLAINTEXT, à la fois pour listeners et advertised.listeners.
      • Pour activer le chiffrement pour les communications inter-broker, on peut ajouter inter.broker.listener.name=SSL.
    • Il suffit ensuite de connecter le client sur le bon port de Kafka, en indiquant les bonnes creds et le protocole utilisé.
    • Une fois qu'on s'est assuré que la connexion chiffrée fonctionne, l'auteur recommande de désactiver le socket non chiffré dans le serveur, en enlevant la version qui utilise PLAINTEXT dans listeners et advertised.listeners.
      • On peut même configurer le firewall pour interdire les connexions sur le port 9092.
    • Kafka n'a pas de mécanisme de chiffrement de la donnée elle-même, par défaut elle sera stockée en clair sur le filesystem des brokers.
      • Une des possibilités peut être d'utiliser un chiffrement au niveau du filesystem ou du disque dur entier des machines des brokers.
      • La méthode la plus sûre est de recourir à du chiffrement de bout en bout de la donnée, en chiffrant dans le publisher, et déchiffrant dans le consumer.
        • Il existe plusieurs projets open source qui permettent de le faire. Par exemple, le projet Kafka Encryption.
        • Si on publie des messages déjà chiffrés, il devient inutile d'activer la compression dans Kafka, puisque l'entropie des messages sera alors maximale.
        • Le chiffrement de bout en bout ne rend pas l'utilisation de TLS inutile.
          • TLS protège l'ensemble du record, y compris les headers par exemple.
          • Il protège contre les attaques man in the middle, en assurant l'identité de l'émetteur.
  • Kafka supporte plusieurs types d'authentification.
    • Le mutual TLS permet d'utiliser le mécanisme TLS habituellement utilisé pour que le client fasse confiance au serveur aussi dans l'autre sens : le client lui aussi envoie un certificat signé par un CA auquel le serveur fait confiance.
      • Côté serveur, on peut activer la fonctionnalité avec la propriété ssl.client.auth :
        • Elle vaut none par défaut.
        • required permet de forcer les clients à fournir un certificat valide s'ils veulent se connecter.
        • Il y a une 3ème option utile pour faire une migration progressive : requested permet d'accepter l'authentification par ce moyen, mais sans le rendre obligatoire le temps que tous les clients aient été migrés.
      • Côté client, il faut obtenir un certificat certifié par un CA valide du point de vue du serveur, et se connecter avec les propriétés de config qui sont le miroir de celles qu'utilise le serveur pour lui-même configurer TLS.
      • Pour obtenir l'information sur l'identité du client qui se connecte, c'est par défaut le champ CN du certificat qui sera utilisé.
        • La propriété ssl.principal.mapping.rules permet de personnaliser le champ à prendre par des règles de type regex.
        • Par contre, la fiabilité de la méthode pour déterminer l'identité dépend de la rigueur avec laquelle les certificats sont établis :
          • Si un des clients peut mettre l'identité d'un autre client dans le champ CN d'un certificat qu'il fait générer par l'autorité de confiance, alors il pourra se faire passer pour l'autre client.
        • Un autre problème aussi c'est que Kafka ne permet pas de révoquer un certificat pour un client particulier.
          • Le mieux qu'on puisse faire c'est de déployer un nouveau CA, et de faire signer tous les certificats des clients par ce CA.
          • Il faut aussi penser à utiliser des CA différents si on a plusieurs clusters Kafka.
      • L'authentification mutual TLS ne peut pas être utilisée en même temps que d'autres types d'authentification au niveau applicatif, même si la mutual TLS se trouve dans une couche réseau différente.
    • SASL (Simple Authentication and Security Layer) consiste à ajouter une méthode d'authentification à un protocole utilisateur.
      • Il est en général utilisé avec du TLS.
      • L'une des variantes supportées est GSSAPI (Generic Security Service API), aussi connu sous le nom de son implémentation principale qui est Kerberos.
        • Cet outil va avec l'usage de répertoires centralisés type Active Directory.
        • Il est surtout adapté aux utilisateurs individuels, mais Kafka a besoin d'avoir une authentification plutôt orientée autour de service accounts, parce qu'on ne peut pas démarrer et arrêter un client à chaque interaction utilisateur.
          • Le problème avec l'approche service accounts c'est que Kerberos n'étant pas forcément compatible avec toutes les ressources (par exemple Redis), on va pouvoir désactiver un account mais sans être sûr que l'ensemble des ressources le sont pour cet account.
        • L'auteur trouve que Kerberos est un système complexe pour ce qu'il apporte, et conseille plutôt les autres méthodes SASL.
      • Les autres variantes SASL supportées par Kafka sont PLAIN et SCRAM.
        • PLAIN est le diminutif de plaintext, pour dire que le user et mot de passe sont transmis en clair.
        • SCRAM est l'acronyme de Salted Challenge Response Authentication Mechanism, et il a la particularité de ne pas impliquer d'envoyer les credentials directement au serveur. Il apporte donc une meilleure sécurité.
        • Comme dit plus haut, l'authentification SASL n'est pas compatible avec l'authentification SSL côté client (avec le client qui fournit un certificat signé par un CA de confiance) : Kafka ne saurait pas quoi prendre comme identifiant entre le username dans SASL et le champ CN du certificat.
          • Si on fournit les deux, la configuration pour l'authentification SSL côté client ne sera pas prise en compte.
        • Avec SCRAM, les versions hashées des credentials valides sont stockées dans ZooKeeper, par exemple avec le script kafka-config.sh.
          • PLAIN quant à lui les stocke en clair dans server.properties.
        • On peut aussi utiliser SASL pour la communication inter-broker au lieu de juste SSL.
          • Attention à bien protéger le fichier server.properties qui va du coup contenir le username et password en clair, par exemple avec un petit chmod 600.
          • Une autre solution peut être de créer un fichier dans config/ qu'on protège, et d'y mettre la config jaas. Il faudra alors passer ce fichier dans l'option CLI java.security.auth.login.config au moment de démarrer le broker.
        • On peut à chaque fois utiliser netstat -an | egrep "9092|9093|9094" pour vérifier sur quels ports il y a Kafka en écoute et sur quels ports il y a une connexion établie.
      • La version de SASL avec OAuth bearer est là seulement dans un objectif de testing. Elle n'est pas sécurisée.
        • L'application peut spécifier un user arbitraire dans un token JWT.
        • On peut utiliser une implémentation open source comme Kafka OAuth.
      • Les delegation tokens sont un mécanisme complémentaire à SASL, permettant de faciliter la gestion des credentials sur un grand nombre de brokers.
        • Pour Kerberos il va s'agir de remplacer le déploiement des TGT ou keytab, et pour PLAIN et SCRAM ça va être les user / password.
        • Les delegation tokens sont limités dans le temps et donc permettent de ne pas compromettre les vrais credentials.
        • C'est particulièrement pratique dans le cas où les brokers sont créés de manière éphémère dans des workers.
        • Il faut faire la configuration côté broker en mettant en place delegation.token.master.key à la même valeur pour tous les brokers du cluster.
          • On crée ensuite les tokens avec une commande CLI kafka-delegation-tokens.sh.
          • Le token doit être renouvelé avant la période d'expiration (par défaut 1 jour) avec le même fichier de commande CLI.
        • Côté client, il faut indiquer qu'on utilise l'authentification avec delegation token, et indiquer les valeurs de TOKENID et HMAC qu'on a pu récupérer au moment de créer le token sur le broker.
    • On peut configurer une authentification sur ZooKeeper, en utilisant SASL.
      • Ça concerne du coup les brokers et les clients admin.
      • ZooKeeper a en plus un mécanisme d'autorisation à base d'ACL, permettant d'attribuer 5 types de droits aux utilisateurs anonymes et utilisateurs authentifiés par SASL.
      • De l'aveu de l'auteur, ajouter une authentification à un ZooKeeper qu'on a déjà isolé dans un réseau à part peut être excessif. Le niveau de sécurité dont on a besoin dépendra du contexte.
      • Pour activer l'authentification, il faut modifier zookeeper.properties et y ajouter la config pour activer SASL.
        • Il ne faut pas oublier de changer les autorisations qui existent pour les utilisateurs anonymes avec la commande CLI zookeeper-security-migration.sh.
      • Il faut ensuite activer l'authentification sécurisée à ZooKeeper depuis le broker, en ajoutant la propriété zookeeper.set.acl=true, et en redémarrant le broker avec l'option java.security.auth.login.config pointant vers le fichier de config contenant les identifiants SASL.
    • Les clients admin, que ce soit en script CLI ou de type Kafdrop, doivent aussi être configurés pour se connecter aux brokers qui ont une authentification activée.
      • Pour les scripts CLI, on peut changer le fichier client.properties.
      • Pour Kafdrop, il faut modifier le fichier kafka.properties.
  • Concernant l'autorisation.
    • Kafka a un système d'autorisations sous forme d'ACL centrées sur les ressources.
      • Il s'agit d'un système distinct de celui de ZooKeeper.
    • On a la possibilité d'autoriser des droits :
      • Par utilisateur.
      • Par host.
      • Pour un type de ressource particulier (par exemple Topic ou Group).
      • Pour un pattern spécifique à appliquer aux ressources (par exemple toutes les ressources commençant par un préfix, ou avec des * pour dire qu'on peut avoir n'importe quoi dans une partie du nom).
      • Pour une opération particulière (par exemple Read, Write, Describe etc.).
    • Pour commencer, il faut activer l'autorisation dans le serveur en ajoutant la classe d'autorisation et la liste des super users dans server.properties :
      authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
      super.users=User:admin
      • Les super users sont séparés par des point-virgules.
      • A partir de là, seuls les super users auront la possibilité de faire des choses. Tous les autres seront bloqués.
    • Parmi les scripts CLI, kafka-acls.sh permet de visualiser et configurer les ACLs.
    • La bonne pratique c'est d'assigner des utilisateurs distincts à chaque application (publisher, consumer etc.).
      • La même chose vaut pour le user utilisé pour l'inter-broker communication : il vaut mieux lui donner les bons droits plutôt que le mettre en super admin.
    • Kafka donne la possibilité d'autoriser ou interdire pour une règle d'autorisation donnée.
      • Les règles d'interdiction prennent toujours le pas sur les règles d'autorisation, quelles que soient les granularités.
      • On peut grâce à ça, par exemple :
        • Partir du fait que le défaut dans Kafka c'est que tout est interdit pour tout le monde.
        • Puis mettre une règle qui autorise un droit de lecture sur un topic pour tous les utilisateurs (en utilisant le wildcard “*”).
        • Et enfin mettre des règles interdisant ce droit d'écriture pour certains utilisateurs particuliers (par exemple un utilisateur guest dont on donne les identifiants à ceux qui veulent essayer).
    • Il y a des moyens de filtrer plus ou moins de choses quand on liste des droits avec le script CLI kafka-acls.sh :
      • On a la possibilité de lister tous les droits qui pourraient s'appliquer au nom qu'on indique (parce qu'ils comportent des règles de wildcard ou de préfix etc.), grâce à l'option --resource-pattern-type=any.
      • Ou alors lister seulement les droits qui portent sur le nom exact qu'on indique, grâce à l'option --resource-pattern-type=match.
    • Kafka permet de créer des règles d'autorisation ou d'interdiction pour les clients basé sur leurs adresses IP.
      • L'auteur déconseille cette fonctionnalité, étant donné la nature mouvante des topologies de client dans le cloud.
      • Il conseille à la limite d'utiliser le firewall pour faire ce genre de restrictions.
    • Voilà quelques scénarios d'autorisation habituels qu'on met en place :
      • Créer des topics : l'opération Create qu'on attribue pour les topics commençant par un préfixe.
      • Supprimer des topics : l'opération Delete sur les topics avec le même préfixe.
      • Publier dans un topic : l'opération Write ou IdempotentWrite (pour que ça marche avec la publication en mode idempotent), qu'on attribue pour les topics commençant par un préfixe.
      • Consommer depuis un topic :
        • Pour un consumer sans groupe, il faut l'opération Read sur le topic. En général on met le topic exact pour éviter d'augmenter l'exposition des données.
        • Si le consumer fait partie d'un groupe, alors il faudra aussi l'opération Read sur le groupe.

17 - Quotas

  • Les quotas servent à :
    • Empêcher les attaques DOS en faisant du throttling.
    • Aider à planifier la capacité de la machine pour assurer une bonne qualité de service.
      • En particulier quand on commence à avoir suffisamment de clients Kafka pour que les quelques brokers initialement nécessaires commencent à manquer de ressources.
  • Les quotas s'appliquent aux utilisateurs au niveau de chaque broker.
    • Ca veut dire qu'il faut prendre en compte le nombre de brokers, et potentiellement revoir les quotas quand on ajoute des brokers.
  • Il existe deux types de quotas :
    • 1 - Network bandwidth quotas.
      • Vérifie que les producers et consumers ne dépassent pas une certaine quantité de données transférées (en bytes / seconde).
        • Ca permet d'englober de nombreux aspects : bande passante réseau, ressources I/O, ressources mémoire à cause du buffering, ressources CPU dans le cas du chiffrement TLS.
      • Le broker calcule l'utilisation de la bande passante de chaque client par fenêtre glissante.
        • Quand il y a un dépassement, le broker va introduire artificiellement un délai avant de répondre. Le client ne saura donc pas s'il a subi une restriction ou si c'est juste des lenteurs réseau.
    • 2 - Request rate quotas.
      • Vérifie que les producers et consumers n'utilisent pas plus d'un certain pourcentage de CPU d'un thread I/O.
        • 50% correspond à la moitié de l'utilisation du thread I/O, 200% correspond à l'utilisation pleine de 2 threads.
      • Le network bandwidth quota couvre déjà une grande partie des cas. Request rate quotas vient le compléter dans les cas où un client a fait un mauvais réglage qui l'amène à faire un très grand nombre de requêtes vers le serveur, sans qu'il n'y ait forcément beaucoup de données dans ces requêtes.
        • Ca peut être par exemple si un consumer a configuré une valeur de fetch.max.wait.ms très basse, le poussant à faire des requêtes très régulières pour demander plus de records.
        • Comme autre cas de mauvaise configuration, ça peut aussi être de nombreuses requêtes qui aboutissent à “unauthorized”, ou encore une configuration différente de la compression entre client et serveur, aboutissant à une sur-utilisation du CPU inutile.
      • Ce mode de quota fonctionne aussi par fenêtre glissante, et ajoute aussi des pénalités d'attente silencieuses en cas de dépassement.
  • Les quotas sont attribués aux usernames et aux client IDs.
    • Les usernames sont ceux qui sont utilisés et vérifiés par Kafka par les mécanismes d'authentification (champ CN du certificat en cas d'authentification par mutual TLS, et champ username en cas d'authentification SASL) et d'autorisation.
    • Les client IDs sont les identifiants qu'un client déclare librement au moment de se connecter au serveur, avec le champ client.id.
    • On utilise souvent une combinaison des deux : le username pour l'authentification, et le client ID pour distinguer plusieurs machines appartenant à la même personne ou au même groupe de personnes.
  • L'attribution se fait via configuration dynamique, via le script CLI kafka-configs.sh ou un autre client admin.
    • Il est possible de spécifier des quotas pour un couple username / client ID, sachant que chaque membre du couple de valeurs peut avoir soit une valeur, soit la valeur <default>, soit ne pas avoir de valeur.
      • Le fait de savoir quelle règle de quota va s'appliquer se fait par matching parmi les règles existantes, avec une priorité aux règles les plus précises.
    • En fonction de la règle de quota qui est retenue pour chaque consumer, si deux consumers partagent la même règle, ils partageront aussi la valeur du quota.
    • D'un point de vue sécurité, l'auteur conseille de spécifier d'abord des valeurs par défaut qui sont très basses (en commençant par le couple username / client ID : <default> / <default>), et ensuite de les écraser par des règles plus spécifiques ayant des quotas plus larges.
  • La propriété buffer.memory (par défaut 32 Mo) côté client permet de le bloquer quand le buffer dépasse cette taille, ce qui peut permettre d'éviter le throttling côté serveur.
  • Le fait que le client ne sache pas s'il fait l'objet de pénalités d'attente ou s'il y a simplement de la congestion sur le réseau, peut poser problème dans certains cas.
    • Il peut bombarder de requêtes et finir par subir une attente si longue qu'elle dépasserait le delivery timeout. Il pourrait alors avoir tendance à réessayer plusieurs fois, menant à une forme de congestive collapse.
      • En général on peut résoudre ce problème en diminuant la propriété buffer.memory (par défaut 32 Mo) côté client pour obliger le client à attendre avant de publier plus que ce qu'il a en buffer.
    • Parfois on se trouve dans un cas où le client veut publier beaucoup de messages, et parmi eux la plupart des messages sans urgence particulière, et certains messages urgents dont il ne veut pas qu'ils fassent l'objet de ralentissement.
      • Dans ce cas, il est obligé d'essayer de deviner (par des moyens probabilistes) s'il fait l'objet de pénalités liées au quotas ou pas, pour éviter d'envoyer les autres messages le temps d'envoyer les messages urgents.
        • 1 - Il peut noter le nombre records envoyés mais pas encore acknowledgés par le broker : normalement ce chiffre devrait augmenter en cas de throttling, et diminuer pour atteindre presque 0 dans le cas contraire.
        • 2 - Il peut noter le timestamp du dernier record, et le comparer au temps actuel : s'il y a une différence importante, il est possible qu'il y ait eu du throttling.
  • La méthode de fenêtres glissantes qui calcule s'il faut appliquer des pénalités d'attente est configurable.
    • Le calcul se fait sur N samples d'une durée de S secondes, qui se renouvellent sample par sample.
      • N est configurable par quota.window.num (par défaut 11 samples).
      • S est configurable par quota.window.size.seconds (par défaut 1 seconde).
    • Une fois qu'un client a dépassé le quota dans la fenêtre de samples, il pourra à chaque sample de temps publier une quantité minimale, jusqu'à ce que sa consommation totale sur la fenêtre redescende en dessous de sa limite de quota.
      • Ça implique qu'un client qui publie à fond produise des pics tous les N samples, suivis de très faibles quantités publiées.
    • A propos de la stratégie de tuning de ces règles :
      • Plus on va augmenter quota.window.num, et plus le pic ponctuel pourra être élevé avant de subir une pénalité.
        • L'auteur conseille d'éventuellement modifier ce paramètre en conséquence.
      • Plus on va augmenter quota.window.size.seconds, et plus le temps d'attente de pénalité sera long.
        • L'auteur conseille de ne pas y toucher et de le laisser au minimum, c'est à dire 1 seconde.
    • Attention cependant, ce comportement non uniforme qui provoque des pics n'est pas documenté, et pourrait être modifié sans avoir besoin d'un process long.

18 - Transactions

  • Les transactions permettent de réaliser des exactly-once deliveries à travers une pipeline de plusieurs jobs (qu'on appelle stages) chaînés via des topics Kafka successifs.
    • Ils y arrivent parce qu'ils permettent de réaliser l'idempotence à travers plusieurs stages, et qu'en combinant ça avec l'at-least-one delivery, on obtient l'exactly-one delivery.
  • La problématique est la suivante :
    • On part d'un cas où on a un stage qui a besoin de consommer un topic Kafka, et pour chaque record consommé, publier un record dans un autre topic Kafka.
      • On ne s'intéresse pas ici à d'autres side-effects comme l'écriture en DB pour laquelle les transactions Kafka ne peuvent rien, mais bien seulement aux messages Kafka publiés et consommés.
    • Les problèmes suivants peuvent se produire :
      • Des erreurs réseau et des crashs du serveur, pour lesquelles on n'a pas besoin des transactions.
        • Le consumer peut les gérer grâce au mécanisme de retries tant qu'il n'a pas fait le commit d'offset.
        • Le producer peut les gérer grâce au mécanisme de retries tant qu'il n'a pas reçu d'acknowledgement, et au mécanisme d'idempotence qui garantit l'ordre et la déduplication.
      • Pour les crashs du process client on a un point faible : le cas où le client a déjà commencé à exécuter la callback du record, et est arrivé jusqu'à publier le record sortant, mais n'a pas encore fait le commit de son offset en tant que consumer.
        • S'il crash à ce moment-là, la prochaine fois qu'il se réveille il va traiter le même record entrant, et va publier encore le record qu'il avait déjà publié.
        • On a donc un risque de publier le message sortant plusieurs fois, sans que la publication avec l'option d'idempotence ne puisse rien y faire, puisqu'il ne s'agit pas de retries d'un même message.
    • Le même problème peut se généraliser avec la publication de plusieurs messages qui doivent tous n'être publiés qu'une fois par le stage.
  • Alors que Kafka permet de base une bonne durability (notamment grâce à la réplication des données dans chaque broker), avec le mécanisme de transactions il se voit doté d'autres caractéristiques d'ACID :
    • Atomicity : l'ensemble des messages publiés dans une même transaction sont soit tous validés, soit tous non validés, y compris dans des topics et partitions différents.
    • Consistency : on ne se retrouve pas dans un demi-état, soit tous les records sont validés, soit aucun.
    • Isolation : les transactions faites en parallèle ont le même résultat que si elles étaient faites les unes après les autres.
  • D'un point de vue performance, on n'a que 3 à 5% de diminution du throughput quand on utilise les transactions.
  • Pour ce qui est du fonctionnement détaillé.
    • Les transaction coordinators tournent sur les brokers.
      • Ils ont pour rôle :
        • 1 - d'assigner un ID à chaque producer (Producer ID, ou PID) qui en fait la demande.
        • 2 - gérer le statut des transactions dans un topic caché de Kafka (dont le nom est __transaction_state).
      • Pour que le système de transactions fonctionne, il faut que le PID du producer reste le même entre deux records consommés.
        • Et pour ça, il faut que le producer du stage suivant déclare le même transactional.id que le précédent.
          • Ce qui a pour effet que le transaction coordinator va assigner le même PID, tant que le délai transactional.id.expiration (par défaut 1 semaine) n'est pas dépassé.
        • L'association [ transactional ID, PID ] contient une propriété epoch qui indique la date de la dernière mise à jour de cette association.
          • Ce mécanisme permet de bloquer les process client zombies, c'est-à-dire qui ont été éjectés, mais qui continuent de penser que c'est à eux de publier : si leur epoch est plus ancien, ils ne pourront pas publier.
        • Garder le même PID permet aussi au producteur successeur de terminer les transactions non terminées du producer qui vient de crash ou timeout.
    • L'essentiel de l'aspect transactionnel se passe côté API du producer.
      • Le client producer Java a ces méthodes :
        • initTransactions() permet d'initialiser le système de transactions pour un producer donné.
          • On ne l'appelle qu'une fois, et ça assigne un PID et un epoch pour l'association [ transactional ID, PID ].
          • Ça va aussi attendre que les transactions précédentes associées à ce transactional ID soient terminées (soit COMMITED, soit ABORTED).
            • Dans le cas où le consumer précédent n'a pas eu le temps de dire s'il voulait commit ou abort, par défaut le broker va déclarer la transaction ABORTED.
        • beginTransaction() permet de commencer la transaction.
        • sendOffsetsToTransaction() envoie les offsets du consumer.
          • Le consumer va donc faire son commit à travers l'API du producer, et non pas avec sa méthode commit habituelle.
          • Il faut bien sûr que l'auto-commit soit désactivé pour le consumer.
        • commitTransaction() permet de valider la transaction.
        • abortTransaction() permet de l'annuler.
    • Le choix du transactional ID est un des sujets majeurs de confusion autour des transactions Kafka.
      • Parmi les possibilités naïves qu'on pourrait imaginer :
        • Si on lui attribue une même valeur parmi l'ensemble des producer process d'un même stage, seul l'instance de producer la plus récente pourra prendre la main, en transformant les producers qui sont issus de la lecture de toutes les autres partitions, en zombies.
        • Si on lui attribue une valeur complètement aléatoire et unique du type UUID, alors aucun producer ne sera transformé en zombie, pas même ceux qui auront été éjectés à cause d'un timeout.
          • Ces process à qui on aurait enlevé la responsabilité de leurs partitions, et qui seraient encore en train d'attendre qu'une transaction se termine, pourraient encore bloquer le fait que de nouveaux messages apparraissent dans leurs anciennes partitions pendant transactional.id.expiration.ms (par défaut 1 heure).
          • Dans le cas où ces process auraient encore des messages dans leur buffer, ils pourraient aussi continuer à exécuter leurs callbacks.
      • La bonne solution est d'assigner un transactional ID composé de la concaténation entre l'input topic et l'index de la partition de ce topic qu'on est en train de consommer.
        • Le résultat c'est potentiellement un grand nombre de producers créés, avec chacun son transactional ID composé du topic et de l'index de la partition.
          • Pour éviter d'en avoir trop, l'approche privilégiée est de ne créer que les producers pour les partitions assignées à un consumer donné, et de les supprimer si les partitions sont rebalancées et enlevées.
    • Côté consumers, la notion de transaction se matérialise dans le choix de ce qui sera lu.
      • Quand le producer publie des messages dans des topics dans le cadre d'une transaction, il va les publier directement et de manière irrévocable, mais ils seront entourés de markers.
        • Il y a un marker pour indiquer le début de la transaction dans la partition, et un autre pour indiquer la fin de transaction réussie (COMMITTED) ou échouée (ABORTED).
      • Le consumer dispose d'une option isolation.level (par défaut read_uncommited).
        • La valeur read_uncommited permet de lire tous les records de la partition, ceux qui ne font pas partie d'une transaction comme ceux qui en font partie, que la transaction soit validée, annulée, ou toujours en cours.
        • La valeur read_commited permet de ne lire que les records qui ne font pas partie d'une transaction, ou ceux qui sont dans une transaction validée.
          • Pour un consumer qui a read_commited activé, l'End Offset est remplacé par la notion de LSO (Last Stable Offset), qui pointe vers le dernier record qui ne fait pas partie d'une transaction non terminée.
          • Tant que la transaction est en cours, le consumer ne pourra pas lire plus loin.
  • Les transactions ont un certain nombre de limitations.
    • Le système de transaction de Kafka n'est pas compatible avec d'autres systèmes de transaction comme XA ou JTA.
    • Une transaction est limitée à un même producer (même transactional ID, même PID).
    • La transaction peut être lue de manière partielle par des consumers sans qu'ils s'en rendent compte : il suffit que le consumer n'ait à sa charge que certaines partitions où la transaction a publié des messages, mais pas les autres.
    • La exactly-once delivery ne s‘applique pas aux side effects en dehors de Kafka : par exemple on peut jouer une callback plusieurs fois, et ajouter plusieurs entrées en DB, même si côté Kafka les messages sont bien publiés exactly-once.
  • Faut-il utiliser les transactions ?
    • On peut se poser la question de la complexité additionnelle par rapport à ce que ça apporte : une déduplication des messages à travers les stages.
      • Dans le cas où la consommation de nos messages n'a que des side-effects idempotents, alors avoir des messages en double dans Kafka peut ne pas être problématique.
      • D'un autre côté, la complexité en question peut être abstraite dans une couche adapter.