hpds_node/example/video_streaming/sf/main.go

81 lines
1.7 KiB
Go
Raw Normal View History

package main
import (
"crypto/sha1"
"fmt"
2024-12-13 11:50:56 +08:00
"git.hpds.cc/Component/network/frame"
"git.hpds.cc/Component/network/log"
"git.hpds.cc/pavement/hpds_node"
"os"
"sync/atomic"
"time"
)
const ImageDataKey = 0x10
var (
counter uint64
)
func main() {
sf := hpds_node.NewStreamFunction(
"echo-sf",
hpds_node.WithMqAddr("localhost:27187"),
hpds_node.WithObserveDataTags(ImageDataKey),
hpds_node.WithCredential("token:z1"),
)
defer sf.Close()
2024-12-13 11:50:56 +08:00
_ = sf.SetHandler(Handler)
err := sf.Connect()
if err != nil {
log.Printf("Connect to MQ failure: ", err)
os.Exit(1)
}
select {}
}
// Handler process the data in the stream
2024-12-13 11:50:56 +08:00
func Handler(img []byte) (frame.Tag, []byte) {
// Initialize WasmEdge's VM
//vmConf, vm := initVM()
//bg := bindgen.Instantiate(vm)
//defer bg.Release()
//defer vm.Release()
//defer vmConf.Release()
//
//// recognize the image
//res, err := bg.Execute("infer", img)
//if err == nil {
// fmt.Println("GO: Run bindgen -- infer:", string(res[0].([]byte)))
//} else {
// fmt.Println("GO: Run bindgen -- infer FAILED")
//}
id := atomic.AddUint64(&counter, 1)
str := fmt.Sprintf("%d", time.Now().UnixNano()/1e6)
tick := time.Now().Format("20060102150405") + str[10:]
go WriteFile(fmt.Sprintf("./%d_%s.jpeg", id, tick), img)
// print logs
hash := genSha1(img)
log.Printf("received image-%d hash %v, img_size=%d \n", id, hash, len(img))
return 0x11, nil
}
// genSha1 generate the hash value of the image
func genSha1(buf []byte) string {
h := sha1.New()
h.Write(buf)
return fmt.Sprintf("%x", h.Sum(nil))
}
func WriteFile(fn string, data []byte) {
err := os.WriteFile(fn, data, 777)
if err != nil {
log.Printf("write file error =%v \n", err)
}
}