标签:stream and def ice ali 比较 key rda com
服务端做心跳机制:服务端长时间没有反应,使用心跳机制,证明服务端还存在
思考点:
notify := w.(http.CloseNotifier).CloseNotify()
// log.Println("notify:",<- notify) 会直接堵住的,因为notify它接收连接中断信号
go func(){
// 太迷了,正确想法就是:只能接收异常的信号,就是网络中断的信号
fmt.Println("接收连接中断信号")
<-notify
userData[r.RemoteAddr] = r.RemoteAddr
offUser <- r.RemoteAddr
log.Println(r.RemoteAddr,"just close")
}()
// 接收发送给客户端数据
type RW struct{
Rw http.ResponseWriter
T time.Time
}
var rw = make(map[int64]*RW)
// 考虑使用map。记得当正确的数据发送给客户端之后要将对应的map键值删除
delete(rw,a) // 当发送完之后,就要将这个客户端删除了。a时键值
// 保活,心跳
go func(){
defer func(){
if err := recover();err!=nil{
fmt.Println(err)
}
}()
fmt.Println("开启保活")
keepAliveInterval := time.Duration(6000)
fmt.Println(keepAliveInterval)
ticker := time.NewTicker(3*time.Second)
for {
select{
case <-ticker.C:
fmt.Println("保活,心跳机制")
t1 := time.Now()
for _,value:= range rw{
fmt.Println(value)
if t1.Sub(value.T)>keepAliveInterval{
fmt.Println("进入保活")
f,ok:=value.Rw.(http.Flusher)
if !ok{
fmt.Fprintf(value.Rw,"不能用来做sse")
return
}
fmt.Fprintf(value.Rw,"data:请耐心等待,我正在努力的加载数据\n\n")
f.Flush()
}
}
}
}
}()
package main
import(
"fmt"
"log"
"time"
"sync"
"net/http"
)
// 接收发送给客户端数据
type RW struct{
Rw http.ResponseWriter
T time.Time
}
var offUser = make(chan string,0)
var userData = make(map[string]string)
var rw = make(map[int64]*RW)
var i int64 = 0
var lock sync.Mutex
func init(){
log.SetFlags(log.Ltime|log.Lshortfile)
}
func sseService(w http.ResponseWriter,r *http.Request){
var a int64 // 用来接收key值
defer func(){
if err := recover();err!=nil{
fmt.Println(err)
}
}()
lock.Lock()
i++
a=i
lock.Unlock()
// 提取get请求参数
fmt.Println("a =",a)
f,ok := w.(http.Flusher)
if !ok{
http.Error(w,"cannot support sse",http.StatusInternalServerError)
return
}
// 用于监听客户端时候已经断开了连接
notify := w.(http.CloseNotifier).CloseNotify()
// log.Println("notify:",<- notify) 会直接堵住的,因为notify它接收网络中断信号
go func(){
fmt.Println("接收关闭信号")
<-notify
offUser <- r.RemoteAddr
log.Println(r.RemoteAddr,"just close")
}()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Access-Control-Allow-Origin","*")
fmt.Fprintf(w,"data:welcome\n\n")
f.Flush()
// 将当前的w保存
fmt.Println("心跳")
t := time.Now()
rr := &RW{Rw:w,T:t}
fmt.Println("rr =",rr)
rw[a] = rr
// 模拟服务端接收发送数据阻塞
fmt.Println("模拟服务端发送数据阻塞")
time.Sleep(time.Second*30)
fmt.Fprintf(w,"data:12345加油\n\n")
f.Flush()
delete(rw,a) // 当发送完之后,就要将这个客户端删除了
}
func testClose(w http.ResponseWriter,r *http.Request){
fmt.Println("remoteAddr:",r.RemoteAddr)
fmt.Println("userData:",userData)
// 用于监听客户端时候已经断开了连接
notify := w.(http.CloseNotifier).CloseNotify()
go func(){
fmt.Println("接收连接中断信号")
<-notify
userData[r.RemoteAddr] = r.RemoteAddr
offUser <- r.RemoteAddr
log.Println(r.RemoteAddr,"just close")
}()
time.Sleep(time.Second*1)
fmt.Fprintln(w,"这里任意数字")
}
func main(){
fmt.Println("sse1")
// 获取中断的客户端
go func(){
fmt.Println("监听关闭的客户端")
for{
select{
case user:=<-offUser:
log.Println("userOff:",user)
}
}
}()
// 保活,心跳
go func(){
defer func(){
if err := recover();err!=nil{
fmt.Println(err)
}
}()
fmt.Println("开启保活")
keepAliveInterval := time.Duration(6000)
fmt.Println(keepAliveInterval)
ticker := time.NewTicker(3*time.Second)
for {
select{
case <-ticker.C:
fmt.Println("保活,心跳机制")
t1 := time.Now()
for _,value:= range rw{
fmt.Println(value)
if t1.Sub(value.T)>keepAliveInterval{
fmt.Println("进入保活")
f,ok:=value.Rw.(http.Flusher)
if !ok{
fmt.Fprintf(value.Rw,"不能用来做sse")
return
}
fmt.Fprintf(value.Rw,"data:请耐心等待,我正在努力的加载数据\n\n")
f.Flush()
}
}
}
}
}()
http.HandleFunc("/sse",sseService)
http.HandleFunc("/testClose",testClose)
http.ListenAndServe(":8080",nil)
}
sse(){
let that = this
if ("EventSource" in window){
console.log("可以使用EventSource")
}else{
return
}
var url = "http://localhost:8080/sse?pid="+12345
var es = new EventSource(url)
// 监听事件
// 连接事件
es.onopen = function(e:any){
console.log("我进来啦")
console.log(e)
}
// message事件
es.onmessage = function(e){
that.Data = e.data
if (e.data=="12345加油"){ // 后端通知前端结束发送信息
console.log("12345加油,这是服务端正确想发送的数据")
es.close()
}else{
console.log(e.data)
}
}
es.addEventListener("error",(e:any)=>{
// 这里的e要声明变量,否则回报没有readyState属性
console.log("e.target",e.target)
console.log("SSEERROR:",e.target.readyState)
if(e.target.readyState == 0){
// 重连
console.log("Reconnecting...")
es.close() // 不开启服务端,直接关闭
}
if(e.target.readyState==2){
// 放弃
console.log("give up.")
}
},false);
}
myTest = setInterval(()=>{
var i:number = 1
console.log("轮询还是心跳")
if(i===4){
return
}
i++
},1500) // 一旦实例化,就会直接运行
test(){
clearInterval(this.myTest) // 清除重复运行函数
}
keepAliveInterval := time.Duration(3)
// 打印数据值
3ns
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)
go func(){
for{
select{
case <-done:
return
case t := <-ticker.C: // 500微秒轮询一次
fmt.Println("Tick at",t)
}
}
}()
time.Sleep(10*time.Second)
ticker.Stop()
done<-true
fmt.Println("ticker stopper")
golang+sse+angular的心跳机制、angullar的轮询机制、time.Duration和time.NewTicker的学习
标签:stream and def ice ali 比较 key rda com
原文地址:https://www.cnblogs.com/MyUniverse/p/11746159.html