From 6717831eb6b87154260dffea517ac092fe60b67c Mon Sep 17 00:00:00 2001 From: wangjian Date: Tue, 18 Oct 2022 14:41:01 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81add=20video=20stream=20demo=202?= =?UTF-8?q?=E3=80=81add=20config=20read=20string=20function?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/workflow.go | 19 +++++ example/video_streaming/ap/main.go | 146 ++++++++++++++++++++++++++++++++++ example/video_streaming/mq1/main.go | 33 ++++++++ example/video_streaming/mq1/mq_1.yaml | 5 ++ example/video_streaming/mq2/main.go | 28 +++++++ example/video_streaming/mq2/mq_2.yaml | 5 ++ example/video_streaming/sf/main.go | 78 ++++++++++++++++++ go.mod | 6 ++ 8 files changed, 320 insertions(+) create mode 100644 example/video_streaming/ap/main.go create mode 100644 example/video_streaming/mq1/main.go create mode 100644 example/video_streaming/mq1/mq_1.yaml create mode 100644 example/video_streaming/mq2/main.go create mode 100644 example/video_streaming/mq2/mq_2.yaml create mode 100644 example/video_streaming/sf/main.go diff --git a/config/workflow.go b/config/workflow.go index 1bf8ed6..4e5916c 100644 --- a/config/workflow.go +++ b/config/workflow.go @@ -43,6 +43,10 @@ func LoadWorkflowConfig(path string) (*WorkflowConfig, error) { return load(buffer) } +func LoadWorkflowConfigForString(cfg string) (*WorkflowConfig, error) { + return load([]byte(cfg)) +} + func load(data []byte) (*WorkflowConfig, error) { var config = &WorkflowConfig{} err := yaml.Unmarshal(data, config) @@ -73,6 +77,21 @@ func ParseWorkflowConfig(config string) (*WorkflowConfig, error) { return wfConf, nil } +func ParseWorkflowConfigContent(config string) (*WorkflowConfig, error) { + wfConf, err := LoadWorkflowConfigForString(config) + if err != nil { + return nil, err + } + + // validate + err = validateWorkflowConfig(wfConf) + if err != nil { + return nil, err + } + + return wfConf, nil +} + func validateWorkflowConfig(wfConf *WorkflowConfig) error { if wfConf == nil { return errors.New("workflow: config nil") diff --git a/example/video_streaming/ap/main.go b/example/video_streaming/ap/main.go new file mode 100644 index 0000000..6b7c4ea --- /dev/null +++ b/example/video_streaming/ap/main.go @@ -0,0 +1,146 @@ +package main + +import ( + "bytes" + "crypto/sha1" + "encoding/json" + "fmt" + "git.hpds.cc/Component/network/log" + "git.hpds.cc/pavement/hpds_node" + "io" + "os" + "strconv" + "time" + + "github.com/disintegration/imaging" + ffmpeg "github.com/u2takey/ffmpeg-go" +) + +func main() { + // connect to HPDS-MQ. + ap := hpds_node.NewAccessPoint( + "hpds-ap", + hpds_node.WithMqAddr("localhost:27188"), + hpds_node.WithCredential("token:z1"), + ) + err := ap.Connect() + if err != nil { + log.Printf("[AccessPoint] Emit the data to HPDS-MQ failure with err: %v", err) + return + } + //defer ap.Close() + + ap.SetDataTag(0x10) + + //获取本地livego的直播流 + send := func(id int, img []byte) { + if len(img) > 0 { + //log.Printf("send img %s", HexToString(img)) + str := fmt.Sprintf("%d", time.Now().UnixNano()/1e6) + tick := time.Now().Format("20060102150405") + str[10:] + go WriteFile(fmt.Sprintf("./%s.jpeg", tick), img) + n, err := ap.Write(img) + if err != nil { + log.Printf("Send image-%v to mq failure with err: %v", id, err) + } else { + log.Printf("Send image-frame-%v to mq, n: %d, hash=%s, img_size=%v", id, n, genSha1(img), len(img)) + } + } + //time.Sleep(1 * time.Millisecond) + } + video := VideoImage{} + ffStream := ffmpeg.Input("rtmp://ns8.indexforce.com/home/mystream") + i := 0 + quit := make(chan int) + //tick := time.Tick(time.Second) + select { + case <-quit: + fmt.Println("quit") + return + case <-time.After(time.Millisecond * 42): + if i%24 == 0 { + img, err := video.ExtractImageBytes(ffStream, 1) + if err != nil { + fmt.Printf("ExtractImage64 error: %v\n", err) + } + send(i, img) + //log.Debugf("send img %#v", img) + } + i++ + } +} + +func genSha1(buf []byte) string { + h := sha1.New() + h.Write(buf) + return fmt.Sprintf("%x", h.Sum(nil)) +} + +func WriteFile(fn string, data []byte) { + err := os.WriteFile(fn, data, 777) + if err != nil { + log.Printf("write file error =%v \n", err) + } +} + +type VideoImage struct { +} + +func (v *VideoImage) ExtractImageBytes(stream *ffmpeg.Stream, frameNum int) ([]byte, error) { + reader := v.extractImage(stream, frameNum) + img, err := imaging.Decode(reader) + if err != nil { + return nil, err + } + imgBuf := new(bytes.Buffer) + err = imaging.Encode(imgBuf, img, imaging.JPEG) + if err != nil { + return nil, err + } + return imgBuf.Bytes(), nil +} + +func (v *VideoImage) extractImage(stream *ffmpeg.Stream, frameNum int) io.Reader { + buf := bytes.NewBuffer(nil) + err := stream. + Filter("select", ffmpeg.Args{fmt.Sprintf("gte(n,%d)", frameNum)}). + Output("pipe:", ffmpeg.KwArgs{"vframes": 1, "format": "image2", "vcodec": "mjpeg"}). + //WithOutput(buf, os.Stdout). + WithOutput(buf, nil). + Run() + if err != nil { + log.Errorf("stream error %s", err) + panic(err) + } + return buf +} + +func (v *VideoImage) GetFrameCount(inFileName string) (int, error) { + data, _ := ffmpeg.Probe(inFileName) + var m map[string]interface{} + err := json.Unmarshal([]byte(data), &m) + if err != nil { + return 0, err + } + + var strInt string + items := m["streams"].([]interface{}) + for _, item := range items { + v := item.(map[string]interface{}) + if v["profile"] == "Main" || v["profile"] == "High" { + strInt = v["nb_frames"].(string) + break + } + } + + if len(strInt) == 0 { + return 0, fmt.Errorf("not find profile(Main).nb_frames") + } + + num, err := strconv.Atoi(strInt) + if err != nil { + return 0, nil + } + + return num, nil +} diff --git a/example/video_streaming/mq1/main.go b/example/video_streaming/mq1/main.go new file mode 100644 index 0000000..7e8a8ab --- /dev/null +++ b/example/video_streaming/mq1/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "git.hpds.cc/Component/network/log" + "git.hpds.cc/pavement/hpds_node" + "os" +) + +func main() { + mq, err := hpds_node.NewMq("mq_1.yaml") + if err != nil { + panic(err) + } + mq.InitOptions(hpds_node.WithAuth("token", "z1")) + defer mq.Close() + + // add Downstream mq + mq.AddDownstreamMq(hpds_node.NewDownstreamMq( + "mq-2", + hpds_node.WithMqAddr("localhost:27187"), + hpds_node.WithCredential("token:z2"), + )) + + // start mq service + log.Printf("Server has started!, pid: %d", os.Getpid()) + go func() { + err := mq.ListenAndServe() + if err != nil { + panic(err) + } + }() + select {} +} diff --git a/example/video_streaming/mq1/mq_1.yaml b/example/video_streaming/mq1/mq_1.yaml new file mode 100644 index 0000000..d983b05 --- /dev/null +++ b/example/video_streaming/mq1/mq_1.yaml @@ -0,0 +1,5 @@ +name: mq-1 +host: 0.0.0.0 +port: 27188 +functions: + - name: echo-sf diff --git a/example/video_streaming/mq2/main.go b/example/video_streaming/mq2/main.go new file mode 100644 index 0000000..a94b19b --- /dev/null +++ b/example/video_streaming/mq2/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "git.hpds.cc/Component/network/log" + "git.hpds.cc/pavement/hpds_node" + "os" +) + +func main() { + mq := hpds_node.NewMqWithOptions( + "mq-2", + hpds_node.WithMqAddr("localhost:27187"), + hpds_node.WithAuth("token", "z2"), + ) + defer mq.Close() + + mq.ConfigWorkflow("mq_2.yaml") + + // start mq service + log.Printf("Server has started!, pid: %d", os.Getpid()) + go func() { + err := mq.ListenAndServe() + if err != nil { + panic(err) + } + }() + select {} +} diff --git a/example/video_streaming/mq2/mq_2.yaml b/example/video_streaming/mq2/mq_2.yaml new file mode 100644 index 0000000..e9dc984 --- /dev/null +++ b/example/video_streaming/mq2/mq_2.yaml @@ -0,0 +1,5 @@ +name: mq-2 +host: 0.0.0.0 +port: 27188 +functions: + - name: echo-sf diff --git a/example/video_streaming/sf/main.go b/example/video_streaming/sf/main.go new file mode 100644 index 0000000..e3076ae --- /dev/null +++ b/example/video_streaming/sf/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "crypto/sha1" + "fmt" + "git.hpds.cc/Component/network/log" + "git.hpds.cc/pavement/hpds_node" + "os" + "sync/atomic" + "time" +) + +const ImageDataKey = 0x10 + +var ( + counter uint64 +) + +func main() { + sf := hpds_node.NewStreamFunction( + "echo-sf", + hpds_node.WithMqAddr("localhost:27188"), + hpds_node.WithObserveDataTags(ImageDataKey), + hpds_node.WithCredential("token:z1"), + ) + defer sf.Close() + + sf.SetHandler(Handler) + + err := sf.Connect() + if err != nil { + log.Printf("Connect to MQ failure: ", err) + os.Exit(1) + } + + select {} +} + +// Handler process the data in the stream +func Handler(img []byte) (byte, []byte) { + // Initialize WasmEdge's VM + //vmConf, vm := initVM() + //bg := bindgen.Instantiate(vm) + //defer bg.Release() + //defer vm.Release() + //defer vmConf.Release() + // + //// recognize the image + //res, err := bg.Execute("infer", img) + //if err == nil { + // fmt.Println("GO: Run bindgen -- infer:", string(res[0].([]byte))) + //} else { + // fmt.Println("GO: Run bindgen -- infer FAILED") + //} + str := fmt.Sprintf("%d", time.Now().UnixNano()/1e6) + tick := time.Now().Format("20060102150405") + str[10:] + go WriteFile(fmt.Sprintf("./%s.jpeg", tick), img) + + // print logs + hash := genSha1(img) + log.Printf("received image-%d hash %v, img_size=%d \n", atomic.AddUint64(&counter, 1), hash, len(img)) + + return 0x11, nil +} + +// genSha1 generate the hash value of the image +func genSha1(buf []byte) string { + h := sha1.New() + h.Write(buf) + return fmt.Sprintf("%x", h.Sum(nil)) +} + +func WriteFile(fn string, data []byte) { + err := os.WriteFile(fn, data, 777) + if err != nil { + log.Printf("write file error =%v \n", err) + } +} diff --git a/go.mod b/go.mod index 973ecc4..020d7b2 100644 --- a/go.mod +++ b/go.mod @@ -4,17 +4,21 @@ go 1.19 require ( git.hpds.cc/Component/network v0.0.0-20221012021659-2433c68452d5 + github.com/disintegration/imaging v1.6.2 github.com/lucas-clemente/quic-go v0.29.1 github.com/stretchr/testify v1.8.0 + github.com/u2takey/ffmpeg-go v0.4.1 gopkg.in/yaml.v3 v3.0.1 ) require ( git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect + github.com/aws/aws-sdk-go v1.38.20 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/golang/mock v1.6.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect @@ -23,11 +27,13 @@ require ( github.com/onsi/ginkgo v1.16.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect + github.com/u2takey/go-utils v0.3.1 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.23.0 // indirect golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect + golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect