hpds_node/example/video_streaming/ap/main.go

167 lines
3.7 KiB
Go
Raw Permalink Normal View History

package main
import (
"bytes"
"crypto/sha1"
"encoding/json"
"fmt"
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"github.com/disintegration/imaging"
ffmpeg "github.com/u2takey/ffmpeg-go"
"io"
"os"
"strconv"
"time"
)
func main() {
// connect to HPDS-MQ.
ap := hpds_node.NewAccessPoint(
"hpds-ap",
hpds_node.WithMqAddr("localhost:27188"),
hpds_node.WithCredential("token:z1"),
)
err := ap.Connect()
if err != nil {
log.Printf("[AccessPoint] Emit the data to HPDS-MQ failure with err: %v", err)
return
}
//defer ap.Close()
ap.SetDataTag(0x10)
//获取本地livego的直播流
send := func(id int, img []byte) {
if len(img) > 0 {
//log.Printf("send img %s", HexToString(img))
str := fmt.Sprintf("%d", time.Now().UnixNano()/1e6)
tick := fmt.Sprintf("%d_%s", id, time.Now().Format("20060102150405")+str[10:])
go WriteFile(fmt.Sprintf("./%s.jpeg", tick), img)
n, err := ap.Write(img)
if err != nil {
log.Printf("Send image-%v to mq failure with err: %v", id, err)
} else {
log.Printf("Send image-frame-%v to mq, n: %d, hash=%s, img_size=%v", id, n, genSha1(img), len(img))
}
}
//time.Sleep(1 * time.Millisecond)
}
video := VideoImage{}
ffStream := ffmpeg.Input("rtmp://ns8.indexforce.com/home/mystream")
//i := 0
//quit := make(chan int)
tick := time.NewTicker(time.Millisecond * 42)
defer tick.Stop()
go func() {
i := 0
for range tick.C {
if i%24 == 0 {
img, err := video.ExtractImageBytes(ffStream, i)
if err != nil {
fmt.Printf("ExtractImage64 error: %v\n", err)
}
fmt.Printf("i ========== %d \n", i)
send(i, img)
//log.Debugf("send img %#v", img)
}
i++
}
}()
select {}
//ch := make(chan int, 1)
//
//select {
//case <-quit:
// fmt.Println("quit")
// close(ch)
// return
//case <-tick.C:
// if i%24 == 0 {
// img, err := video.ExtractImageBytes(ffStream, i)
// if err != nil {
// fmt.Printf("ExtractImage64 error: %v\n", err)
// }
// fmt.Printf("i ========== %d \n", i)
// send(i, img)
// //log.Debugf("send img %#v", img)
// }
// i++
//}
}
func genSha1(buf []byte) string {
h := sha1.New()
h.Write(buf)
return fmt.Sprintf("%x", h.Sum(nil))
}
func WriteFile(fn string, data []byte) {
err := os.WriteFile(fn, data, 777)
if err != nil {
log.Printf("write file error =%v \n", err)
}
}
type VideoImage struct {
}
func (v *VideoImage) ExtractImageBytes(stream *ffmpeg.Stream, frameNum int) ([]byte, error) {
reader := v.extractImage(stream, frameNum)
img, err := imaging.Decode(reader)
if err != nil {
return nil, err
}
imgBuf := new(bytes.Buffer)
err = imaging.Encode(imgBuf, img, imaging.JPEG)
if err != nil {
return nil, err
}
return imgBuf.Bytes(), nil
}
func (v *VideoImage) extractImage(stream *ffmpeg.Stream, frameNum int) io.Reader {
buf := bytes.NewBuffer(nil)
err := stream.
Filter("select", ffmpeg.Args{fmt.Sprintf("gte(n,%d)", frameNum)}).
Output("pipe:", ffmpeg.KwArgs{"vframes": 1, "format": "image2", "vcodec": "mjpeg"}).
//WithOutput(buf, os.Stdout).
WithOutput(buf, nil).
Run()
if err != nil {
log.Errorf("stream error %s", err)
panic(err)
}
return buf
}
func (v *VideoImage) GetFrameCount(inFileName string) (int, error) {
data, _ := ffmpeg.Probe(inFileName)
var m map[string]interface{}
err := json.Unmarshal([]byte(data), &m)
if err != nil {
return 0, err
}
var strInt string
items := m["streams"].([]interface{})
for _, item := range items {
v := item.(map[string]interface{})
if v["profile"] == "Main" || v["profile"] == "High" {
strInt = v["nb_frames"].(string)
break
}
}
if len(strInt) == 0 {
return 0, fmt.Errorf("not find profile(Main).nb_frames")
}
num, err := strconv.Atoi(strInt)
if err != nil {
return 0, nil
}
return num, nil
}