码迷,mamicode.com
首页 > 其他好文 > 详细

一些tcp通讯代码

时间:2015-07-05 16:30:11      阅读:344      评论:0      收藏:0      [点我收藏+]

标签:

1,nginx-lua

需要设置nginx配置文件

    # DNS servers nginx uses to resolve host names at run time.
    resolver 223.5.5.5 223.6.6.6;
    # Lua module search path: the lua-resty-http and lua-resty-string library
    # directories; the trailing ";;" appends the default search path.
    lua_package_path "/usr/local/nginx/conf/lua-resty-http/lib/?.lua;/usr/local/nginx/conf/lua-resty-string/lib/?.lua;;";
    # Run init.lua in each nginx worker process when the worker starts.
    init_worker_by_lua_file /usr/local/nginx/conf/init.lua;

代码

-- Address of the relay server this worker connects to.
local remoteIp = "127.0.0.1"
local remotePort = 9527

-- Delay, in seconds, before the next reconnect attempt. modifySleeptime()
-- doubles it after every scheduled retry and wraps it back to 5 once it
-- exceeds 600.
local sleepTime = 5

--- Return the identity string this client presents to the server.
-- The value is hard-coded here.
-- @treturn string client identity
-- @treturn nil always nil (no error)
local function getLocalClient()
    local identity = "I am Client One"
    return identity, nil
end

--- Fetch the session key for a client from the token service.
-- POSTs the base64-encoded client identity as the `param` form field and
-- base64-decodes the response body.
-- @tparam string client client identity
-- @treturn[1] string decoded session key, followed by nil
-- @treturn[2] nil followed by false when the request fails or the status
--   is not 200
local function getSessionKey(client)
    local httpc = require("resty.http").new()

    local request = {
        method = "POST",
        body = "param=" .. ngx.encode_base64(client),
        headers = {
            ["Content-Type"] = "application/x-www-form-urlencoded",
        },
    }
    local res = httpc:request_uri("http://www.ciaos.com/service/token", request)

    if res and res.status == 200 then
        return ngx.decode_base64(res.body), nil
    end
    return nil, false
end

--- Encrypt a message with AES-128-CBC and return it base64-encoded.
-- The IV is the fixed string "1234567890123456"; the PHP service in this
-- project hard-codes the same value, so it must not change on one side only.
-- Raises an error (via assert) if the cipher object cannot be created.
-- @tparam string skey session key used directly as the AES key
-- @tparam string msg plaintext
-- @treturn string base64 text of the ciphertext
local function encryptMessage(skey, msg)
    local aes = require "resty.aes"
    local cipher = assert(aes:new(skey, nil, aes.cipher(128, "cbc"), {iv = "1234567890123456"}))
    return ngx.encode_base64(cipher:encrypt(msg))
end

--- Decode a base64 message and decrypt it with AES-128-CBC.
-- Uses the same fixed IV ("1234567890123456") as encryptMessage.
-- Raises an error (via assert) if the cipher object cannot be created.
-- @tparam string skey session key used directly as the AES key
-- @tparam string msg base64 text of the ciphertext
-- @treturn[1] string decrypted plaintext (whatever `decrypt` returns)
-- @treturn[2] nil followed by an error string when msg is not valid base64
local function decryptMessage(skey, msg)
    -- ngx.decode_base64 yields nil for malformed input; stop here instead of
    -- handing nil to the cipher.
    local raw = ngx.decode_base64(msg)
    if not raw then
        return nil, "invalid base64 payload"
    end

    local aes = require "resty.aes"
    local cipher = assert(aes:new(skey, nil, aes.cipher(128, "cbc"), {iv = "1234567890123456"}))
    return cipher:decrypt(raw)
end

--- Advance the reconnect back-off.
-- Doubles sleepTime; once the doubled value exceeds 600 it wraps back to 5.
local function modifySleeptime()
    local doubled = sleepTime * 2
    sleepTime = (doubled > 600) and 5 or doubled
end

--- Encode a length as four little-endian bytes.
-- Only the low 32 bits are representable; larger values wrap.
local function encodeLength(len)
    local bytes = {}
    for i = 1, 4 do
        local low = len % 256
        bytes[i] = low
        len = (len - low) / 256
    end
    return string.char(bytes[1], bytes[2], bytes[3], bytes[4])
end

--- Send one length-prefixed frame on the socket.
-- The payload is the base64 AES ciphertext of msg. For the first (connect)
-- frame the plaintext msg and a space are prepended so the server can tell
-- which client is authenticating. The frame is a 4-byte little-endian
-- payload length followed by the payload.
-- @param sock connected cosocket
-- @tparam string skey session key
-- @tparam string msg plaintext to send
-- @param isconn truthy for the initial authentication frame
-- @return whatever sock:send returns (the original returned nothing, so
--   existing callers that ignore the result are unaffected)
local function sendMessage(sock, skey, msg, isconn)
    -- Declared local: the original assigned an accidental global here.
    local enmsg = encryptMessage(skey, msg)
    local payload = isconn and (msg .. " " .. enmsg) or enmsg
    return sock:send(encodeLength(#payload) .. payload)
end

--- Read one length-prefixed frame from the socket and decrypt it.
-- Sets the socket timeout to 600000 ms, reads a 4-byte little-endian length,
-- then reads that many bytes and passes them to decryptMessage.
-- @param sock connected cosocket
-- @tparam string skey session key
-- @treturn[1] string decrypted message, followed by nil
-- @treturn[2] nil followed by false when either read fails
local function recvMessage(sock, skey)
    sock:settimeout(600000)

    local header = sock:receive(4)
    if not header then
        return nil, false
    end

    local b1, b2, b3, b4 = string.byte(header, 1, 4)
    local size = b1 + b2*256 + b3*256*256 + b4*256*256*256

    local payload = sock:receive(size)
    if not payload then
        return nil, false
    end

    return decryptMessage(skey, payload), nil
end

--- Serve one established connection until it ends.
-- Sends the authentication frame, then repeatedly: receives a request,
-- trims surrounding whitespace, issues a GET to "http://inner.ciaos.net"
-- followed by the request text, and sends back the response body (or the
-- HTTP client's error value when the request fails).
-- @param sock connected cosocket
-- @tparam string client client identity
-- @tparam string skey session key
-- @treturn string "exit" when the worker is shutting down, otherwise
--   "receive timeout or connection closed" after the socket has been closed
local function handle(sock, client, skey)
    sendMessage(sock, skey, client, true)

    while not ngx.worker.exiting() do
        local request = recvMessage(sock, skey)
        if not request then
            sock:close()
            return "receive timeout or connection closed"
        end

        -- gsub also returns a match count; only the trimmed text is kept.
        local path = string.gsub(request, "^%s*(.-)%s*$", "%1")

        local httpc = require("resty.http").new()
        local res, err = httpc:request_uri("http://inner.ciaos.net" .. path, {
            method = "GET",
            headers = {
                ["Content-Type"] = "application/x-www-form-urlencoded",
            },
        })

        local reply = err
        if res then
            reply = res.body
        end
        sendMessage(sock, skey, reply, false)
    end

    return "exit"
end

--- Timer callback: obtain a session key, connect to the relay server and
-- serve the connection, rescheduling itself with back-off on any failure.
-- @param premature true when nginx fires the timer early because the worker
--   is shutting down; nothing is done in that case. Calling loop() with no
--   argument behaves as before.
local function loop(premature)
    -- The original ignored this flag and would start HTTP and socket work
    -- while the worker was exiting.
    if premature then
        return
    end

    -- Schedule another attempt after the current back-off, then grow it.
    local function retryLater()
        ngx.timer.at(sleepTime, loop)
        modifySleeptime()
    end

    local client = getLocalClient()
    if not client then
        return retryLater()
    end

    local skey = getSessionKey(client)
    if not skey then
        return retryLater()
    end

    while not ngx.worker.exiting() do
        local sock = ngx.socket.tcp()
        local ok = sock:connect(remoteIp, remotePort)
        if not ok then
            return retryLater()
        end

        -- handle() returns a reason string when the session ends.
        if handle(sock, client, skey) then
            retryLater()
            break
        end
    end
end

-- Start the client loop 3 seconds after the worker comes up. A failure to
-- create the timer is logged; the original ignored it, leaving the client
-- silently never started.
do
    local ok, err = ngx.timer.at(3, loop)
    if not ok then
        ngx.log(ngx.ERR, "failed to create client timer: ", err)
    end
end

2,golang(需要依赖gotcp项目)

代码

// server/server.go

package main

import (
    "encoding/binary"
    "fmt"
    "log"
    "net"
    "os"
    "os/signal"
    "runtime"
    "syscall"
    "time"

    "auth"
    "gotcp"
    "proto"
)

// clientMap maps an authenticated client id to its outer connection. It is
// allocated in main, written by Callback.OnMessage and Callback.OnClose, and
// read by InnerCallback.OnMessage.
// NOTE(review): nothing guards this map; if gotcp invokes callbacks from more
// than one goroutine these accesses race — confirm and add locking if so.
var clientMap map[uint32]*gotcp.Conn

// Callback handles connection events for the server created in outer().
type Callback struct{}

// OnConnect stores the peer address as the connection's extra data, logs it,
// and accepts the connection by returning true.
func (cb *Callback) OnConnect(c *gotcp.Conn) bool {
    remote := c.GetRawConn().RemoteAddr()
    c.PutExtraData(remote)
    fmt.Println("OnConnect: ", remote)
    return true
}

// OnMessage handles a packet from an outer client.
//
// The first packet on a connection is the authentication message: its body is
// passed to auth.GetClientId and, on success, the connection is registered in
// clientMap under the returned id. On failure the connection is closed and
// false is returned.
//
// Every later packet is treated as the response to a request forwarded by
// InnerCallback.OnMessage: it is written to the related inner connection and
// the per-connection mutex taken when the request was forwarded is released.
func (this *Callback) OnMessage(c *gotcp.Conn, p gotcp.Packet) bool {
    rcPacket := p.(*proto.RcPacket)

    var msgtype string
    if c.GetFirstPackageFlag() {
        msgtype = "auth"

        c.SetFirstPackageFlag()
        uclientid, err := auth.GetClientId(rcPacket.GetBody())
        if err != nil {
            fmt.Printf("OnMessage:  %v MsgType[%v] MsgLen[%v] MsgBody[%v] AuthError[%v]\n", c.GetRawConn().RemoteAddr(), msgtype, rcPacket.GetLength(), string(rcPacket.GetBody()), err.Error())
            c.Close()
            return false
        }
        c.SetClientId(uclientid)
        clientMap[uclientid] = c
    } else {
        msgtype = "resp"

        // In the code shown here the relate connection is only set right
        // after Cmutex is locked, so a nil value means no request is pending.
        // Dereferencing it, or unlocking the unlocked mutex, would crash the
        // process, so an unsolicited packet is logged and dropped instead.
        relate := c.GetRelateConn()
        if relate == nil {
            fmt.Printf("OnMessage:  %v ClientId[%v] MsgType[%v] dropped: no pending request\n", c.GetRawConn().RemoteAddr(), c.GetClientId(), msgtype)
            return true
        }
        relate.AsyncWritePacket(proto.NewRcPacket(rcPacket.Serialize(), true), time.Second)
        c.Cmutex.Unlock()
    }

    fmt.Printf("OnMessage:  %v ClientId[%v] MsgType[%v] MsgLen[%v] MsgBody[%v]\n", c.GetRawConn().RemoteAddr(), c.GetClientId(), msgtype, rcPacket.GetLength(), string(rcPacket.GetBody()))

    return true
}

// OnClose logs the stored peer address, closes the related inner connection
// if one is set, and removes this client from clientMap.
func (cb *Callback) OnClose(c *gotcp.Conn) {
    fmt.Println("OnClose:", c.GetExtraData())
    if relate := c.GetRelateConn(); relate != nil {
        relate.Close()
    }
    delete(clientMap, c.GetClientId())
}

// InnerCallback handles connection events for the server created in inner().
type InnerCallback struct{}

// OnConnect stores the peer address as the connection's extra data, logs it,
// and accepts the connection by returning true.
func (cb *InnerCallback) OnConnect(c *gotcp.Conn) bool {
    remote := c.GetRawConn().RemoteAddr()
    c.PutExtraData(remote)
    fmt.Println("Inner OnConnect:", remote)
    return true
}

// OnMessage forwards a control request to the outer client it names.
//
// The first four bytes of the body are the target client id (little-endian);
// the rest is the payload. If the client is registered in clientMap, its
// Cmutex is locked, this connection is recorded as its relate connection and
// the payload is written to it. Otherwise, or when the body is too short to
// hold a client id, this connection is closed and false is returned.
func (this *InnerCallback) OnMessage(c *gotcp.Conn, p gotcp.Packet) bool {
    rcPacket := p.(*proto.RcPacket)

    body := rcPacket.GetBody()
    // Slicing body[0:4] on a shorter body would panic.
    if len(body) < 4 {
        fmt.Printf("OnMessage:  %v MsgType[%v] MsgLen[%v] rejected: body shorter than client id\n", c.GetRawConn().RemoteAddr(), "req", rcPacket.GetLength())
        c.Close()
        return false
    }

    clientid := binary.LittleEndian.Uint32(body[0:4])
    fmt.Printf("OnMessage:  %v ClientId[%v] MsgType[%v] MsgLen[%v] MsgBody[%v]\n", c.GetRawConn().RemoteAddr(), clientid, "req", rcPacket.GetLength(), string(body))
    if conn, ok := clientMap[clientid]; ok {
        // Released by Callback.OnMessage when the client's response arrives.
        conn.Cmutex.Lock()
        conn.SetRelateConn(c)
        conn.AsyncWritePacket(proto.NewRcPacket(body[4:], false), time.Second)
    } else {
        c.Close()
        return false
    }
    return true
}

// OnClose logs the peer address stored for the closing inner connection.
func (cb *InnerCallback) OnClose(c *gotcp.Conn) {
    peer := c.GetExtraData()
    fmt.Println("Inner OnClose:", peer)
}

// outer listens on TCP port 9527 on all IPv4 addresses and starts a gotcp
// server for it using Callback and the RcProtocol framing. The process exits
// via checkError if the address cannot be resolved or bound.
func outer() *gotcp.Server {
    addr, err := net.ResolveTCPAddr("tcp4", ":9527")
    checkError(err)
    ln, err := net.ListenTCP("tcp", addr)
    checkError(err)

    srv := gotcp.NewServer(
        &gotcp.Config{
            PacketSendChanLimit:    5,
            PacketReceiveChanLimit: 5,
        },
        &Callback{},
        &proto.RcProtocol{},
    )

    // Serve in the background; the caller keeps the handle to stop it later.
    go srv.Start(ln, time.Second)
    fmt.Println("listening:", ln.Addr())

    return srv
}

// inner listens on 127.0.0.1:9528 and starts a gotcp server for it using
// InnerCallback and the RcProtocol framing. The process exits via checkError
// if the address cannot be resolved or bound.
func inner() *gotcp.Server {
    addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:9528")
    checkError(err)
    ln, err := net.ListenTCP("tcp", addr)
    checkError(err)

    srv := gotcp.NewServer(
        &gotcp.Config{
            PacketSendChanLimit:    20,
            PacketReceiveChanLimit: 20,
        },
        &InnerCallback{},
        &proto.RcProtocol{},
    )

    // Serve in the background; the caller keeps the handle to stop it later.
    go srv.Start(ln, time.Second)
    fmt.Println("listening:", ln.Addr())

    return srv
}

// main starts the inner and outer servers, waits for SIGINT or SIGTERM, and
// then stops both servers.
func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    clientMap = make(map[uint32]*gotcp.Conn)

    innerSvr := inner()
    outerSvr := outer()

    // Catch termination signals. signal.Notify never blocks when delivering,
    // so the channel needs a buffer or a signal arriving before the receive
    // below is ready would be dropped.
    chSig := make(chan os.Signal, 1)
    signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Println("Signal: ", <-chSig)

    // Stop both services.
    innerSvr.Stop()
    outerSvr.Stop()
}

// checkError terminates the process through log.Fatal when err is non-nil
// and does nothing otherwise.
func checkError(err error) {
    if err == nil {
        return
    }
    log.Fatal(err)
}
// auth/auth.go

package auth

import (
    "io/ioutil"
    "net/http"
    "strconv"
    "strings"
)

// GetClientId validates an authentication message against the auth service
// and returns the numeric client id the service replies with.
//
// The message is posted as the form field "token". An error is returned when
// the request fails, the body cannot be read, or the reply is not an unsigned
// 32-bit decimal number. The original discarded the parse error, so a
// rejection such as "false" or an empty body came back as client id 0 with a
// nil error.
func GetClientId(message []byte) (uint32, error) {
    res, err := http.Post("http://www.ciaos.com/service/auth", "application/x-www-form-urlencoded", strings.NewReader("token="+string(message)))
    if err != nil {
        return 0, err
    }
    defer res.Body.Close()

    result, err := ioutil.ReadAll(res.Body)
    if err != nil {
        return 0, err
    }

    // Bit size 32 makes ParseUint reject values that do not fit in uint32.
    clientid, err := strconv.ParseUint(strings.TrimSpace(string(result)), 10, 32)
    if err != nil {
        return 0, err
    }
    return uint32(clientid), nil
}

gotcp中conn.go文件修改如下

// Conn is gotcp's connection type as modified for this project.
// NOTE(review): the last four fields appear to be the additions made for the
// relay (they back the accessors used in server.go) — confirm against
// upstream gotcp.
type Conn struct {
    srv               *Server
    conn              *net.TCPConn  // the raw connection
    extraData         interface{}   // to save extra data
    closeOnce         sync.Once     // close the conn, once, per instance
    closeFlag         int32         // close flag
    closeChan         chan struct{} // close channel
    packetSendChan    chan Packet   // packet send channel
    packetReceiveChan chan Packet   // packet receive channel

    isFirstPackage bool       // first-packet flag
    clientId       uint32     // remote client id
    relateConn     *Conn      // the related control connection
    Cmutex         sync.Mutex // exported: server.go locks it when forwarding a request and unlocks it on the response
}

3,php代码如下

<?php
// Abort unless the BASEPATH constant is defined, so the script cannot be
// requested directly. The quotes here were mis-encoded as typographic quotes
// in the original and are restored to ASCII.
if ( ! defined('BASEPATH')) exit('No direct script access allowed');

/**
 * Token, authentication and console endpoints for the TCP relay.
 *
 * All AES operations use aes-128-cbc with a fixed IV that the Lua client
 * hard-codes as well; the two sides must be changed together.
 */
class Service extends CI_Controller {

    /** Cipher name passed to openssl_encrypt / openssl_decrypt. */
    const AES_CIPHER = 'aes-128-cbc';

    /** Fixed initialisation vector shared with the Lua client. */
    const AES_IV = '1234567890123456';

    /**
     * Produce the session token for a client id (currently a fixed value).
     *
     * @param int $clientid
     * @return string 16-byte token used as the AES key
     */
    private function genToken($clientid)
    {
        return "abcdefghijklmnop";
    }

    /**
     * Map a client identity string to its numeric id (currently a fixed value).
     *
     * @param string $client
     * @return int
     */
    private function getClientId($client)
    {
        return 1002;
    }

    /**
     * Read exactly $length bytes from a socket.
     *
     * socket_read may return fewer bytes than requested, so keep reading
     * until the buffer is full.
     *
     * @param resource $socket
     * @param int      $length
     * @return string|false the bytes read, or false on error or early EOF
     */
    private function readExact($socket, $length)
    {
        $buffer = '';
        while (strlen($buffer) < $length) {
            $chunk = socket_read($socket, $length - strlen($buffer));
            if ($chunk === false || $chunk === '') {
                return false;
            }
            $buffer .= $chunk;
        }
        return $buffer;
    }

    /**
     * Write all of $data to a socket, retrying after partial writes.
     *
     * @param resource $socket
     * @param string   $data
     * @return bool true when every byte was written
     */
    private function writeAll($socket, $data)
    {
        $total = strlen($data);
        $sent = 0;
        while ($sent < $total) {
            $written = socket_write($socket, substr($data, $sent), $total - $sent);
            // A zero-byte write is treated as failure, as the original did.
            if ($written === false || $written === 0) {
                return false;
            }
            $sent += $written;
        }
        return true;
    }

    /**
     * Get Token: issue a session token for the client named in POST "param"
     * (base64), store it in redis db 1 for 1800 seconds, and echo it
     * base64-encoded. Responds with 404 when "param" is missing.
     */
    public function token()
    {
        $param = $this->input->post("param");
        if($param)
        {
            $client = base64_decode($param);
            $clientid = $this->getClientId($client);

            $token = $this->genToken($clientid);

            $this->redis->select(1);
            $this->redis->setex($clientid, 1800, $token);
            echo base64_encode($token);
        }
        else{
            show_404();
        }
    }

    /**
     * Auth Token (from inner server): POST "token" holds
     * "<client> <base64 ciphertext>". Echoes the client id when the
     * ciphertext decrypts to the client string, otherwise "false".
     */
    public function auth()
    {
        $token = $this->input->post("token");
        if(!$token){
            show_404();
            return;
        }
        $info = explode(' ',$token);

        $uid = $this->getClientId($info[0]);

        $this->redis->select(1);
        $stoken = $this->redis->get($uid);
        // Treat both null and false as a missing key.
        if($stoken === null || $stoken === false){
            echo "false";
            return;
        }
        $message = substr($token, strlen($info[0])+1);
        // '+' in the base64 text arrives as a space after form decoding.
        $message = str_replace(' ','+',$message);
        $client = openssl_decrypt(base64_decode($message), self::AES_CIPHER, $stoken, true, self::AES_IV);
        // Strict comparison: a failed decrypt (false) must never match.
        if($client !== false && $info[0] === $client){
            echo $uid;
        } else{
            // `echo false` prints an empty string; answer explicitly, as the
            // missing-token branch above does.
            echo "false";
        }
    }

    /**
     * Control Message: encrypt GET "cmd" with the session token of client
     * GET "id", send it to the inner relay port, and echo the decrypted
     * reply. Echoes "internal server error:N" on failure.
     */
    public function console()
    {
        $id = $this->input->get("id");
        $command = $this->input->get("cmd");
        if(is_null($command) or $command == false or $id === false or is_null($id)){
            echo "invalid cmd";
            return;
        }
        $port = 9528;
        $ip = "127.0.0.1";
        // socket_create returns false on failure, so the original `< 0` test
        // never fired.
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($socket === false) {
            echo "internal server error:1";
            return;
        }

        if (!socket_connect($socket, $ip, $port)) {
            socket_close($socket);
            echo "internal server error:2";
            return;
        }

        $this->redis->select(1);
        $stoken = $this->redis->get($id);
        if($stoken === null || $stoken === false){
            socket_close($socket);
            echo "internal server error:3";
            return;
        }

        $message = base64_encode(openssl_encrypt($command, self::AES_CIPHER, $stoken, true, self::AES_IV));
        // Frame: length (payload + 4-byte id), client id, payload. "V" is an
        // explicit little-endian 32-bit value, which is what the relay reads;
        // the original "i" depended on the host byte order.
        $frame = pack("VV", strlen($message) + 4, (int) $id) . $message;
        if(!$this->writeAll($socket, $frame)) {
            socket_close($socket);
            echo "internal server error:4";
            return;
        }

        $head = $this->readExact($socket, 4);
        if($head !== false){
            $data = unpack("Vlen", $head);
            $out = $this->readExact($socket, $data['len']);
            if($out !== false && $out !== ''){
                $reply = openssl_decrypt(base64_decode($out), self::AES_CIPHER, $stoken, true, self::AES_IV);
                // A failed decrypt falls through to error:5 instead of
                // printing an empty response.
                if($reply !== false){
                    echo $reply;
                    socket_close($socket);
                    return;
                }
            }
        }
        socket_close($socket);
        echo "internal server error:5";
    }
}

/* End of file welcome.php */
/* Location: ./application/controllers/welcome.php */

 

一些tcp通讯代码

标签:

原文地址:http://www.cnblogs.com/ciaos/p/4622156.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!