net中的数据流模式
生产者 - 消费者模式 - 快速回顾
在Producer-Consumer模式的最简单实现中,我们有一个线程产生一些数据并将其放入队列,另一个线程消耗队列中的数据。
图1:生产者 - 消费者模式
该管道模式是生产者-消费者模式的变体。
这种模式允许消费者也是数据的生产者。此数据将放入第二个队列,另一个消费者将使用它。
图2:管道模式
在上面的示例中,我们有一个管道,它执行三个阶段的处理。
在第一阶段,产生一些数据,在第二阶段消耗该数据并产生一些其他数据。其他数据将在第三阶段消耗。
在上一篇文章中,我举了一个例子,在第一阶段,我们从一些商店(例如数据库)读取文档,在第二阶段我们翻译它们,在第三阶段,我们将翻译后的文档存储到某个目的地商店。在此示例中,第一个队列存储从商店读取的文档,而第二个队列存储已翻译的文档。
数据流模式
在管道模式中,我们可以拥有的阶段数量没有限制。但是,流量必须是线性的。
数据流模式消除了任何此类限制!
流程可以根据某些条件进行分支。或者它可以无条件地分支,以便将项目发送到两个处理节点,而不是一个。
图3:数据流中的分支
在该图中,在第一阶段中,读取文档。然后,根据文档的语言,它要么排队到西班牙文档的队列中,要么排队到德语文档的队列中。
特殊的消费者线程从第一个队列中读取西班牙文档并对其进行翻译。同时,另一个特殊的消费者线程从另一个队列中读取德语文档并进行翻译。
这两个消费者也是生产者,因为他们将翻译的文档添加到翻译的文档队列中。特殊的消费者线程读取翻译的文档并将它们写入商店(例如数据库)。
在继续这个例子之前,让我们先讨论为什么我们需要这个分支,例如为什么我们不简单地使用如下图所示的管道模式?
图4:使用管道而不是数据流
在这个图中,我们有一个简单的管道。在第一阶段,我们读取一个文档,并将其放在一个队列中,无论其编写语言如何。在第二阶段,我们确定文档语言,并决定是否调用一些代码来翻译西班牙文或其他代码来翻译德语。
注意这个阶段的并行度是多少。
在图3和图4的两个示例中,我们同时翻译了两个文档。
那么数据流方法给我们带来了哪些好处呢?
在数据流方法中,最多一个西班牙文档将在任何给定时间处理。此外,一次最多只能处理一份德国文件。
另一方面,在管道方法中,两个线程可能并行处理两个西班牙文档(或两个德语文档)。
因此,数据流方法使我们能够更好地控制每个操作的并行度。
注意:请参阅上一篇文章,了解您可能希望控制每个操作的并行度的原因。
此外,这种方法允许我们对队列大小进行更多控制。例如,考虑源存储中的西班牙文档非常大且德语文档很小的场景。我们可能决定在西班牙文档队列中最多包含10个未翻译的文档,在德国文档的队列中最多包含100个未翻译的文档。
在前面的示例中,分支是有条件的。也就是说,文档要么转到西班牙文档队列,要么转到德语文档队列。
另一种分支是无条件分支。在这种类型的分支中,该项目将无条件地双向发送。
例如,假设我们从源存储中读取的所有文档都是用英语编写的,我们希望将每个文档翻译成西班牙语和德语。在这种情况下,每个文档将被发送到两个队列。
接下来,让我们看看如何在.NET中实现第一个示例。
履行
在本节中,我们将介绍不同的实现选项。
BlockingCollection类
与前一篇文章一样,我将首先使用BlockingCollection类来实现Dataflow示例。
此示例与我在上一篇文章中提供的示例非常相似。我们只为三个队列创建三个BlockingCollection对象(加上一个用于输入队列),然后创建四个任务。
第一个从商店中读取文档,并根据其语言将它们放入适当的队列中。这是分支发生的地方。
接下来的两个任务从西班牙语和德语文档队列中获取文档,翻译它们,然后将它们放入已翻译的文档队列中。
第四个任务从已翻译的文档队列中获取文档并将它们保存到目标存储。
如果德语文档翻译方法(即TranslateGermanDocument)是异步IO绑定方法怎么办?即如果它有以下签名怎么办?
在这种情况下,如果我们在等待这个异步方法完成时不绑定线程会更好。
关于异步操作的注意事项:
例如,考虑西班牙文档翻译与我们的应用程序在同一台机器上进行。这意味着此操作是CPU密集型操作,因此需要一个线程。
另一方面,考虑通过商业Web服务调用完成德语文档翻译。这意味着此操作是I / O绑定操作,因此不需要线程。
线程是服务器应用程序中昂贵的资源,需要同时处理大量客户端请求。使用异步方法意味着我们可以保存这样的线程。
当我们创建德语文档翻译任务时,我们可以使用Task.Run的重载来接受异步操作,即Func
这样,在等待翻译方法完成时,我们不会占用一个线程。但是,我们仍然阻止这两个地方的当前线程:
当德语文档队列当前为空时,对germanDocumentsQueue.GetConsumingEnumerable()返回的可枚举上的IEnumerable
虽然我们使用异步操作作为Task.Run方法的参数,但这两个调用仍将同步完成。在我们具有高度并行性的操作的情况下,这种同步等待将导致我们不必要地占用许多线程,这将影响系统的可伸缩性。
我们可以通过使用类似于BlockingCollection类但支持从队列中异步获取和添加的类来解决这些问题。也就是说,允许我们异步阻止队列非空或非满的类。
一个这样的类可以在AsyncEx库中找到,称为AsyncProducerConsumerQueue。我们可以在前面的示例中使用此类替换BlockingCollection类以备用线程,同时等待队列变为非空或非满。我没有在这里详细说明如何做到这一点,因为我把这作为读者的练习。
请注意,即使使用同步操作(例如,CPU绑定文档转换),也可以异步阻止等待队列。
TPL Dataflow API
接下来,我将向您展示如何使用TPL Dataflow API实现相同的示例。
虽然未随.NET框架一起提供,但TPL Dataflow库是Microsoft专门为帮助我们构建数据流而创建的库。该库提供了一组块,每个块都具有特定功能。以下是一些示例块:
所述TransformBlock块处理数据接收,以产生其它数据。它包括一个输入和一个输出缓冲区。这些缓冲区类似于自上一篇文章以来我一直在讨论的队列。所述ActionBlock块处理数据接收,但不产生任何数据进行处理。它包括一个输入缓冲区。
有关TPL Dataflow库提供的不同块类型的更多信息,请查看TPL Dataflow文档。
在本文中,我将展示如何实现我在使用TPL Dataflow库之前使用的相同示例。我还将讨论API的不同功能,正如我解释的那样。
考虑以下代码:
此方法包含13条语句,在注释中正确标记。我现在就解释一下。
在1 日声明,我创建将读取的存储文档块。此块是TransformBlock
在这种情况下,字符串是文档ID。构造函数中的第一个参数是函数的委托,该函数将字符串转换为文档。我们给它一个lambda表达式来调用ReadDocumentFromSourceStore方法。第二个参数允许我们为此块配置更多选项。我已将MaxDegreeOfParallelism属性设置为1.此代码并非真正需要,因为默认值为1,但我仍然设置它只是为了向您展示如何控制此块的并行度。
在2 次和3 次的语句,我创建两个TransformBlock <文档,文档>对象。第一个翻译西班牙文件,第二个翻译德文文件。注意TransformBlock的构造函数如何允许我在西班牙文档的情况下指定同步转换函数,并为德语文档指定异步转换函数。
请注意,当我创建这两个TransformBlock对象时,我将分别为西班牙语和德语块设置有界容量为10和100。正如我之前提到的,TransformBlock块有一个输入缓冲区和一个输出缓冲区。有界容量设置允许我们控制这些缓冲区的大小。
在第4 个语句中,我创建了一个ActionBlock
我还没有连接任何这些块。现在,开始连接(或在TPL DataFlow库中调用的链接)。
在第5 个语句中,我使用LinkTo方法将文档读取块与西班牙文档转换块链接。这基本上意味着来自读取块的数据将被发送到西班牙文档翻译块。
这里有两点需要注意。
第一个是我传递一个DataflowLinkOptions对象,其PropagateCompletion设置为true。我会尽快解释。
第二个是我为LinkTo方法提供了一个谓词,用于过滤从源块发送到目标块的数据项。在这种特殊情况下,我只希望将西班牙文档发送到西班牙文档翻译块。
在第6 个陈述中,我对德语文档做了同样的事情。
在第7 和第 8 个语句中,我分别将西班牙语和德语翻译块链接到保存文档块。
这里有两点需要注意。
首先,我们将这两个转换块链接到同一个目标块。其次,这里我们不将PropagateCompletion设置为true。我会尽快解释原因。
在第9 个语句中,我只是获取要处理的文档ID列表。
在第10 个语句中,我遍历此列表并在文档读取块上调用Post方法。这将导致处理实际开始。发布到读取块的每个项目将流经块。这当然取决于我们如何将块链接在一起以及我们为不同块指定的设置。
现在让我们谈谈完成的概念。
在我的所有代码示例中(在本文和上一篇文章中),在处理结束时,我调用了一些代码来等待处理完成。在许多情况下都需要这样做,因为我们需要确保在执行其他操作之前已经处理了所有数据。或者只是告知用户所有数据都已处理完毕。
在许多情况下,这意味着我在离开方法之前在某个Task对象上调用了Wait方法。
TPL Dataflow库提供了Completion概念来解决此问题。
在第13 个语句中,我调用了saveDocumentsBlock.Completion属性的getter 。这将为我们提供一个Task对象,该对象在块完成时完成。然后我调用此Task对象上的Wait方法来阻止,直到此任务完成。基本上,我想等待所有数据完全处理。
saveDocumentsBlock如何阻止“完成”?即使它已经处理了它收到的每一件物品,它怎么知道没有更多的物品来了?
Dataflow块支持传播完成信号的概念。
在第12 个语句中,我在readBlock对象上调用Complete方法。这告诉这个块我们已经完成了向它发布新项目。不仅如此,如果它被配置为这样做,它将完成信号发送(或传播)到它链接到的其他块,当它处理了它收到的所有项目时。
我想要的是完成信号传播通过所有块,直到它到达最后一个块,即保存文档块。这样,一旦最后一个块收到完成信号并完成处理所有项目,saveDocumentsBlock.Completion返回的Task 将完成。
一个小问题是我们有一个分支。完成信号可以通过两种方式到达保存文档块(请参见图3)。第一种方式是通过西班牙文档翻译块。第二个是通过德文文档翻译块。
如果我们允许自动传播完成信号,这可能会导致保存文档块在处理所有文档之前完成(并停止接收新项目)。
例如,如果处理所有西班牙文档但仍有一些尚未处理的德语文档,则会发生这种情况。西班牙文档翻译块将完成信号传播到保存文档块并导致处理停止。
这就是为什么我没有设定结束的信号传播从翻译块在7保存文档块第 8 个语句。
为了解决这个问题,我在第11 个语句中使用标准TPL代码告诉系统在西班牙语和德语文档翻译块完成时(即,当他们收到完成时)在保存文档块上调用Complete方法来自读取文档块的信号已经处理了它们收到的所有文档。
对于使用TPL数据流的另一个示例,请考虑使用 DotNetCurry上的.NET 4.5 .NET .NET并行数据流库构建映像大小调整器。
流量清晰度的问题
在使用BlockingCollection类的实现和使用TPL Dataflow API的实现中,流逻辑都与API代码纠缠在一起。为了进一步解释这一点,让我向您展示如果我们使用简单的数据并行来处理我们的文档,我们的代码将是什么样子:
这段代码看起来更好,对吧?
流逻辑非常清晰。ForEach循环的主体向我们展示了处理单个文档所需的步骤。我们先阅读该文件。然后我们确定它的语言。然后我们根据语言进行分支; 我们将TranslateSpanishDocument称为西班牙文档,将TranslateGermanDocument称为德语文档。最后,我们调用SaveDocumentToDestinationStore来保存翻译的文档。
现在尝试回到本文中的数据流实现,并尝试找到这些逻辑。到处都是。
在基于BlockingCollection的示例中,读取文档并确定其语言位于一个位置。翻译西班牙语和德语文档还有另外两个地方。保存文件还在另一个地方。在所有这些代码段之间,我们拥有与流逻辑无关的基础架构代码。
在基于TPL DataFlow API的示例中,情况更糟。例如,分支逻辑在两个不同的LinkTo调用之间分配。
我们能解决这个问题吗
我们能否获得简单数据并行的清晰度,并获得基于生产者 - 消费者的模式提供的所有好处?
我想我们可以,并且我创建了一个名为ProceduralDataflow的库来演示这一点。
ProceduralDataflow库
让我们先看看代码,然后我会向你解释。
我首先创建四个Dataflow块。
该ProcDataflowBlock和AsyncProcDataflowBlock类来自ProceduralDataflow库。这两个类之间的区别在于ProcDataflowBlock类用于CPU绑定操作,AsyncProcDataflowBlock类用于异步操作,例如I / O绑定操作。
这与我们创建TPL Dataflow块的方式非常相似。
每个块都有一个输入队列。我为每个块指定队列大小和并行度。但请注意,这里我没有指定每个块将执行的代码。
接下来,我创建了四个本地函数:DfReadDocumentFromSourceStore,DfTranslateSpanishDocument,DfTranslateGermanDocument和DfSaveDocumentToDestinationStore。这些函数使用上面创建的块来执行相应的方法。
注意:本地函数是C#7中的新功能。有关C#7的入门读物,请阅读www.dotnetcurry.com/csharp/1286/csharp-7-new-expected-features上的“C#7 - 新功能”教程。
例如,DfReadDocumentFromSourceStore方法使用readDocumentsBlock来运行ReadDocumentFromSourceStore方法。它通过在块上调用Run方法来实现。这将返回DfTask。该DfTask类是从ProceduralDataflow库中的类,类似于任务在.NET类。虽然存在差异,但我不打算深入研究ProceduralDataflow库在本文中的工作原理。
下一部分是最有趣的部分:ProcessDocument本地函数。此函数的主体看起来与之前的ProcessDocumentsUsingParallelForEach方法中的ForEach循环非常相似。
它非常清楚地显示了每个文档将要经历的步骤。流逻辑以非常清晰的方式表达。
在这个函数中,我们调用方法的Df *版本。这些方法返回DfTask或DfTask
请注意,由于这些方法返回DfTask而不是Task,因此await关键字的行为方式不同。同样,我不打算深入研究细节,因为在本文中我将重点介绍如何使用ProceduralDataflow,而不是内部如何使用它。
以下是您如何思考此方法(ProcessDocument本地函数):它描述了以过程方式处理文档的各个阶段,即它使用标准过程代码,如调用方法,使用if语句等。每个四个块中的只执行此方法的一部分。
这是异步/等待的强大功能。它允许执行单个方法的某些部分,就好像它们是完全分离的一样。
不要让程序逻辑欺骗你!
在引擎盖下,它像数据流一样工作。通过不同块处理多个文档是并行完成的,每个块的并行度得到尊重,并且有一些队列允许慢速消费者减慢快速生产者的速度。
请注意,ProcessDocument本地函数接受文档ID并返回Task。此任务表示单个文档的整个过程。
注意:为方便起见,我在本例中使用了本地函数。这些方法可以是常规方法。
后面的代码调用GetDocumentIdsToProcess方法来获取文档ID,然后从ProceduralDataflow中调用一个名为EnumerableProcessor.ProcessEnumerable的方法来处理所有文档。
这种方法没什么特别之处。它枚举传递的枚举,并为每个项调用提供的委托(在此示例中调用ProcessDocument)。此方法的唯一目的是管理项目的处理,以便在项目数量巨大时我们不会在队列中一次排队所有数据。在这个特定的例子中,我将未完成任务的数量限制为100。
此方法还返回在完成所有任务后完成的任务。
现在,请考虑以下代码:
这是ProcessDocument本地函数的修改版本。它在转换方法的调用周围添加了一个try / catch块。如果转换中发生异常,将调用名为DfStoreDocumentInFaultedDocumentsStore的方法将此类文档存储在特殊数据库中。
该方法还具有相应的块,具有自己的并行度和队列大小。请注意,try块包围了两个翻译文档的调用(西班牙语和德语翻译方法)。如果其中任何一个失败,文档将被发送到此特殊数据库。
另请注意,在catch块中,我们可以访问documentId和文档变量。要在实现Dataflow时了解此方法的价值,请尝试使用BlockingCollection类或TPL Dataflow API 实现此示例。
ProceduralDataflow库中的其他功能
ProceduralDataflow库具有更多功能。
1.它支持无死锁循环。
如果处理流程有循环怎么办?例如,流程从第一个块到第二个块,然后返回到第一个块。这可能会导致死锁。
考虑下图:
图5:数据流中的循环
在此图中,在块2处理项目之后,处理结果可能会返回到块1的队列,或者它可能会根据某些条件转到最终块队列。
现在,假设所有队列都已满。如果没有循环,这可能不是问题,因为最终的块队列最终会因为最后一个块处理某些项而变为非满。
但是,对于循环,块1可能正在等待块2的队列变为非满,而块2可能正在等待块1的队列变为非满。
这会导致死锁。
ProceduralDataflow通过使每个块具有两个队列来修复此问题。
第一个队列用于未通过循环到达块的数据,第二个队列用于由于循环而进入队列的数据。
第二个队列的特殊之处在于,向这样的队列添加项不会导致生成器(将数据添加到队列中的块)阻塞,这可以防止死锁。换句话说,第二个队列不受限制。
此外,当获取要处理的下一个项目时,块有利于从第一个队列中的项目构成第二个队列的项目。ProceduralDataflow如何检测数据项是否通过循环的详细信息超出了本文的范围。
2.它支持无条件分支和加入:
ProceduralDataflow允许您并行处理具有两个或更多块的数据项。完成后,您可以加入这些块中的数据并将它们作为一个单元进行处理。
例如,您可以将单个英语文档翻译为西班牙语和德语(每个都通过其自己的块),然后异步等待两个操作完成,然后将两个翻译的文档作为单个数据项加入以创建包含的zip文件这两个文件在一个单独的块中。所有这些都可以在程序上完成。
这是一个代码示例:
在这个例子中,在我们阅读英文文档之后,我们调用DfTranslateEnglishDocumentToSpanish和DfTranslateEnglishDocumentToGerman而不等待返回的任务。这意味着在我们开始翻译成德语之前,我们不会等待翻译成西班牙语。
请注意,这些翻译中的每一个都将使用不同的块完成。
然后,我们使用DfTask.WhenAll方法异步等待两个翻译完成。然后,两个翻译的文档将被提供给另一个块以压缩并将两个文档保存在一起。
3.它支持子过程:如果流逻辑很大,则将其拆分为多个DfTask或DfTask
4.没有不必要的线程使用:在等待队列变为非空或非满时,ProceduralDataflow不会导致任何线程阻塞。此外,异步块中不使用任何线程。
如何获取ProceduralDataflow库
ProceduralDataflow库是开源的,您可以在以下GitHub存储库中找到它:https://github.com/ymassad/ProceduralDataflow
我还在Nuget发布了这个库。您可以通过YMassad.ProceduralDataflow的名称找到它。目前它在预发行版中,因此如果要在Visual Studio中使用它,请确保选中Visual Studio内Nuget包管理器中的“include prerelease”复选框。
我鼓励读者尝试这个库。我将不胜感激任何反馈或建议。
结论:
在本文中,我介绍了Consumer-Producer数据流模式。此模式是Producer-Consumer模式的变体。与仅允许块之间的线性数据流的管道模式不同,数据流模式允许流是非线性的。例如,它允许数据的条件和非条件分支。
我已经展示了如何使用BlockingCollection类和TPL DataFlow API 实现此模式的示例,并讨论了这些实现如何使流逻辑的可读性降低。
我还介绍了一个名为ProceduralDataflow的新库,并通过一个示例展示了它如何帮助我们实现Dataflow模式,同时保持流逻辑清晰可读。