码迷,mamicode.com
首页 > 其他好文 > 详细

erlang的tcp批量客户端模板

时间:2015-07-08 09:26:14      阅读:161      评论:0      收藏:0      [点我收藏+]

标签:

tcp_client.erl

-module(tcp_client).

-export([start/2, stop/1]).

start({Ip,Port}, ClientNumber) ->
    io:format("~p,~p~n",[Ip,Port]),

    case tcp_client_handler_sup:start_link({Ip,Port}) of
        {ok, Pid} ->
            IndexList = lists:seq(1, ClientNumber),
            lists:foreach(
                fun(_Index) ->
                    tcp_client_handler_sup:start_child()
                end,
                IndexList
            ),
            {ok, Pid};
        _ ->
            {error, failed}
    end.

stop(_S) ->
    ok.

 

tcp_client_options.erl

-module(tcp_client_options).

-export([get_tcp_options/0, get_tcp_recv_timeout/0, get_tcp_conn_timeout/0]).

get_tcp_options() ->
    [
        {active, false},

        binary,
        {packet, 4},

        {nodelay,true},
        {send_timeout, 5*1000},
        {send_timeout_close, true},

        {reuseaddr, true},
        {keepalive, true},
        {packet_size, 4096}
    ].

get_tcp_recv_timeout() ->
    5*1000.

get_tcp_conn_timeout() ->
    5*1000.

 

tcp_client_handler_sup.erl

-module(tcp_client_handler_sup).
-behaviour(supervisor).

-export([start_link/1, start_child/0]).
-export([init/1]).

-define(SERVER, ?MODULE).

start_link({Ip,Port}) ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, [{Ip,Port}]).

start_child() ->
    supervisor:start_child(?SERVER, []).

init([{Ip,Port}]) ->

    RestartMode = simple_one_for_one,
    MaxRestarts = 0,
    MaxSecondsBetweenRestarts = 1,

    RestartStrategy = {RestartMode, MaxRestarts, MaxSecondsBetweenRestarts},

    Restart = temporary,
    Shutdown = brutal_kill,
    Type = worker,

    Child = {
        tcp_client_handler,
        {tcp_client_handler, start_link, [{Ip,Port}]},
        Restart, Shutdown, Type,
        [tcp_client_handler]
    },

    Children = [Child],
    {ok, {RestartStrategy, Children}}.

 

tcp_client_handler.erl

-module(tcp_client_handler).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([loop_receive/2]).
-record(socket_info_record, {socket, ip, port}).

start_link({Ip,Port}) ->
    gen_server:start_link(?MODULE, [{Ip,Port}], []).

init([{Ip,Port}]) ->
    {ok, #socket_info_record{ip = Ip, port = Port}, 0}.

handle_call(Msg, _From, State) ->
    {reply, {ok, Msg}, State}.

%% 发数据包
handle_cast({send_socket_msg, Cmd, InfoBin}, State) ->
    send_socket_msg(Cmd, InfoBin,State);

%% 异步通知循环接收
handle_cast({loop_receive,ClientSocket}, State) ->
    loop_receive(ClientSocket,State);

handle_cast(stop, State) ->
    {stop, normal, State}.

handle_info({tcp_closed, _Socket}, #socket_info_record{ip = Addr} = StateData) ->
    HandlerMod = behavior_config:get_socket_handler_module(),
    HandlerMod:on_disconnected(Addr),
    {stop, normal, StateData};

handle_info(timeout, #socket_info_record{ip = Ip, port = Port} = State) ->
    case try_to_connect(Ip,Port,State) of
        {noreply, NewState} ->
            #socket_info_record{socket = Socket} = NewState,
            gen_server:cast(self(), {loop_receive,Socket}),
            {noreply, NewState};
        _ ->
            {stop, normal, State}
    end;

handle_info({delay, Module, CallBack, Args}, State) ->
    Module:CallBack(Args),
    {noreply, State};

handle_info(_Info, StateData) ->
    {noreply, StateData}.

terminate(_Reason, #socket_info_record{socket = Socket}) ->
    io:format("socket process terminated~n"),
    (catch gen_tcp:close(Socket)),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% ---------------------------------------------------------------------------------------------------------------------
%% 尝试连接服务器
try_to_connect(Ip,Port,State) ->
    Options = tcp_client_options:get_tcp_options(),
    case gen_tcp:connect(Ip, Port, Options, tcp_client_options:get_tcp_conn_timeout() ) of
        {ok, Socket} ->
            {ok, {IP, _Port}} = inet:peername(Socket),

            io:format("~p连接~n",[IP]),

            {noreply, State#socket_info_record{socket = Socket}};
        {error, timeout}  ->
            {stop, normal, State}

    end.

%% 循环包处理函数
loop_receive(Socket, #socket_info_record{ip = Ip} = State) ->
    case gen_tcp:recv(Socket, 0, tcp_client_options:get_tcp_recv_timeout() ) of
        {ok, Data} ->
            io:format("收到远程数据~p~n",[Data]),

            loop_receive(Socket,State);
        {error, _Reason} ->
            io:format("连接~p断开~n",[Ip]),

            {stop, normal, State}
    end.

send_socket_msg(Cmd, InfoBin,#socket_info_record{socket = Socket} = State) ->
    Data = <<Cmd:16, InfoBin/binary>>,
    gen_tcp:send(Socket, Data),

    {noreply, State}.

 

tcp_send.erl

-module(tcp_send).

-export([send_data/3]).

send_data(Pid, Cmd, JsonBin) ->
    case erlang:is_pid(Pid) of
        true ->
            gen_server:cast(Pid, {send_socket_msg, Cmd, JsonBin});
        false ->
            io:format("socket pid invalid, ignore~n")
    end.

 

erlang的tcp批量客户端模板

标签:

原文地址:http://www.cnblogs.com/ziyouchutuwenwu/p/4629181.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!