码迷,mamicode.com
首页 > 其他好文 > 详细

Socekt的TCP通信

时间:2015-07-27 09:25:03      阅读:167      评论:0      收藏:0      [点我收藏+]

标签:

本示例讲解如何通过go语言的net包实现TCP通信的。

在服务端创建监听地址,接收发送过来的数据信息,为了解决粘包问题,使用包头+数据的格式,根据包头信息读取到需要分析的数据。形式如下图: 技术分享

golang粘包问题包头定义

从数据流中读取数据的时候,只要根据包头和数据长度就能取到需要的数据。这个其实就是平时说的协议(protocol),只是这个数据传输协议非常 简单,不像tcp、ip等协议有较多的定义。在实际的过程中通常会定义协议类或者协议文件来封装封包和解包的过程。下面代码演示了封包和解包的过程:

 

技术分享
//通讯协议处理,主要处理封包和解包的过程
package protocol

import (
    "bytes"
    "encoding/binary"
)

const (
    ConstHeader         = "www.01happy.com"
    ConstHeaderLength   = 15
    ConstSaveDataLength = 4
)

//封包
func Packet(message []byte) []byte {
    return append(append([]byte(ConstHeader), IntToBytes(len(message))...), message...)
}

//解包
func Unpack(buffer []byte, readerChannel chan []byte) []byte {
    length := len(buffer)

    var i int
    for i = 0; i < length; i = i + 1 {
        if length < i+ConstHeaderLength+ConstSaveDataLength {
            break
        }
        if string(buffer[i:i+ConstHeaderLength]) == ConstHeader {
            messageLength := BytesToInt(buffer[i+ConstHeaderLength : i+ConstHeaderLength+ConstSaveDataLength])
            if length < i+ConstHeaderLength+ConstSaveDataLength+messageLength {
                break
            }
            data := buffer[i+ConstHeaderLength+ConstSaveDataLength : i+ConstHeaderLength+ConstSaveDataLength+messageLength]
            readerChannel <- data

            i += ConstHeaderLength + ConstSaveDataLength + messageLength - 1
        }
    }

    if i == length {
        return make([]byte, 0)
    }
    return buffer[i:]
}

//整形转换成字节
func IntToBytes(n int) []byte {
    x := int32(n)

    bytesBuffer := bytes.NewBuffer([]byte{})
    binary.Write(bytesBuffer, binary.BigEndian, x)
    return bytesBuffer.Bytes()
}

//字节转换成整形
func BytesToInt(b []byte) int {
    bytesBuffer := bytes.NewBuffer(b)

    var x int32
    binary.Read(bytesBuffer, binary.BigEndian, &x)

    return int(x)
}
protocol-协议

 

注意: 解包的过程中要注意数组越界的问题;另外包头要注意唯一性。

 

技术分享
// SocketService project main.go
package Socket

import (
    "fmt"
    "net"
    "os"
)

//默认管道大小为16
var DataSourceChannel chan []byte = make(chan []byte, 16)
var ReplyChannel chan []byte = make(chan []byte) //回复管道
var netListen net.Listener

func StartAccept() {
    var err error
    netListen, err = net.Listen("tcp", ":9123")
    CheckError(err)

    defer netListen.Close()

    for {
        conn, err := netListen.Accept()
        if err != nil {
            continue

        }

        Log(conn.RemoteAddr().String(), " tcp connect success")

        go handleConnection(conn)
        go ReplyMessage(conn)
    }

}

func ReplyMessage(conn net.Conn) {
    var message string
    for {
        //fmt.Print("Reply")
        message = string(<-ReplyChannel)
        _, err := conn.Write(Packet([]byte(conn.RemoteAddr().String() + message)))
        if err != nil {
            fmt.Println("ReplyMessage Falid:", err.Error())
            break
        } else {
            fmt.Println("Reply Successful")
            WriteLine("回复信息成功" + conn.RemoteAddr().String())
        }
    }

}

func handleConnection(conn net.Conn) {
    tempBuffer := make([]byte, 0)          //临时缓冲区,用来存储被截断的数据
    readerChannel := make(chan []byte, 16) //声明一个管道用于接收解包的数据
    go reader(readerChannel)
    buffer := make([]byte, 1024)

    for {
        n, err := conn.Read(buffer)
        if err != nil {
            WriteLine(conn.RemoteAddr().String() + " connection error: " + err.Error())
            Log(conn.RemoteAddr().String(), " break! ")
            //delete(connMapping, conn.RemoteAddr().String())
            break
        }
        tempBuffer = Unpack(append(tempBuffer, buffer[:n]...), readerChannel)
    }

}

//读取解包后的数据存储下来
func reader(readerChannel chan []byte) {

    for {
        select {
        case data := <-readerChannel:
            DataSourceChannel <- data

        }
    }
}

func Log(v ...interface{}) {
    fmt.Println(v...)
}

func CheckError(err error) {
    if err != nil {
        fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
    }
}
服务端代码

 

服务端支持多客户端连接,也可对客户端回复信息,并且在客户端断开以后可以会自动连接上

技术分享
// client
package Socket

import (
    "fmt"
    "net"
    _ "os"
    "time"
)

var First int = 0
var IsConnectioned bool = false
var DataSourceChannel chan []byte = make(chan []byte, 16)

//发消息
func sender(message string, Connection *net.TCPConn) {

    _, err := Connection.Write(Packet([]byte(message)))
    if err != nil {
        fmt.Println("sender fail:", err.Error())
        IsConnectioned = false
        //Connection.Close()
        return
    } else {
        fmt.Println("sender success number:", 1)
    }

}

func ConnectionService() net.TCPConn {
    server := "127.0.0.1:9123"
    tcpAddr, err := net.ResolveTCPAddr("tcp4", server)
    if err != nil {
        WriteLine("tcpAddr Faild ")
        IsConnectioned = false

    }
    conn, err := net.DialTCP("tcp", nil, tcpAddr)
    if err != nil {
        WriteLine("DialTCP Faild" + err.Error())
        IsConnectioned = false

    }
    if err != nil {
        IsConnectioned = false
        //conn.Close()
        return net.TCPConn{}
    } else {
        IsConnectioned = true
        return *conn
    }

}

///心跳包
func Timering(conn *net.TCPConn) {
    timer := time.NewTicker(5 * time.Second)

    for {
        select {
        case <-timer.C:
            if IsConnectioned == false {
                if First == 1 {
                    fmt.Println("connectioning......")
                    First = 0

                }
                //    conn = new(net.TCPConn)
                *conn = ConnectionService()

            } else {
                if First == 0 {
                    fmt.Println("connection successfull  ")
                    First = 1

                }
            }
        }
    }

}

func accept(conn *net.TCPConn) {
    handleConnection(conn)
}

func handleConnection(conn net.Conn) {
    tempBuffer := make([]byte, 0)          //临时缓冲区,用来存储被截断的数据
    readerChannel := make(chan []byte, 16) //声明一个管道用于接收解包的数据
    go reader(readerChannel)
    buffer := make([]byte, 1024)

    for {
        n, err := conn.Read(buffer)
        if err != nil {
            WriteLine(conn.RemoteAddr().String() + " connection error: " + err.Error())
            fmt.Println(conn.RemoteAddr().String(), " break! ")
            break
        }
        tempBuffer = Unpack(append(tempBuffer, buffer[:n]...), readerChannel)
    }
}

//读取解包后的数据存储下来
func reader(readerChannel chan []byte) {
    for {
        select {
        case data := <-readerChannel:
            //fmt.Println(string(data))
            DataSourceChannel <- data
        }
    }
}

///发送信息
func SenderMessage(message string, conn *net.TCPConn) {
    for {
        if IsConnectioned == true {
            sender(message, conn)
            break
        }
    }
}

//接收信息
func AcceptMessage(conn *net.TCPConn) {

    if IsConnectioned == true {
        accept(conn)
    }

}
客户端代码

测试用例:

  在服务端测试程序如下:

技术分享
// Test
package main

import "fmt"
import "Socket-Service"
import "runtime"
import "sync"
import "encoding/json"

func main() {
    var wg sync.WaitGroup
    maxProcess := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcess)
    fmt.Println("Waiting for clients")
    wg.Add(1)
    go Request()
    wg.Add(1)
    go Socket.StartAccept()
    wg.Wait()
}

func Request() {
    for {
        select {
        case data := <-Socket.DataSourceChannel:
            var dataMap map[string]string = make(map[string]string)
            json.Unmarshal([]byte(data), &dataMap)
            if string(dataMap["Ip"]) != "" {
                fmt.Println("接收信息:" + string(data))
                var str string = "{\"Ip\":\"" + dataMap["Ip"] + "\",\"Name\":\"" + string(data) + "\"}"
                Socket.ReplyChannel <- []byte(str)
            }

        }
    }

}
服务端测试程序

  客户端测试程序:

技术分享
// SocketClient project main.go
package main

import (
    "Socket-Client"
    "fmt"
    "net"
    "runtime"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    maxProcess := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcess)

    fmt.Println("connectioning......")
    Socket.IsConnectioned = false
    var conn net.TCPConn
    for {
        conn = Socket.ConnectionService()
        if Socket.IsConnectioned != false {
            fmt.Println("connection successfull  ")
            Socket.First = 1
            break
        }

    }

    wg.Add(1)
    go Socket.Timering(&conn)

    wg.Add(1)
    go Socket.AcceptMessage(&conn)

    wg.Add(1)
    go Sender(&conn)

    Accept()
    wg.Wait()
}

func Sender(conn *net.TCPConn) {
    message := "{\"Ip\":\"" + "123" + "\",\"Name\":\"123456\"}"
    for {
        timer := time.NewTimer(3 * time.Second)
        select {
        case <-timer.C:
            Socket.SenderMessage(message, conn)
        }
    }
}

//读取接收数据
func Accept() {
    for {
        select {
        case data := <-Socket.DataSourceChannel:
            fmt.Println(string(data))
        }
    }
}
客户端测试程序

修改发送不同的信息,重新生成一个客户端。(服务端连接两个客户端)

技术分享

技术分享

技术分享

 

Socekt的TCP通信

标签:

原文地址:http://www.cnblogs.com/wangjunqiao/p/4679166.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!