Dans l'article précédent, nous avons dit que dans le contexte de multi-filetés en même temps, si nous devons réaliser le statut entre les cours d'eau, les informations entre les sujets d'échange sont une chose très compliquée et difficile. Parce que nous n'avons pas de privilèges système plus avancés, il n'y a pas de popularité de Dieu. Il est difficile de connaître l'ensemble de l'image de l'activité actuelle, de sorte que vous souhaitez concevoir une fonction de fonctionnement stable sans erreurs, non seulement très difficile, mais aussi très gênant.
Mode de consommation des consommateurs annuellement
Dans le développement quotidien, Transférer des données d'un fil à un autre et un flux familial de la famille en direct . Pour l'exemple le plus simple, nous devons imprimer le journal correspondant de cette demande lors du traitement des demandes Web. Les journaux d'impression sont un comportement IO, très consommé, nous ne pouvons donc pas la définir synchronisés selon les besoins, sinon cela affectera les performances du système. Le meilleur moyen est de lancer une série de sujets responsablesL'impression, le flux arrière n'est responsable que des commentaires et des journaux pertinents transmis au flux d'impression sous forme de messages.
Cette simple n'est pas liée aux détails, ajoutez quelques détails supplémentaires. Tout d'abord, les données du flux IO proviennent du courant d'arrière-plan, s'il n'y a aucune demande pendant un certain temps, ces flux dormiront et seront démarrés au besoin. Deuxièmement, s'il existe de nombreuses exigences pendant une certaine période, elle a provoqué l'impression d'IO d'imprimer toutes les données pendant un certain temps, la demande doit alors être enregistrée à l'avance, puis le thread io est «occupé» Traitement précédent.
Prenez ces détails, vous pouvez vous concevoir ou vous déranger. Heureusement, dans cette question, les gens ont pensé à nous et ont fait un design très classique qui l'utilise pour résoudre ce très bon problème. Ce mode est Mode de consommation de production .
Le principe de cette conception est vraiment simple, voir l'image.
sujetDivisé en
Thème de production et thème de la consommationSur la base de la relation entre données et données, dans laquelle le flux de fabricant est responsable des données de production. Les données seront stockées dans la file d'attente de la tâche. Thème de consommation Pour récupérer les données de cette file d'attente nécessite une consommation et n'interagit pas directement entre les flux des fabricants pour éviter l'interdépendance entre les cours d'eau.
Autres détails que la file d'attente de tâches ici n'est pas une file d'attente normale, en général, une bloquant la file d'attente
. Cela signifie que lorsque les flux de consommateurs tentent de collecter des données, si la file d'attente est vide, ces consommateurs se bloqueront automatiquement jusqu'à ce qu'il récupère des données. Bien sûr, il existe des files d'attente inconnues, si elles ne sont pas bloquées, lorsque nous essayons de récupérer des données de celui-ci, s'il n'ya pas de données, il ne sera pas suspendu, mais il retournera des valeurs nulles.Bien sûr, le temps d'attente de la file d'attente de bloc peut également être placé, nous pouvons attendre l'attendre ouMettre un temps d'attente le plus long
. Si ce temps retournera les différentes applications de file d'attente dans différentes scènes, nous devons effectuer des ajustements en fonction de la nature de la scène.Lundi, Déploiement de code Après avoir lu le principe du mode de conception, nous essaierons d'utiliser le code pour le faire. Il y a une bibliothèque de files d'attente pré-fabriquées en général Senior Language. Parce qu'il est utilisé en mode de production de consommation, le blocage de la file d'attente est bloqué et bien sûr une file d'attente non bloquée. Nous devons comprendre avant de l'utiliser. Si la file d'attente est utilisée, la file d'attente entraînera un problème avec l'ensemble du programme. En Python, notre file d'attente la plus populaire est une file d'attente congestionnée
pour prendre en charge plusieurs flux, nous allons donc l'utiliser directement.
Parce que cette conception est très simple, ce code n'est pas long:
Nous verrons que la faisabilité est réalisable et due à la première file d'attente
Tien -eng Limitée, ouiIl est possible de s'assurer que la chaîne
du contenu est lue par les canaux de consommation est compatible avec l'ordre du produit généré
.from queue import Queuefrom threading import Threaddef producer(que): data = 0 while True: data += 1 que.put(data) def consumer(que): while True: data = que.get() print(data) que = Queue()t1 = Thread(target=consumer, args=(que, ))t2 = Thread(target=producer, args=(que, ))t1.start()t2.start() Si nous exécutons ce code, il constatera que cela ne se terminera pas, car les consommateurs et les fabricants utilisent la boucle morte construite par True, supposons que nous espérons vérifier le contrôle de la fin du programme, comment il devrait gérer dans ]
C'est également très simple, nous pouvons également utiliser la file d'attente. Nous créons une option spéciale et le programme arrêtera le programme lorsque les consommateurs acceptent cette valeur spéciale. De cette manière, nous devons mettre fin au programme, nous devons juste ajouter ce sémaphore à la file d'attente. C'est que nous sommes dans les consommateurs, lors de la lecture d'un singulateur, nous avons remis à la file d'attente avant de sortir de la boucle. La raison est aussi très simple, car parfois des thèmes de consommation ne sont pas seulement un, ce flux le plus bas
est uniquement défini, ne sera lu que par un thème,D'autres flux ne savent pas que les messages simples ont été obtenus, alors continuez toujours.
et lorsque les consommateurs sont éteints, vous pouvez assurer chaque consommation. La fin du signal de fin est lue à nouveau avant la fermeture. Surmonter ainsi, vous pouvez vous assurer que tous les consommateurs sont désactivés.
Il existe également un petit détail, bien que la file d'attente puisse résoudre le problème des fabricants de consommateurs et des communications, mais le fabricant en amont ne sait pas si le consommateur abaissé Save a été terminé. Que dois-je faire si nous voulons savoir?
singal = object()def producer(que): data = 0 while data < 20: data += 1 que.put(data) que.put(singal) def consumer(que): while True: data = que.get() if data is singal: # 继续插入singal que.put(singal) break print(data) Les concepteurs de Python ont également déclaré avec ce problème. Ils ont donc ajouté
Rask_Done et des méthodes pour participer à cette file d'attente de classe . Avec Task_Done, les consommateurs peuvent annoncer la file d'attente, cette tâche est terminée. Et en appelant la participation, vous pouvez attendre que tous les consommateurs complètent.
En plus de l'utilisation de Task_Done, nous pouvons également ajouter un événement dans le messageTransféré avec des bâtons afin que nous puissions continuer à voir la situation à chaque événement.
Troisième, File d'attêtes de priorité et autres paramètres
Lorsque nous avons introduit certains systèmes de planification dispersés, nous avons dit que dans la coordination du système, le coordinateur utilisera la file d'attente prioritaire. Gérer toutes les tâches. Lorsqu'il y a une machine inactive, il limitera les tâches prioritaires élevées.
En fait, ce système de planification est également basé sur le modèle de production de consommateurs, nous venons d'introduire, mais changera la file d'attente d'expédition des files d'attente normales accédant à la file d'attente prioritaire
. Donc, si nous espérons que nos consommateurs peuvent modifier l'ordre d'exécution basé sur le niveau de priorité de la tâche, vous pouvez également utiliser des files d'attente prioritaires pour gérer les tâches.from queue import Queuefrom threading import Threaddef producer(que): data = 0 while data < 20: data += 1 que.put(data) def consumer(que): while True: data = que.get() print(data) que.task_done() que = Queue()t1 = Thread(target=consumer, args=(que, ))t2 = Thread(target=producer, args=(que, ))t1.start()t2.start()que.join() Nous sommes habitués à déployer des files d'attente prioritaires, mais il y a un problème que nous devons effectuer une suspension suspendue. C'est nousPeut y parvenir, mais il peut être atteint en appelant des bibliothèques connexes. Par exemple, des conditions d'écoulement intégrées,
est une variable conditionnelle pour notifier d'autres flux ou vous pouvez accrocher en attente
.a finalement introduit les autres réglages de la file d'attente, tels que nous pouvons
définir la taille de la file d'attente via le paramètre Taille
, car il s'agit des blocs d'attente, donc si nous définissons la taille. de la file d'attente a ensuite bloqué lorsque la file d'attente est remplie et manipulée dans laquelle les données insérées sont également bloquées. À ce stade, le thème du producteur sera suspendu et jusqu'à ce que la file d'attente ne soit plus pleine.Bien sûr, nous pouvons également passer des blocs de ginseng Définir le fonctionnement de la file d'attente pour ne pas bloquer . Par exemple: que.get (block = false), une file d'attente est une exception vide lorsque la file d'attente est vide. De même, que.put (données, bloc = false aura également l'exception de la file d'attente.
Résumé Aujourd'hui, postCela écrit principalement des modèles de consommateurs classiques dans de nombreuses scènes de flux, ce modèle est utilisé dans de nombreuses scènes. Par exemple, un système de messagerie comme Kafka, ainsi que des fibres et d'autres systèmes de planification, V.V presque tant que cela implique une communication en amont, elle est souvent utilisée. Par conséquent, il est trop vaste à utiliser, de sorte que
apparaît souvent dans des entretiens différents, soit également l'un des régimes de conception de base que doivent connaître les ingénieurs.from threading import Thread, Conditionclass PriorityQueue: def __init__(self): self._queue = [] self._cv = Condition() def put(self, item, priority): with self._cv: heapq.heappush(self._queue, (-priority, self._count, item)) # 通知下游,唤醒wait状态的线程 self._cv.notify() def get(self): with self._cv: # 如果对列为空则挂起 while len(self._queue) == 0: self._cv.wait() # 否则返回优先级最大的 return heapq.heappop(self._queue)[-1] De plus, la file d'attente est également une structure de données qui apparaît souvent dans les conceptions et utilise des scripts. De côté, il illustre également pourquoi les algorithmes et les structures de données sont importants. De nombreuses grandes entreprises souhaitent poser quelques questions, car a des scènes d'utilisation réelles et a confirmé la capacité des ingénieurs en pensant. J'ai généralement un cas en utilisant le cas d'algorithmes et de structures de données, ce qui est un bon exemple.