×

签到

分享到微信

打开微信,使用扫一扫进入页面后,点击右上角菜单,

点击“发送给朋友”或“分享到朋友圈”完成分享

【CN-SDK03】寒武纪 CNStream 使用介绍 小飞人2023-07-24 16:35:18 回复 查看 社区交流 干货资源
【CN-SDK03】寒武纪 CNStream 使用介绍
分享到:

知乎链接:https://zhuanlan.zhihu.com/p/614071148

若是初学者,建议先初级部分,本次的主要内容:

  • 主要内容:CNStream 概述,框架,内置模块以及工具。

  • 主要目的:对 CNStream 整体框架,内部模块功能及其自定义参数,以及工具有初步的了解。

  • 依赖的知识点:了解寒武纪硬件平台,对寒武纪基础软件CNToolkit,CNCV,MagicMind有概念,了解基础的图像和人工智能相关知识。

1、概述

CNStream 是面向寒武纪开发平台的数据流处理 SDK。用于快速搭建基于寒武纪硬件平台的人工智能应用。比如目标检测,视频结构化等等。图片所示为一个典型的视频结构化应用,包括解码,目标检测,目标追踪,目标属性识别以及结果处理。



1 CNStream简介

1)cnstream的软件栈

在cnstream之上的是用户应用。cnstream依赖于easydk(Easy development kit),easydk是基于寒武纪基础软件包开发的一套工具包,以子仓的形式存在于CNStream仓库中。同时,cnstream依赖于MagicMind,CNCV,CNCodec,CNRT等寒武纪基础软件包。最底层则是driver,kernel。



2)典型场景流程图

对数据流依次进行解码,推理,追踪,编码和渲染。首先通过json配置文件,构建一条如上所述的pipeline。数据流解码后,数据帧被推到下游队列中,随后,推理插件从上游队列中拿取到解码后的数据,进行推理,推理结束后再将数据推入下游队列,依此类推。数据帧在pipeline中流动,依次被每个插件处理。当视频流处理结束后,或发生异常时,应用将拿到pipeline发给应用的消息,并进行相应的处理。



3)配置文件

CNStream支持通过配置文件搭建pipeline。

例如一个目标检测业务的Json配置,通过profiler_config配置是否对pipeline进行profile或trace,主要用于性能统计及调优。

{
    "profiler_config" : {
        "enable_profiling" : true,
        "enable_tracing" : true
    },
    "source" : {
        "class_name" : "cnstream::DataSource",
        "next_modules" : ["detector"],
        "custom_params" : {
            "bufpool_size" : 16,
            "interval" : 1,
            "device_id" : 0
        }
    },
    "detector" : {
        "class_name" : "cnstream::Inferencer",
        "next_modules" : ["osd"],
        "parallelism" : 1,
        "max_input_queue_size" : 10,
        "custom_params" : {
            "model_path" : "../../../data/models/yolov3_v0.13.0_4b_rgb_uint8.magicmind",
            "preproc"  : "name=PreprocYolov3;use_cpu=false",
            "postproc" : "name=PostprocYolov3;threshold=0.5",
            "batch_timeout" : 200,
            "engine_num" : 4,
            "model_input_pixel_format" : "RGB24",
            "device_id" : 0
        }
    }
    "osd" : {
        "class_name" : "cnstream::Osd",
        "next_modules" : ["venc"],
        "parallelism" : 1,
        "max_input_queue_size" : 10,
        "custom_params" : {
            "label_path" : "../../../../data/models/label_map_coco.txt"
        }
    },
    "venc" : {
        "class_name" : "cnstream::VEncode",
        "parallelism" : 1,
        "max_input_queue_size" : 10,
        "custom_params" : {
            "rtsp_port" : 9554,
            "device_id": 0
        }
    }}

其他四个部分为pipeline的组成模块,起始模块source,class_name为模块类名,这里用到反射机制,所以我们可以知道source是一个DataSource模块,负责获取输入流并解码。next_modules指向本模块的下游模块,即detector。custom_params则是每个模块的自定义参数,例如device_id代表设备号。

detector的class_name是Inferencer即推理,parallelism代表模块的并行度,但由于DataSource模块为起始模块不由pipeline调度,DataSource模块无需设置该参数。detector指向下游模块osd做标签叠加。osd又指向下游模块venc做编码。

使用CNStream提供的接口pipeline::BuildPipelineByJSONFile,传入这个Json配置文件,将得到一条目标检测pipeline。

2 EasyDK 模块

CNStream 子仓 EasyDK(Cambricon Easy Development Kit)提供了一套面向寒武纪硬件设备的接口,用于面向寒武纪硬件平台快速开发和部署人工智能应用。

EasyDK支持如下特性:

  • BufSurface:描述及管理buffers

  • Decode:视频与图片的硬件解码和缩放

  • Encode:视频与图片的硬件编码和缩放

  • Transform:基于CNCV(寒武纪计算机视觉库)的图片转换,包括:

    • YUV420spToRGBx, Resize, ROI, (可选 MeanStd)

    • RGBxToYuv420sp, ROI

    • Yuv420spResize, ROI

    • MeanStd

  • InferServer:提供了一套类似服务器的推理接口,以及模型加载与管理,推理任务调度等功能。

    • 支持同步异步请求推理

    • 支持自定义前处理,后处理。可使用CnedkTransform接口加速前处理

    • 支持加载可变模型,但对于输入可变模型,需要在加载模型时给定输入形状

    • 不支持绑核

CNStream对EasyDK的依赖:

  • CNStream中的图片数据帧保存在BufSurface中

  • DataSource模块基于Decode

  • VEncode模块基于Encode

  • Inference模块基于InferServer

  • 预处理示例基于Transform

2、框架

CNStream 基于模块化和流水线的设计思想,模块之间相互连接形Pipeline,并使用事件总线实现事件监听。下图是一个pipeline示例,Module之间相互连接形成pipeline。支持分叉和汇聚的情况。使用离线模型,直接运行加速后的MLU机器指令。cnstream支持多种并行模式,拥有高效的数据处理能力。



1 框架简介

CNStream分为三个层次:应用、模块库和框架



  • 应用app主要实现了业务流程的组装。可以通过json配置文件搭建pipeline。

  • 模块库,cnstream提供的内置模块有以下几个,我们将在后续章节详细展开。

  • 框架层,主要有两大机制,pipeline和通信。pipeline实现了,pipeline搭建,module调度执行,模块间数据传递等。通信机制主要有,EventBus,StreamMsgObserver和IModuleObserver。分别实现了模块和pipeline,pipeline和app以及模块和app之间的通信。

2 框架核心成员

  • 模块的基类module。内置模块全部继承于module基类。

  • pipeline类主要功能时管理模块,调度执行任务以及模块间的数据传递。

  • connector连接器,是模块之间数据传递的载体。

  • cn info是数据基本单元,包含图像数据及结构化信息等等,模块间传递的数据,既是cn info的智能指针(shared_ptr)。

  • 通信机制:EventBus用于module与pipeline通信; StreamMsgObserver用于pipeline与app通信以及IModuleObserver用于module与app通信。

1)Module

module的功能实现有以下三个方法,Open,Process和Close。



模块的生命周期:

  1. 首先实例化一个模块,然后调用Open函数,初始化模块,加载资源。

  2. 接着重复调用Process函数,处理数据CN Info,直到Close方法被调用,close方法会对加载的资源进行清理。

  3. Process函数是模块的主要处理逻辑实现。它的输入和输出都是cn info的智能指针。以推理inferencer插件为例,输入的是一帧图像数据。经过推理模块的处理后,我们将得到的是图像图像和推理结果,包括检测框的位置,id,置信度等等。这些信息都将被保存在cn info结构体中。

2)Pipeline

pipeline的主要功能有搭建pipeline,调度执行模块和传递数据。

① 搭建



以图为例,我们搭建一条简单的pipeline,模块a,b,c依次连接。对于一份数据来说,数据将依次被每个模块处理。对于一个模块来说,模块将依次处理上有connector中的数据,处理结束后将数据推到下游connector

注意,pipeline最大支持64个模块。

② 调度执行

模块并行度 parallelism。以下图的小例子说明,左边只开启一个窗口,有很多人在排队。这时候如果我多开设一个窗口,那么排队的人数就会减半,大大提高了效率。所以简单的来说我们可以通过给每个模块设置并行度,启动多个TaskLoop,并行的执行任务,来提高效率。



3)Connector

  • 各个模块之间的连接器,是传输数据的载体

  • Connector由Conveyor组成,Conveyor的个数等于下游模块的并行度

  • 分配策略:conveyor_idx = stream_idx % parallelism

Conveyor 的个数等于下游模块的并行度,数据的分发策略是stream_idx % parallelism。可以理解为一个线程安全的 queue。通过设置 queue深度,可平衡 Pipeline中各模块间的数据流量。

  • 上游插件处理速度快,queue会被推满,上游插件数据将被阻塞

  • 下游插件处理速度快,queue会被弹空,下游插件将获得不到数据



4)CN Info

模块间传递的数据是cn info的智能指针。主要需要关注的一个字段collection,支持保存用户自定义的任意数据类型。CNStream针对智能视频分析场景专⻔定义了CNData 用于保存视频帧数据,和CNInfer 用于保存神经网络推理结果。

struct CN Info {
  …
  std::string stream_id;
  int64_t timestamp;
  Collection collection;
  …};int value = 1;info.collection.Add("my_tag", value);int result = info.collection.Get<int>("my_tag");

5)EventBus

模块与pipeline通信的总线EventBus,用户层不感知。



在pipeline启动时,会起一个线程EventLoop,轮询queue中的event。当有模块向queue中推event时,EventLoop会获取到event,并通知watchers。

6)StreamMsgObserver

StreamMsgObserver实现了pipeline与app的通信。



在pipeline启动时,会起一个线程StreamMsgHandleFunc,轮询queue中的message。pipeline将msg推到stream msg queue中,StreamMsgHandleFunc拿到message后,通知app。应用层需要定义类继承自 StreamMsgObserver,并重载 Update 接口,用于接收和处理 pipeline 发送给应用的信息,包括 error,eos 等。

7)IModuleObserver

IModuleObserver实现了module与app的通信。

class MyObserver : public IModuleObserver {
 public:
  void Notify(cnstream::CN InfoPtr data) override {
    // process data ...  }};Module module_a("module_a");MyObserver my_observer;module_a.SetObserver(&my_observer);
  • 首先需要定义一个 Observer,继承自 IModuleObserver,并 override 基类的 notify 函数,实现拿到数据之后的处理逻辑。然后,通过 module 的 SetObserver 接口设置 observer。

  • 每帧数据被模块处理完后,pipeline调用模块的NotifyObserver接口,

  • 传递cn info的智能指针给observer。

主要的使用场景是当应用层需要获取 CN InfoPtr 时,例如在推理模块后获取推理结果。

3、内置模块

针对常规的视频结构化领域, CNStream提供了以下核心功能模块:

  • 数据源模块 DataSource

  • 推理模块 Inferencer

  • 追踪模块 Tracker

  • 叠加推理结果模块 Osd(On-Screen Display)

  • 编码模块 VEncode

  • Kafka模块 Kafka

DataSource为起始模块,VEncode和kafka模块则为sink模块一般为pipeline的最后一个模块。

1 DataSource

  • DataSource模块是起始模块,是数据的输入

  • 内置decoder,支持硬解码(硬解码不占用推理性能)

  • 解码格式: JPEG, H.264, H.265

  • 支持获取多种形式的资源,本地文件,网络流,内存数据裸流等

  • 支持对解码后的数据进行缩放

  • 支持添加多路数据流到一个DataSource中

  • 支持动态添加和移除数据流

  • 每一路数据流需要由用户指定唯一标识stream_id (string)

  • 模块内部会为每路数据流启动一个线程

  • DataSource模块没有输入队列;不由Pipeline调度执行;由模块本身负责向下游模块传递数据

1)添加输入流的方式:

  • 创建 SourceHandler:不同形式的资源需要创建不同的 handler

  • 调用 DataSource::AddSource接口:传入创建好的 handler

2)移除输入流的方式:

  • 调用 DataSource::RemoveSource接口传入 handler 或 stream_id

  • 支持快速移除,不再处理 Pipeline 中未处理完的帧

  • 调用 DataSource::RemoveSources接口移除所有流

Pipeline pipeline("my_pipeline");// build pipeline with DataSource which name is source ...std::string source_name = "source"; cnstream::DataSource* source =
    dynamic_cast<cnstream::DataSource*>(pipeline.GetModule(source_name));std::string stream_id = "stream_0";cnstream::FileSourceParam param;param.filename = "your_video_path";param. rate = 25;// resize after decoding (optional)cnstream::Resolution out_resolution;out_resolution.width = 1920;out_resolution.height = 1080;param.out_res = out_resolution;source->AddSource(cnstream::CreateSource(source, stream_id, param));// force remove sourcebool force = true;source->RemoveSource(stream_id, force);

上面为添加及移除一路本地文件的流程代码。实例化一个pipeline,build pipeline,然后获取DataSource模块指针。定义一个FileSourceParam,然后通过该param设置文件url, rate输入帧率,out_resolution输出分辨率等。然后通过CreateSource接口,创建SourceHandler。并调用AddSource传入handler。完成添加。也可以调用RemoveSource接口移除。force=true代表快速移除。

当设置out_resolution参数时,解码后的图片将被缩放至该分辨率。同时,将使用内存池的方式去保存缩放后的图片数据。如未设置out_resolution参数,则在DataSource中为每一帧数据申请一份内存,并在pipeline处理完毕后释放。

3)DataSource典型配置参数

我们可以通过json文件配置source参数。

{
    "source" : {
        "class_name" : "cnstream::DataSource",
        "custom_params" : {
            "bufpool_size" : 16,
            "interval" : 1,
            "device_id" : 0
        }
    }}
  • DataSource 的自定义参数有:bufpool_size,存放解码后的数据的内存池大小。注意仅当创建 handler时,设置 out_res 参数后生效。若未设置 out_res 则不使用内存池。

  • interval,例如我们设置3,则每3帧数据,只处理其中一帧。

  • device_id,设备号。

2 Inferencer

1) Inferencer 介绍

推理模块基于EasyDK的推理服务实现推理功能,主要包括三个部分,前处理,推理和后处理。支持常见检测网络,分类网络等神经网络推理,支持二级网络推理。另外,可自定义前处理和后处理,使用反射机制,通过json配置类名便可以使用自定义的前后处理。二级网络推理时,还支持自定义过滤条件,比如只对一级检测出的车目标进行二级推理,同样使用反射机制。

高效

  • 并行执行前处理,推理和后处理

  • 使用离线模型进行推理

  • 支持batch推理

灵活

  • 可自定义前处理和后处理

  • 二级网络推理支持自定义过滤条件,比如只对车进行推理

2) Inferencer 配置

{
    "detector" : {
        "class_name" : "cnstream::Inferencer",
        "parallelism" : 1,
        "max_input_queue_size" : 20,
        "custom_params" : {
            "model_path" : "../../../data/models/yolov3_v0.10.1_4b_rgb_uint8.magicmind",
            "preproc"  : "name=PreprocYolov3;use_cpu=false",
            "postproc" : "name=PostprocYolov3;threshold=0.5",
            "batch_timeout" : 200,
            "engine_num" : 4,
            "model_input_pixel_format" : "RGB24",
            "device_id" : 0
        }
    }}

推理的一些自定义参数,比如离线模型路径,前处理和后处理的类名字符串,batch_timeout,推理引擎,模型输入颜色空间以及设备号。batch_timeout,拼batch时最大等待时间,也就是说,当超过batch_timeout时间还没有拼满,则不再等待,直接进行推理。当这个数值设置的过大时,可能会造成延时的增加,设置的过小时,可能会拼不满batch导致硬件资源浪费。数值的设置一般来说和batch_size,解码和推理的速度有关。

3 Tracker

追踪模块一般连接在刚讲的推理模块之后,对检测到的目标进行追踪。

具体而言:追踪一般分两步,特征提取+特征匹配。特征提取是离线网络完成的,特征匹配的实现有两种算法,cnstream支持Feature Match 和 IOU Match 。默认使用 FeatureMatch算法

{
    "tracker" : {
        "class_name" : "cnstream::Tracker",
        "parallelism" : 1,
        "max_input_queue_size" : 20,
        "custom_params" : {
            "model_path" : "../../../data/models/feature_extract_v0.10.1_4b_rgb_uint8.magicmind",
            "track_name": "FeatureMatch",
            "max_cosine_distance": "0.06",
            "engine_num" : 2,
            "device_id" : 0
        }
    }}

上面是一个tracker的json配置,使用过程中,我们使用 feature_extract_v0.10.1_4b_rgb_uint8.magicmind 该离线模型进行特征提取,FeatureMatch 进行特征匹配。 当然也可以根据自己实际场景进行替换。

4 Osd

将推理结果叠加在原图上显示。支持如下:

  • 分类网络:类别,置信度

  • 检测网络:检测框,类别,置信度

  • 追踪 :追踪id

  • 支持叠加次级网络结果

  • 支持自定义OsdHandler

  • (可选) 可配置label大小

  • (可选) 可配置字体

{
    "osd" : {
        "class_name" : "cnstream::Osd",
        "parallelism" : 1,
        "max_input_queue_size" : 20,
        "custom_params" : {
            "label_path" : "../../../../data/models/label_map_coco.txt"
        }
    }}

5 VEncode

VEncode模块支持软编码和硬编码。支持的编码格式有jpeg,h264和h265。支持对输入进行缩放后编码,通过自定义参数dst_width和dst_height设置dst分辨率,默认使用原始分辨率。

对于编码后的数据,支持保存到本地文件,支持封装mp4。需要用户通过自定义参数file_name设定要保存的文件的名字,同时模块将根据文件名来区分编码格式和是否需要封装为mp4。如右侧两个json配置,一个将编码mp4视频,另一个则编码jpeg图片。

除此之外,VEncode还支持RTSP推流。需要用户设置自定义参数rtsp_port。推流url默认为rtsp://本机ip:端口号/live。

支持单图模式和拼图模式。默认单图模式。

{
    "venc" : {
        "class_name" : "cnstream::VEncode",
        "parallelism" : 1,
        "max_input_queue_size" : 10,
        "custom_params" : {
            "file_name" : "output/output.mp4",
            "view_cols" : 3,
            "view_rows" : 3,
            "device_id": 0
        }
    }}

上述第7行换成以下形式,可以使用 jpeg或推流形式。

            "file_name" : "output/output.jpg",
            "rtsp_port" : 9554,

当我们希望使用拼图模式时,设置自定义参数view_cols和view_rows,他们代表格子的行列个数。注意格子数目一定要大于输入路数。



6 Kafka

Kafka模块支持发送kafka消息数据到集群中供消费者消费。支持自定义 KafkaHandler ,根据业务需求将需要的数据转换为 Kafka 消息数据。例如结构化信息等。

{
    "kafka_producer" : {
        "class_name" : "cnstream::Kafka",
        "parallelism" : 1,
        "max_input_queue_size" : 20,
        "custom_params" : {
            "handler" : “DefaultKafkaHandler",
            "topic" : "CnstreamData",
            "brokers" : "localhost:9092"
        }
    }}


   {
    "StreamName": "stream_0",
    " Count": 119,
    " s": [
        {
            "Label": "2",
            "Score": 0.94384765625,
            "BBox": [
                0.8779296875,
                0.4668387770652771,
                0.12060546875,
                0.5147867798805237
            ]
        },
        {
            "Label": "2",
            "Score": 0.62890625,
            "BBox": [
                0.1103515625,
                0.8813707232475281,
                0.284423828125,
                0.11862927675247193
            ]
        }
    ]}

4、工具

1 Inspect工具

inspect tool可以用来获得CNStream的版本信息,查看支持的内置模块以及查询每个模块支持的自定义参数。



  • 提供 Pipeline各部分的 时延和吞吐 信息

2 性能统计

  • 性能统计通过在 Pipeline中各个处理过程的开始和结束点上打桩,记录打桩时间点,并基于打桩时间点来计算时延、吞吐等信息。

  • 性能统计机制以某个处理过程为对象进行性能统计。一个处理过程可以是一个函数调用、一段代码,或是 Pipeline中两个处理节点之间的过程。在 CNStream的性能统计机制中,每个处理过程通过字符串进行映射。

  • 对于每个 Pipeline实例,通过创建一个 PipelineProfiler实例来管理该 Pipeline 中性能统计的运作。通过 PipelineProfiler实例为 Pipeline中的每个模块创建一个 ModuleProfiler实例来管理模块中性能统计。并通过 ModuleProfiler的 RegisterProcessName接口注册需要进行性能统计的处理过程,使用 ModuleProfiler::RecordProcessStart和 ModuleProfiler::RecordProcessEnd在各处理过程的开始和结束时间点打桩,供 CNStream进行性能统计。

字段名称描述
completed表示已经处理完毕的数据总量,不包括丢弃的数据帧。
dropped表示被丢弃的数据总量。当某一个数据被记录开始后,较之更晚记录开始的16个数据已记录到结束,则视为该数据帧已经丢弃。
counter表示统计到的对应处理过程已经处理完毕的数据的总量。被丢弃的数据也视为处理完毕的数据,会被累加在到counter上。 counter = completed + dropped
ongoing表示正在处理,但是未被处理完毕的数据总量。即已经记录到开始时间 但是未记录到结束时间的数据总量。
latency平均时延,单位为毫秒。
maximum_latency最大处理时延,单位为毫秒。
minimum_latency最小处理时延,单位为毫秒。
fps平均吞吐速度,单位为帧/秒。

CNStream 提供的示例中,可通过设置 perf_level 参数,控制性能打印的详细程度。0到3,打印的数据越来越详细,默认值为0:

  • 当 perf_level为0时,只打印各处理过程的 counter统计值与 fps(吞吐)统计值。

  • 当 perf_level为1时,在0的基础上加上 latency、maximum_latency、 minimum_latency三个统计值的打印。

  • 当 perf_level为2时,打印 ProcessProfile结构中的所有性能统计值。

  • 当 perf_level为3时,在2的基础上打印每路数据流的性能统计数据。

平均性能

当我们执行示例时,将在终端上打印性能数据,如图。Whole代表从程序启动以来的平均性能。提供各模块的性能和pipeline的整体性能。


SyntaxHighlighter.all();

版权所有 © 2024 寒武纪 Cambricon.com 备案/许可证号:京ICP备17003415号-1