</tr>
</tbody>
</table>
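Let's look at the concrete code of each method.
Step 1
The [GraphManager#createIfAbsent(graphId, input)][GraphManager_createIfAbsent_graphId_ input] method creates a Graph object and adds it to the Graph map. The linked code works as follows:
- INSTANCE field: the singleton.
- allGraphs field: the Graph map. Each KEY is a Graph's globally unique ID. The Graph IDs actually in use are enumerated in the [JvmMetricStreamGraph][], [RegisterStreamGraph][] and [TraceStreamGraph][] classes.
- Lines 50 to 58: when the Graph map has no entry for the given Graph ID, creates the Graph object and returns it.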
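The create-if-absent behavior described above can be sketched as follows. This is a minimal illustration under our own assumptions, not the project's source: the Graph class here is a stand-in that only stores its ID, and the enum-based singleton was chosen for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Graph: it only keeps its ID.
final class Graph<INPUT> {
    private final int id;

    Graph(int id) {
        this.id = id;
    }

    int id() {
        return id;
    }
}

// Singleton registry of Graphs, keyed by Graph ID.
enum GraphManager {
    INSTANCE;

    private final Map<Integer, Graph<?>> allGraphs = new HashMap<>();

    // Creates the Graph on the first request; later calls return the same instance.
    @SuppressWarnings("unchecked")
    synchronized <INPUT> Graph<INPUT> createIfAbsent(int graphId, Class<INPUT> input) {
        Graph<?> graph = allGraphs.get(graphId);
        if (graph == null) {
            graph = new Graph<INPUT>(graphId);
            allGraphs.put(graphId, graph);
        }
        return (Graph<INPUT>) graph;
    }
}
```

In this sketch, calling GraphManager.INSTANCE.createIfAbsent(1, String.class) twice returns the same Graph instance, while a different ID yields a new one.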
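Step 2
The [Graph#addNode(WayToNode)][Graph_addNode_WayToNode] method creates the first Node of the Graph. The linked code works as follows:
- id field: the Graph ID.
- entryWay field: the way data is first submitted to a Node.
- Line 58: assigns the method parameter entryWay to the this.entryWay field. As the Graph#start(input) method covered below shows, this is the entry point when the Graph starts, that is, the way the first Node receives its data.
- Lines 60 to 62: calls the WayToNode#buildDestination(Graph) method to create the Node object and returns it. As noted above, this Node is the first Node of the Graph.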
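The [WayToNode#buildDestination(Graph)][WayToNode_buildDestination] method creates the Node that this WayToNode leads to. The linked code works as follows:
- destination field: the target Node, i.e. the Node this WayToNode submits data to.
- destinationHandler field: the processor of the target Node. See the [#out(INPUT)][out_INPUT] method.
- Line 42: creates the Node object.
- Currently, destinationHandler is used only to create the Node object and has no other purpose.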
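The [Node constructor][Node] works as follows:
- nodeProcessor field: the node processor.
- next field: wraps an array of WayToNode, i.e. the ways this Node submits data to the Nodes that follow it.
- Line 44: calls the Graph#checkForNewNode(Node) method to verify that the ID of the Node's NodeProcessor is unique within its Graph.
The [Graph#checkForNewNode(Node)][Graph_checkForNewNode_Node] method verifies that the ID of a Node's NodeProcessor is unique within the Graph. The linked code works as follows:
- nodeIndex field: a map from processor ID to Node. Each KEY is NodeProcessor#id().
- Lines 72 to 78: verifies that the ID of the Node's NodeProcessor is unique within the Graph.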
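Step 3
The [Node#addNext(WayToNode)][Node_addNext_WayToNode] method creates the Node that follows this Node. The linked code works as follows:
- Line 54: calls the WayToNode#buildDestination(Graph) method to create the Node that follows this Node.
- Line 56: adds the created Node to the next field.
- Line 58: returns the created Node.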
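Steps 2 and 3 can be sketched together as below. This is a simplified illustration, not the project's code: NodeProcessor is reduced to id(), IllegalStateException stands in for whatever exception the real check throws, and the handler() and nextCount() helpers are our own additions so the result can be inspected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Reduced to the one method the uniqueness check needs.
interface NodeProcessor<INPUT, OUTPUT> {
    int id();
}

// A way of delivering data to its destination Node.
class WayToNode<INPUT, OUTPUT> {
    private final NodeProcessor<INPUT, OUTPUT> destinationHandler;
    private Node<INPUT, OUTPUT> destination;

    WayToNode(NodeProcessor<INPUT, OUTPUT> destinationHandler) {
        this.destinationHandler = destinationHandler;
    }

    // Creates the destination Node; the handler is only needed for this.
    Node<INPUT, OUTPUT> buildDestination(Graph<?> graph) {
        destination = new Node<>(graph, destinationHandler);
        return destination;
    }
}

class Node<INPUT, OUTPUT> {
    private final Graph<?> graph;
    private final NodeProcessor<INPUT, OUTPUT> nodeProcessor;
    private final List<WayToNode<OUTPUT, ?>> next = new ArrayList<>();

    Node(Graph<?> graph, NodeProcessor<INPUT, OUTPUT> nodeProcessor) {
        this.graph = graph;
        this.nodeProcessor = nodeProcessor;
        graph.checkForNewNode(this); // rejects duplicate processor IDs
    }

    NodeProcessor<INPUT, OUTPUT> handler() {
        return nodeProcessor;
    }

    // Builds the following Node, remembers the way to it, and returns it for chaining.
    <NEXT_OUTPUT> Node<OUTPUT, NEXT_OUTPUT> addNext(WayToNode<OUTPUT, NEXT_OUTPUT> way) {
        Node<OUTPUT, NEXT_OUTPUT> node = way.buildDestination(graph);
        next.add(way);
        return node;
    }

    int nextCount() {
        return next.size();
    }
}

class Graph<INPUT> {
    private final Map<Integer, Node<?, ?>> nodeIndex = new HashMap<>();
    private WayToNode<INPUT, ?> entryWay;

    // Remembers the entry way and builds the first Node of the Graph.
    <OUTPUT> Node<INPUT, OUTPUT> addNode(WayToNode<INPUT, OUTPUT> entryWay) {
        this.entryWay = entryWay;
        return entryWay.buildDestination(this);
    }

    // Indexes the Node by processor ID and fails if that ID is already taken.
    void checkForNewNode(Node<?, ?> node) {
        int nodeId = node.handler().id();
        if (nodeIndex.containsKey(nodeId)) {
            throw new IllegalStateException("Duplicate NodeProcessor id: " + nodeId);
        }
        nodeIndex.put(nodeId, node);
    }
}
```

Because addNode and addNext both return the Node they create, a pipeline can be declared as a chain such as graph.addNode(wayA).addNext(wayB), and a second processor with an already-used ID fails at construction time.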
2.2 Graph Startup
The sequence diagram for starting a Graph is as follows:
![ ][nbsp 7]
<table>
<thead>
<tr>
<th>Data flow</th>
<th>FROM</th>
<th>TO</th>
<th>Logic</th>
</tr>
</thead>
<tbody>
<tr>
<td>Step 1</td>
<td>Graph</td>
<td>WayToNode</td>
<td></td>
</tr>
<tr>
<td>Step 2</td>
<td>WayToNode</td>
<td>Node</td>
<td></td>
</tr>
<tr>
<td>Step 3</td>
<td>Node</td>
<td>NodeProcessor</td>
<td></td>
</tr>
<tr>
<td>Step 4</td>
<td>NodeProcessor</td>
<td>Next</td>
<td>Depends on the implementation; if the data goes on to Next, repeat from Step 1</td>
</tr>
</tbody>
</table>
![ ][nbsp 8]
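Step 1
The [Graph#start(input)][Graph_start_input] method starts the Graph and processes the data. The linked code works as follows:
- Line 49: calls the WayToNode#in(input) method to feed the data into the WayToNode.
[WayToNode#in(input)][WayToNode_in_input] is an abstract method. Taking the [DirectWay#in(input)][DirectWay_in_input] implementation as an example:
- Line 30: calls super#out(input), which outputs the data directly by calling Node#execute(input), submitting the data to the Node for processing.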
Step 2
The [Node#execute][Node_execute] method calls NodeProcessor#process(input, next) to process the data.
Step 3
[NodeProcessor#process(input, next)][NodeProcessor_process_input_ next] is an interface method. Taking the [AbstractWorker#process(input, next)][AbstractWorker_process_input_ next] implementation as an example:
- Line 64: assigns the method parameter next to the this.next field. The wrapper method #onNext(OUTPUT) uses this.next to submit data to the current Node's Next (the Nodes that follow) for further processing.
- Line 67: calls the [#onWork][onWork] abstract method to process the data. When an implementation of the AbstractWorker abstract class needs to pass data on to Next, it must call #onNext(OUTPUT) inside #onWork, as [ApplicationRegisterRemoteWorker#onWork(Application)][ApplicationRegisterRemoteWorker_onWork_Application] does.
Step 4
The [Next#execute(INPUT)][Next_execute_INPUT] method loops over the WayToNode array and feeds the data into each WayToNode, which is effectively a return to Step 1.
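The four steps form a loop that is easier to see in one place. The sketch below is our own reduced model of the classes named above, not the project's source: a WayToNode receives an already-built Node instead of building it, and error handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Handles one input and may pass results on through next.
interface NodeProcessor<INPUT, OUTPUT> {
    void process(INPUT input, Next<OUTPUT> next);
}

// Step 4: fans the data out to every following WayToNode.
class Next<INPUT> {
    private final List<WayToNode<INPUT>> ways = new ArrayList<>();

    void addWay(WayToNode<INPUT> way) {
        ways.add(way);
    }

    void execute(INPUT input) {
        for (WayToNode<INPUT> way : ways) {
            way.in(input); // same call as Step 1, now for a following Node
        }
    }
}

// Step 2: hands the data and the Next to the processor.
class Node<INPUT, OUTPUT> {
    private final NodeProcessor<INPUT, OUTPUT> nodeProcessor;
    private final Next<OUTPUT> next = new Next<>();

    Node(NodeProcessor<INPUT, OUTPUT> nodeProcessor) {
        this.nodeProcessor = nodeProcessor;
    }

    void addNext(WayToNode<OUTPUT> way) {
        next.addWay(way);
    }

    void execute(INPUT input) {
        nodeProcessor.process(input, next);
    }
}

// A way of delivering data to its destination Node.
abstract class WayToNode<INPUT> {
    private final Node<INPUT, ?> destination;

    WayToNode(Node<INPUT, ?> destination) {
        this.destination = destination;
    }

    abstract void in(INPUT input);

    protected void out(INPUT input) {
        destination.execute(input);
    }
}

// Synchronous way: in(...) forwards straight to out(...).
class DirectWay<INPUT> extends WayToNode<INPUT> {
    DirectWay(Node<INPUT, ?> destination) {
        super(destination);
    }

    @Override
    void in(INPUT input) {
        out(input);
    }
}

// Step 3: stores next, runs onWork, and lets subclasses forward via onNext.
abstract class AbstractWorker<INPUT, OUTPUT> implements NodeProcessor<INPUT, OUTPUT> {
    private Next<OUTPUT> next;

    @Override
    public final void process(INPUT input, Next<OUTPUT> next) {
        this.next = next;
        onWork(input);
    }

    protected abstract void onWork(INPUT input);

    protected final void onNext(OUTPUT output) {
        next.execute(output);
    }
}

// Step 1: the Graph feeds the input into its entry way.
class Graph<INPUT> {
    private final WayToNode<INPUT> entryWay;

    Graph(WayToNode<INPUT> entryWay) {
        this.entryWay = entryWay;
    }

    void start(INPUT input) {
        entryWay.in(input);
    }
}
```

A worker that never calls onNext ends the flow at its own Node; one that calls it pushes the data one hop further, which is why the table above describes Step 4 as implementation-dependent.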
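3. apm-collector-stream
As mentioned at the beginning of the article, the apm-collector-stream module builds on the graph package to provide wrappers for asynchronous, cross-node and other kinds of stream processing. Most of the work is done in the implementations of WayToNode and NodeProcessor.
3.1 WayToNode Implementations
The overall class diagram is as follows: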
![ ][nbsp 9]
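3.1.1 WorkerRef
[org.skywalking.apm.collector.stream.worker.base.WorkerRef][org.skywalking.apm.collector.stream.worker.base.WorkerRef] is the abstract class for Worker references.
In the apm-collector-stream module, class names shift from Node / NodeProcessor to Worker. Why? We put this question to one of the project's maintainers:
>Worker carries more business meaning
>Node / Processor is more of a technical term
Currently, WorkerRef declares no methods of its own.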
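3.1.2 LocalAsyncWorkerRef
org.skywalking.apm.collector.stream.worker.base.LocalAsyncWorkerRef is the asynchronous Worker reference implementation, which wraps asynchronous stream processing.
Let's return to Step 1 of ["2.2 Graph Startup"][3. apm-collector-stream].
The [LocalAsyncWorkerRef#in(INPUT)][LocalAsyncWorkerRef_in_INPUT] method works as follows:
- [queueEventHandler][queueEventHandler] field: the queue event handler. Its implementation is analyzed in detail in ["SkyWalking Source Code Analysis: Collector Queue Component"][SkyWalking _ _ Collector Queue]; only a brief introduction is given here.
- Line 47: submits the input data as an "event" to the queue event handler and runs no further logic. Later, the queue event handler processes the event (the data) in the background and calls back the [LocalAsyncWorkerRef#execute][LocalAsyncWorkerRef_execute] method, which submits the data to the Worker (Node). See the [DisruptorEventHandler#onEvent(…)][DisruptorEventHandler_onEvent] method for details.
Why does the callback happen? LocalAsyncWorkerRef implements the [org.skywalking.apm.collector.queue.base.QueueExecutor][org.skywalking.apm.collector.queue.base.QueueExecutor] interface and is itself set on the QueueEventHandler as the executor of "events".
The overall flow is as follows: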
![ ][nbsp 10]
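The enqueue-then-callback pattern can be sketched as below. This is an illustration under stated assumptions rather than the real implementation: a single-thread ExecutorService stands in for the Disruptor-backed queue, a Consumer stands in for the destination Worker (Node), and the tell and shutdownAndWait method names are ours.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Callback the queue invokes for each event it takes off the queue.
interface QueueExecutor<MESSAGE> {
    void execute(MESSAGE message);
}

// Accepts events and hands them to a QueueExecutor on a background thread.
// A single-thread pool keeps the events in submission order.
class QueueEventHandler<MESSAGE> {
    private final ExecutorService background = Executors.newSingleThreadExecutor();
    private final QueueExecutor<MESSAGE> executor;

    QueueEventHandler(QueueExecutor<MESSAGE> executor) {
        this.executor = executor;
    }

    void tell(MESSAGE message) {
        background.execute(() -> executor.execute(message));
    }

    // Stops accepting events and waits for the queued ones to finish.
    boolean shutdownAndWait(long timeoutMillis) {
        background.shutdown();
        try {
            return background.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// in(...) only enqueues; the queue later calls execute(...), which reaches the worker.
class LocalAsyncWorkerRef<INPUT> implements QueueExecutor<INPUT> {
    private final Consumer<INPUT> worker; // stands in for the destination Node
    private QueueEventHandler<INPUT> queueEventHandler;

    LocalAsyncWorkerRef(Consumer<INPUT> worker) {
        this.worker = worker;
    }

    void setQueueEventHandler(QueueEventHandler<INPUT> queueEventHandler) {
        this.queueEventHandler = queueEventHandler;
    }

    void in(INPUT input) {
        queueEventHandler.tell(input);
    }

    @Override
    public void execute(INPUT input) {
        worker.accept(input);
    }
}
```

The caller of in(...) returns as soon as the event is queued, and the worker runs on the queue's thread. The reference and the handler point at each other, so the handler is attached through a setter after both exist.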
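3.1.3 RemoteWorkerRef
org.skywalking.apm.collector.stream.worker.base.RemoteWorkerRef is the remote Worker reference implementation, which wraps remote, cross-node stream processing.
Let's return once more to Step 1 of ["2.2 Graph Startup"][3. apm-collector-stream].
The [RemoteWorkerRef#in(INPUT)][RemoteWorkerRef_in_INPUT] method works as follows:
- remoteSenderService field: the remote sender service. Its implementation is analyzed in detail in ["SkyWalking Source Code Analysis: Collector Remote Communication Service", "3.2 GRPCRemoteSenderService"][SkyWalking _ _ Collector Remote _3.2 GRPCRemoteSenderService]; only a brief introduction is given here.
- remoteWorker field: the remote Worker. Its implementation is covered in detail below.
- Line 56: calls the RemoteSenderService#send(…) method, which uses the remote Worker's [Selector][Selector] to choose a Worker to send to.
- Lines 58 to 60: when the chosen Worker is in local mode ([Mode][]), calls the #out(INPUT) method to submit the data to the local Worker (Node).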
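The send-then-check-mode logic can be sketched as below. Everything here is a simplified assumption for illustration: the RemoteSender interface and its one-argument send(...) are hypothetical stand-ins for the real sender service, HashSender is a toy that picks a target by hash code without any networking, and a Consumer stands in for the local Worker (Node).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Where the sender decided the data should be handled.
enum Mode { REMOTE, LOCAL }

// Hypothetical, simplified sender: picks a target instance and reports the mode.
interface RemoteSender<T> {
    Mode send(T data);
}

// Toy sender for a cluster of instanceCount collectors, of which this process is selfIndex.
class HashSender<T> implements RemoteSender<T> {
    private final int instanceCount;
    private final int selfIndex;
    private final List<T> sentRemotely = new ArrayList<>();

    HashSender(int instanceCount, int selfIndex) {
        this.instanceCount = instanceCount;
        this.selfIndex = selfIndex;
    }

    @Override
    public Mode send(T data) {
        int target = Math.floorMod(data.hashCode(), instanceCount);
        if (target == selfIndex) {
            return Mode.LOCAL; // nothing is sent; the caller handles the data itself
        }
        sentRemotely.add(data); // a real sender would make a network call here
        return Mode.REMOTE;
    }

    List<T> sentRemotely() {
        return sentRemotely;
    }
}

class RemoteWorkerRef<INPUT> {
    private final RemoteSender<INPUT> remoteSenderService;
    private final Consumer<INPUT> localWorker; // stands in for the local Node

    RemoteWorkerRef(RemoteSender<INPUT> remoteSenderService, Consumer<INPUT> localWorker) {
        this.remoteSenderService = remoteSenderService;
        this.localWorker = localWorker;
    }

    // Asks the sender to route the data; only local-mode data continues in this process.
    void in(INPUT input) {
        Mode mode = remoteSenderService.send(input);
        if (mode == Mode.LOCAL) {
            out(input);
        }
    }

    private void out(INPUT input) {
        localWorker.accept(input);
    }
}
```

With two instances and this process at index 0, an input whose hash maps to index 0 is processed locally, while one that maps to index 1 is only recorded as sent.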
3.2 NodeProcessor Implementations
The overall class diagram is as follows:
![ ][nbsp 11]
- [org.skywalking.apm.collector.stream.worker.base.Provider][org.skywalking.apm.collector.stream.worker.base.Provider] is the Worker provider interface, a factory for creating Worker and WorkerRef objects.
3.2.1 AbstractWorker
The implementation of AbstractWorker has already been analyzed in detail in ["2.2 Graph Startup"][3. apm-collector-stream].
[org.skywalking.apm.collector.stream.worker.base.AbstractWorkerProvider][org.skywalking.apm.collector.stream.worker.base.AbstractWorkerProvider] is the abstract Worker provider. It defines the [#workerInstance(ModuleManager)][workerInstance_ModuleManager] abstract method for creating Worker objects.
3.2.2 AbstractLocalAsyncWorker
[org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker][org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker] is the abstract class for asynchronous Workers.
Currently, AbstractLocalAsyncWorker declares no methods of its own.
In practice, extend the AbstractLocalAsyncWorker class and implement the #onWork(INPUT) method, as [ApplicationRegisterSerialWorker][] does.
org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider is the abstract provider for local asynchronous Workers.
- [queueCreatorService][queueCreatorService] field: the queue creation service, used to create QueueEventHandler objects.
- [#queueSize()][queueSize] abstract method: declares the queue size.
- [#create(WorkerCreateListener)][create_WorkerCreateListener] method implementation: creates the AbstractLocalAsyncWorker and LocalAsyncWorkerRef objects.
- Line 51: creates an object of the AbstractLocalAsyncWorker implementation class. See the [ApplicationRegisterSerialWorker.Factory#workerInstance(ModuleManager)][ApplicationRegisterSerialWorker.Factory_workerInstance_ModuleManager] method.
- Line 54: adds the AbstractLocalAsyncWorker to the WorkerCreateListener (the Worker creation listener). WorkerCreateListener is analyzed in detail in ["SkyWalking Source Code Analysis: Collector Streaming Computing (Part 2)", "4.1 WorkerCreateListener"][SkyWalking _ _ Collector Streaming Computing _4.1 WorkerCreateListener].
- Line 57: creates the LocalAsyncWorkerRef object.
- Line 60: calls the QueueCreatorService#create(…) method to create the QueueEventHandler object, with the LocalAsyncWorkerRef set as its executor.
- Line 63: sets the QueueEventHandler on the LocalAsyncWorkerRef.
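The order of the wiring in #create can be sketched as below. This is a reduced model, not the project's code: the ModuleManager parameter is dropped, the queue is constructed directly instead of through a QueueCreatorService, the stand-in queue calls its executor synchronously, and the workers() accessor is our own addition.

```java
import java.util.ArrayList;
import java.util.List;

// Callback the queue invokes for each event.
interface QueueExecutor<MESSAGE> {
    void execute(MESSAGE message);
}

// Synchronous stand-in for the queue; a real queue would call the executor from another thread.
class QueueEventHandler<MESSAGE> {
    private final int size;
    private final QueueExecutor<MESSAGE> executor;

    QueueEventHandler(int size, QueueExecutor<MESSAGE> executor) {
        this.size = size;
        this.executor = executor;
    }

    int size() {
        return size;
    }

    void tell(MESSAGE message) {
        executor.execute(message);
    }
}

abstract class AbstractLocalAsyncWorker<INPUT> {
    protected abstract void onWork(INPUT input);
}

class LocalAsyncWorkerRef<INPUT> implements QueueExecutor<INPUT> {
    private final AbstractLocalAsyncWorker<INPUT> worker;
    private QueueEventHandler<INPUT> queueEventHandler;

    LocalAsyncWorkerRef(AbstractLocalAsyncWorker<INPUT> worker) {
        this.worker = worker;
    }

    void setQueueEventHandler(QueueEventHandler<INPUT> queueEventHandler) {
        this.queueEventHandler = queueEventHandler;
    }

    void in(INPUT input) {
        queueEventHandler.tell(input);
    }

    @Override
    public void execute(INPUT input) {
        worker.onWork(input);
    }
}

// Collects every worker a provider creates.
class WorkerCreateListener {
    private final List<AbstractLocalAsyncWorker<?>> workers = new ArrayList<>();

    void addWorker(AbstractLocalAsyncWorker<?> worker) {
        workers.add(worker);
    }

    List<AbstractLocalAsyncWorker<?>> workers() {
        return workers;
    }
}

abstract class AbstractLocalAsyncWorkerProvider<INPUT> {
    protected abstract AbstractLocalAsyncWorker<INPUT> workerInstance();

    protected abstract int queueSize();

    final LocalAsyncWorkerRef<INPUT> create(WorkerCreateListener listener) {
        AbstractLocalAsyncWorker<INPUT> worker = workerInstance();        // create the worker
        listener.addWorker(worker);                                        // register it
        LocalAsyncWorkerRef<INPUT> ref = new LocalAsyncWorkerRef<>(worker); // create the reference
        QueueEventHandler<INPUT> handler = new QueueEventHandler<>(queueSize(), ref); // ref is the executor
        ref.setQueueEventHandler(handler);                                 // close the loop
        return ref;
    }
}
```

A concrete provider only has to supply the worker instance and the queue size; the shared create method takes care of connecting worker, reference and queue.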
3.2.3 AbstractRemoteWorker
[org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker][org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker] is the abstract class for remote Workers. It defines the [#selector()][selector] abstract method, which returns the selector. RemoteSenderService uses the selector when calling the [RemoteClientSelector#select(…)][RemoteClientSelector_select_…] method to choose a remote node, and then sends the data.
In practice, extend the AbstractRemoteWorker class and implement the #onWork(INPUT) method, as [ApplicationRegisterRemoteWorker][] does.
org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider is the abstract provider for AbstractRemoteWorker.
- [remoteSenderService][remoteSenderService] field: the remote sender service.
- [#create(WorkerCreateListener)][create_WorkerCreateListener 1] method implementation: creates the AbstractRemoteWorker and RemoteWorkerRef objects.
- Line 58: creates an object of the AbstractRemoteWorker implementation class. See the [ApplicationRegisterRemoteWorker.Factory#workerInstance(ModuleManager)][ApplicationRegisterRemoteWorker.Factory_workerInstance_ModuleManager] method.
- Line 61: adds the AbstractRemoteWorker to the WorkerCreateListener (the Worker creation listener). WorkerCreateListener is analyzed in detail in ["SkyWalking Source Code Analysis: Collector Streaming Computing (Part 2)", "4.1 WorkerCreateListener"][SkyWalking _ _ Collector Streaming Computing _4.1 WorkerCreateListener].
- Line 64: creates the RemoteWorkerRef object.
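The remote provider follows the same shape, minus the queue. The sketch below is an illustration under our own assumptions: the Selector constant names, the two-argument send(...) signature and the workerCount() helper are hypothetical, and the ModuleManager parameter is dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Constant names are assumed for this sketch.
enum Selector { HASH_CODE, ROLLING }

enum Mode { REMOTE, LOCAL }

// Hypothetical, simplified sender signature.
interface RemoteSenderService {
    Mode send(Object data, Selector selector);
}

abstract class AbstractRemoteWorker<INPUT> {
    // Tells the sender how to pick the target node for this worker's data.
    abstract Selector selector();

    protected abstract void onWork(INPUT input);
}

class RemoteWorkerRef<INPUT> {
    private final AbstractRemoteWorker<INPUT> remoteWorker;
    private final RemoteSenderService remoteSenderService;

    RemoteWorkerRef(AbstractRemoteWorker<INPUT> remoteWorker, RemoteSenderService remoteSenderService) {
        this.remoteWorker = remoteWorker;
        this.remoteSenderService = remoteSenderService;
    }

    // Routes through the sender using the worker's selector; local-mode data runs here.
    void in(INPUT input) {
        Mode mode = remoteSenderService.send(input, remoteWorker.selector());
        if (mode == Mode.LOCAL) {
            remoteWorker.onWork(input);
        }
    }
}

// Collects every worker a provider creates.
class WorkerCreateListener {
    private final List<AbstractRemoteWorker<?>> workers = new ArrayList<>();

    void addWorker(AbstractRemoteWorker<?> worker) {
        workers.add(worker);
    }

    int workerCount() {
        return workers.size();
    }
}

abstract class AbstractRemoteWorkerProvider<INPUT> {
    private final RemoteSenderService remoteSenderService;

    AbstractRemoteWorkerProvider(RemoteSenderService remoteSenderService) {
        this.remoteSenderService = remoteSenderService;
    }

    protected abstract AbstractRemoteWorker<INPUT> workerInstance();

    final RemoteWorkerRef<INPUT> create(WorkerCreateListener listener) {
        AbstractRemoteWorker<INPUT> worker = workerInstance();   // create the worker
        listener.addWorker(worker);                               // register it
        return new RemoteWorkerRef<>(worker, remoteSenderService); // create the reference
    }
}
```

The worker contributes only its selector and its onWork logic; where the data actually runs is decided by the sender each time in(...) is called.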
Copyright notice: this article is not original to this site; copyright belongs to the original author | [Original article][Link 1]
[nbsp]: https://cdn.cxykk.com/images/2024/2/8/1538/1707377933983.png
[nbsp 1]: https://cdn.cxykk.com/images/2024/2/8/1538/1707377939436.png
[https_github.com_apache_incubating-skywalking]: https://github.com/apache/incubating-skywalking
[nbsp 2]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377945277.png
[nbsp 3]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377951031.png
[Storm_Spark_Samza]: http://www.csdn.net/article/2015-03-09/2824135
[Storm]: https://storm.apache.org/
[nbsp 4]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377956681.png
[nbsp 5]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377962170.png
[3. apm-collector-stream]: https://www.iocoder.cn/SkyWalking/collector-streaming-first/#
[nbsp 6]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377967794.png
[GraphManager_createIfAbsent_graphId_ input]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/GraphManager.java#L50
[JvmMetricStreamGraph]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java
[RegisterStreamGraph]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java
[TraceStreamGraph]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java
[Graph_addNode_WayToNode]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Graph.java#L56
[WayToNode_buildDestination]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/WayToNode.java#L41
[out_INPUT]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/WayToNode.java#L47
[Node]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Node.java#L40
[Graph_checkForNewNode_Node]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Graph.java#L71
[Node_addNext_WayToNode]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Node.java#L51
[nbsp 7]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377973401.png
[nbsp 8]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377979302.png
[Graph_start_input]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Graph.java#L48
[WayToNode_in_input]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/WayToNode.java#L45
[DirectWay_in_input]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/DirectWay.java#L29
[Node_execute]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Node.java#L62
[NodeProcessor_process_input_ next]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/NodeProcessor.java#L34
[AbstractWorker_process_input_ next]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java#L62
[onWork]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java#L60
[ApplicationRegisterRemoteWorker_onWork_Application]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterRemoteWorker.java#L46
[Next_execute_INPUT]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Next.java#L50
[nbsp 9]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377985132.png
[org.skywalking.apm.collector.stream.worker.base.WorkerRef]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java
[LocalAsyncWorkerRef_in_INPUT]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java#L50
[queueEventHandler]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java#L36
[SkyWalking _ _ Collector Queue]: http://www.iocoder.cn/SkyWalking/collector-queue-module/?self
[LocalAsyncWorkerRef_execute]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java#L46
[DisruptorEventHandler_onEvent]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java#L54
[org.skywalking.apm.collector.queue.base.QueueExecutor]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java
[nbsp 10]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377990654.png
[RemoteWorkerRef_in_INPUT]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerRef.java#L53
[SkyWalking _ _ Collector Remote _3.2 GRPCRemoteSenderService]: http://www.iocoder.cn/SkyWalking/collector-remote-module?self
[Selector]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/Selector.java
[Mode]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteSenderService.java#L36
[nbsp 11]: https://cdn.cxykk.com/images/2024/2/8/1539/1707377996356.png
[org.skywalking.apm.collector.stream.worker.base.Provider]: https://github.com/YunaiV/skywalking/blob/23c2146c134e0ef0a37a43758a1e04727de7697a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/Provider.java
[org.skywalking.apm.collector.stream.worker.base.AbstractWorkerProvider]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java
[workerInstance_ModuleManager]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java#L46
[org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java
[ApplicationRegisterSerialWorker]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterSerialWorker.java#L56
[queueCreatorService]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java#L35
[queueSize]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java#L40
[create_WorkerCreateListener]: https://www.iocoder.cn/SkyWalking/collector-streaming-first/
[ApplicationRegisterSerialWorker.Factory_workerInstance_ModuleManager]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterSerialWorker.java#L90
[SkyWalking _ _ Collector Streaming Computing _4.1 WorkerCreateListener]: http://www.iocoder.cn/SkyWalking/collector-streaming-second/?self
[org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorker.java
[selector]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorker.java#L45
[RemoteClientSelector_select_…]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RemoteClientSelector.java#L31
[ApplicationRegisterRemoteWorker]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterRemoteWorker.java
[remoteSenderService]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java#L40
[create_WorkerCreateListener 1]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java#L56
[ApplicationRegisterRemoteWorker.Factory_workerInstance_ModuleManager]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterRemoteWorker.java#L61
[Link 1]: https://blog.csdn.net/weixin_42073629/article/details/119703379
</tr>
</tbody>
</table>