1、add video stream demo

2、add config read string function
This commit is contained in:
wangjian 2022-10-18 14:41:01 +08:00
parent c86bf21aa7
commit 6717831eb6
8 changed files with 320 additions and 0 deletions

View File

@ -43,6 +43,10 @@ func LoadWorkflowConfig(path string) (*WorkflowConfig, error) {
return load(buffer) return load(buffer)
} }
func LoadWorkflowConfigForString(cfg string) (*WorkflowConfig, error) {
return load([]byte(cfg))
}
func load(data []byte) (*WorkflowConfig, error) { func load(data []byte) (*WorkflowConfig, error) {
var config = &WorkflowConfig{} var config = &WorkflowConfig{}
err := yaml.Unmarshal(data, config) err := yaml.Unmarshal(data, config)
@ -73,6 +77,21 @@ func ParseWorkflowConfig(config string) (*WorkflowConfig, error) {
return wfConf, nil return wfConf, nil
} }
func ParseWorkflowConfigContent(config string) (*WorkflowConfig, error) {
wfConf, err := LoadWorkflowConfigForString(config)
if err != nil {
return nil, err
}
// validate
err = validateWorkflowConfig(wfConf)
if err != nil {
return nil, err
}
return wfConf, nil
}
func validateWorkflowConfig(wfConf *WorkflowConfig) error { func validateWorkflowConfig(wfConf *WorkflowConfig) error {
if wfConf == nil { if wfConf == nil {
return errors.New("workflow: config nil") return errors.New("workflow: config nil")

View File

@ -0,0 +1,146 @@
package main
import (
"bytes"
"crypto/sha1"
"encoding/json"
"fmt"
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"io"
"os"
"strconv"
"time"
"github.com/disintegration/imaging"
ffmpeg "github.com/u2takey/ffmpeg-go"
)
func main() {
// connect to HPDS-MQ.
ap := hpds_node.NewAccessPoint(
"hpds-ap",
hpds_node.WithMqAddr("localhost:27188"),
hpds_node.WithCredential("token:z1"),
)
err := ap.Connect()
if err != nil {
log.Printf("[AccessPoint] Emit the data to HPDS-MQ failure with err: %v", err)
return
}
//defer ap.Close()
ap.SetDataTag(0x10)
//获取本地livego的直播流
send := func(id int, img []byte) {
if len(img) > 0 {
//log.Printf("send img %s", HexToString(img))
str := fmt.Sprintf("%d", time.Now().UnixNano()/1e6)
tick := time.Now().Format("20060102150405") + str[10:]
go WriteFile(fmt.Sprintf("./%s.jpeg", tick), img)
n, err := ap.Write(img)
if err != nil {
log.Printf("Send image-%v to mq failure with err: %v", id, err)
} else {
log.Printf("Send image-frame-%v to mq, n: %d, hash=%s, img_size=%v", id, n, genSha1(img), len(img))
}
}
//time.Sleep(1 * time.Millisecond)
}
video := VideoImage{}
ffStream := ffmpeg.Input("rtmp://ns8.indexforce.com/home/mystream")
i := 0
quit := make(chan int)
//tick := time.Tick(time.Second)
select {
case <-quit:
fmt.Println("quit")
return
case <-time.After(time.Millisecond * 42):
if i%24 == 0 {
img, err := video.ExtractImageBytes(ffStream, 1)
if err != nil {
fmt.Printf("ExtractImage64 error: %v\n", err)
}
send(i, img)
//log.Debugf("send img %#v", img)
}
i++
}
}
func genSha1(buf []byte) string {
h := sha1.New()
h.Write(buf)
return fmt.Sprintf("%x", h.Sum(nil))
}
func WriteFile(fn string, data []byte) {
err := os.WriteFile(fn, data, 777)
if err != nil {
log.Printf("write file error =%v \n", err)
}
}
type VideoImage struct {
}
func (v *VideoImage) ExtractImageBytes(stream *ffmpeg.Stream, frameNum int) ([]byte, error) {
reader := v.extractImage(stream, frameNum)
img, err := imaging.Decode(reader)
if err != nil {
return nil, err
}
imgBuf := new(bytes.Buffer)
err = imaging.Encode(imgBuf, img, imaging.JPEG)
if err != nil {
return nil, err
}
return imgBuf.Bytes(), nil
}
func (v *VideoImage) extractImage(stream *ffmpeg.Stream, frameNum int) io.Reader {
buf := bytes.NewBuffer(nil)
err := stream.
Filter("select", ffmpeg.Args{fmt.Sprintf("gte(n,%d)", frameNum)}).
Output("pipe:", ffmpeg.KwArgs{"vframes": 1, "format": "image2", "vcodec": "mjpeg"}).
//WithOutput(buf, os.Stdout).
WithOutput(buf, nil).
Run()
if err != nil {
log.Errorf("stream error %s", err)
panic(err)
}
return buf
}
func (v *VideoImage) GetFrameCount(inFileName string) (int, error) {
data, _ := ffmpeg.Probe(inFileName)
var m map[string]interface{}
err := json.Unmarshal([]byte(data), &m)
if err != nil {
return 0, err
}
var strInt string
items := m["streams"].([]interface{})
for _, item := range items {
v := item.(map[string]interface{})
if v["profile"] == "Main" || v["profile"] == "High" {
strInt = v["nb_frames"].(string)
break
}
}
if len(strInt) == 0 {
return 0, fmt.Errorf("not find profile(Main).nb_frames")
}
num, err := strconv.Atoi(strInt)
if err != nil {
return 0, nil
}
return num, nil
}

View File

@ -0,0 +1,33 @@
package main
import (
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"os"
)
func main() {
mq, err := hpds_node.NewMq("mq_1.yaml")
if err != nil {
panic(err)
}
mq.InitOptions(hpds_node.WithAuth("token", "z1"))
defer mq.Close()
// add Downstream mq
mq.AddDownstreamMq(hpds_node.NewDownstreamMq(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithCredential("token:z2"),
))
// start mq service
log.Printf("Server has started!, pid: %d", os.Getpid())
go func() {
err := mq.ListenAndServe()
if err != nil {
panic(err)
}
}()
select {}
}

View File

@ -0,0 +1,5 @@
name: mq-1
host: 0.0.0.0
port: 27188
functions:
- name: echo-sf

View File

@ -0,0 +1,28 @@
package main
import (
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"os"
)
func main() {
mq := hpds_node.NewMqWithOptions(
"mq-2",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithAuth("token", "z2"),
)
defer mq.Close()
mq.ConfigWorkflow("mq_2.yaml")
// start mq service
log.Printf("Server has started!, pid: %d", os.Getpid())
go func() {
err := mq.ListenAndServe()
if err != nil {
panic(err)
}
}()
select {}
}

View File

@ -0,0 +1,5 @@
name: mq-2
host: 0.0.0.0
port: 27188
functions:
- name: echo-sf

View File

@ -0,0 +1,78 @@
package main
import (
"crypto/sha1"
"fmt"
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"os"
"sync/atomic"
"time"
)
const ImageDataKey = 0x10
var (
counter uint64
)
func main() {
sf := hpds_node.NewStreamFunction(
"echo-sf",
hpds_node.WithMqAddr("localhost:27188"),
hpds_node.WithObserveDataTags(ImageDataKey),
hpds_node.WithCredential("token:z1"),
)
defer sf.Close()
sf.SetHandler(Handler)
err := sf.Connect()
if err != nil {
log.Printf("Connect to MQ failure: ", err)
os.Exit(1)
}
select {}
}
// Handler process the data in the stream
func Handler(img []byte) (byte, []byte) {
// Initialize WasmEdge's VM
//vmConf, vm := initVM()
//bg := bindgen.Instantiate(vm)
//defer bg.Release()
//defer vm.Release()
//defer vmConf.Release()
//
//// recognize the image
//res, err := bg.Execute("infer", img)
//if err == nil {
// fmt.Println("GO: Run bindgen -- infer:", string(res[0].([]byte)))
//} else {
// fmt.Println("GO: Run bindgen -- infer FAILED")
//}
str := fmt.Sprintf("%d", time.Now().UnixNano()/1e6)
tick := time.Now().Format("20060102150405") + str[10:]
go WriteFile(fmt.Sprintf("./%s.jpeg", tick), img)
// print logs
hash := genSha1(img)
log.Printf("received image-%d hash %v, img_size=%d \n", atomic.AddUint64(&counter, 1), hash, len(img))
return 0x11, nil
}
// genSha1 generate the hash value of the image
func genSha1(buf []byte) string {
h := sha1.New()
h.Write(buf)
return fmt.Sprintf("%x", h.Sum(nil))
}
func WriteFile(fn string, data []byte) {
err := os.WriteFile(fn, data, 777)
if err != nil {
log.Printf("write file error =%v \n", err)
}
}

6
go.mod
View File

@ -4,17 +4,21 @@ go 1.19
require ( require (
git.hpds.cc/Component/network v0.0.0-20221012021659-2433c68452d5 git.hpds.cc/Component/network v0.0.0-20221012021659-2433c68452d5
github.com/disintegration/imaging v1.6.2
github.com/lucas-clemente/quic-go v0.29.1 github.com/lucas-clemente/quic-go v0.29.1
github.com/stretchr/testify v1.8.0 github.com/stretchr/testify v1.8.0
github.com/u2takey/ffmpeg-go v0.4.1
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
require ( require (
git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect git.hpds.cc/Component/mq_coder v0.0.0-20221010064749-174ae7ae3340 // indirect
github.com/aws/aws-sdk-go v1.38.20 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/golang/mock v1.6.0 // indirect github.com/golang/mock v1.6.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
@ -23,11 +27,13 @@ require (
github.com/onsi/ginkgo v1.16.4 // indirect github.com/onsi/ginkgo v1.16.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/u2takey/go-utils v0.3.1 // indirect
go.uber.org/atomic v1.7.0 // indirect go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.23.0 // indirect go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect