mmap, munmap - map or unmap files or devices into memory
#include <sys/mman.h>

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
int munmap(void *addr, size_t length);

See NOTES for information on feature test macro requirements.
Sender 端:
int ipc_sender_init() { if (sender_info.initialized) return 0; bzero(&sender_info, sizeof(ipc_sender_t)); sender_info.fd = open(MAP_FILE, O_CREAT|O_RDWR|O_TRUNC, 00777); if (sender_info.fd < 0) { write_log(LOG_INFO, "open %s FAILED: %d\n", MAP_FILE, errno); return -1; } lseek(sender_info.fd, MAP_SIZE - 1,SEEK_SET); write(sender_info.fd ,"", 1); sender_info.map = mmap(0, MAP_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED , sender_info.fd, 0); if (sender_info.map == MAP_FAILED) { close(sender_info.fd); write_log(LOG_INFO, "mmap FAILED %d\n", errno); return -1; } init_pkt_share_buffer(sender_info.map, MAP_SIZE); sender_info.initialized = true; write_log(LOG_INFO, "ipc_sender_init fd:%d\n", sender_info.fd); return 0; } int ipc_sender_send(void *data) { if (!sender_info.initialized) return -1; pkt_unit unit; memcpy(&unit.msg, data, sizeof(VqmonDataMessage)); int ret = input_pkt_unit(&unit, sender_info.map); if (ret != PKT_SHARE_BUFFER_NOERR) { write_log(LOG_INFO, "input_pkt_unit FAILED: %d\n", ret); } return 0; } int ipc_sender_exit() { if (!sender_info.initialized) return 0; munmap(sender_info.map, MAP_SIZE); close(sender_info.fd); bzero(&sender_info, sizeof(ipc_sender_t)); return 0; }
Receive 端:
static void handler(int sig) { write_log(LOG_ERROR, "capture a signal %d , thread:%u \n",sig, gettid()); exit(0); } int ipc_receiver_init() { write_log(LOG_INFO, "ipc_receiver_init \n"); if (receiver_info.initialized) return 0; struct sigaction act; act.sa_handler = handler; sigemptyset(&act.sa_mask); act.sa_flags = 0; sigaction(SIGSEGV, &act, 0); sigaction(SIGFPE, &act, 0); bzero(&receiver_info, sizeof(ipc_receiver_t)); receiver_info.fd = open(MAP_FILE, O_CREAT|O_RDWR, 00777); if (receiver_info.fd < 0) { write_log(LOG_ERROR, "open %s FAILED: %d\n", MAP_FILE, errno); return -1; } receiver_info.map = mmap(0, MAP_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED , receiver_info.fd, 0); if (receiver_info.map == MAP_FAILED) { close(receiver_info.fd); write_log(LOG_ERROR, "mmap FAILED %d\n", errno); return -1; } receiver_info.initialized = true; return 0; } int ipc_receiver_run() { if (!receiver_info.initialized) return -1; if (pthread_create(&receiver_info.thread_id, NULL, ipc_loop, NULL) < 0) { write_log(LOG_ERROR, "pthread_create FAILED\n", errno); return -1; } return 0; } int ipc_receiver_exit() { if (!receiver_info.initialized) return 0; receiver_info.loop_done = true; pthread_join(receiver_info.thread_id, NULL); munmap(receiver_info.map, MAP_SIZE); close(receiver_info.fd); bzero(&receiver_info, sizeof(ipc_receiver_t)); return 0; } static void *ipc_loop(void *arg) { while (true) { if (receiver_info.loop_done) break; pkt_unit unit; int ret = output_pkt_unit(&unit, receiver_info.map); if (ret == PKT_SHARE_BUFFER_NOERR) { handle_msg(&unit.msg); } else if (ret == PKT_SHARE_BUFFER_ERR_READ_NODATA) { usleep(10 * 1000); } else { write_log(LOG_ERROR, "output_pkt_unit FAILED: %d\n", ret); } } return NULL; }
#include "pktShareBuffer.h"
#include "util.h"
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <time.h>
#include <stdio.h>
#include <sys/time.h>

#ifdef __cplusplus
extern "C" {
#endif

/* magic value stored in head_code once a buffer has been initialised */
#define PKT_SHARE_BUFFER_HEAD_CODE 0xCE01
/*
 * value of w_pos_old / w_pos_new while nothing has been written yet
 * NOTE(review): this collides with a real index once index_max exceeds 0xffff.
 */
#define PKT_SHARE_BUFFER_POS_NONE 0xffff
#define CAST_EASY(a,b) ((struct b *)(a))
#define CHECK_BUFFER(a) (PKT_SHARE_BUFFER_HEAD_CODE == ((CAST_EASY(a, pkt_share_buffer))->head_code))

/*
 * Life cycle of one slot, kept in pkt_unit.stat and changed only with atomic
 * compare-and-swap:
 *   NOUSE / READOUT / READY --(writer claims)--> WRITING --> READY
 *   READY                   --(reader claims)--> READING --> READOUT
 */
typedef enum pkt_unit_stat{
    PKT_UNIT_STAT_NOUSE = 0,
    PKT_UNIT_STAT_WRITING,
    PKT_UNIT_STAT_READY,
    PKT_UNIT_STAT_READING,
    PKT_UNIT_STAT_READOUT
} pkt_unit_stat;

/* Header placed at the start of the shared memory, followed by the slots. */
typedef struct pkt_share_buffer{
    uint32 head_code;
    /*
     * total size of the buffer
     */
    uint32 total_size;
    /*
     * number of pkt_unit slots that fit in the buffer
     */
    uint32 index_max;
    /*
     * unused bytes left over at the end of the buffer
     */
    uint32 remain_size;
    /*
     * index of the slot the reader looks at next
     */
    uint32 r_pos;
    /*
     * w_pos_new: index of the latest packet written
     * w_pos_old: index of the oldest packet still in the buffer
     * w_pos_nxt: index of the slot the writer uses next
     */
    uint32 w_pos_new;
    uint32 w_pos_old;
    uint32 w_pos_nxt;
    pkt_unit body[0];
} pkt_share_buffer;

/*
 * Return the slot at @index, or NULL when @index is outside the buffer.
 * Valid indices are 0 .. index_max - 1.
 */
static inline pkt_unit *
index_pkt_unit(uint32 index, void *mem)
{
    pkt_share_buffer *buf_p = CAST_EASY(mem, pkt_share_buffer);

    if(index >= buf_p->index_max){
        return NULL;
    }
    return &(buf_p->body[index]);
}

/*
 * Copy length, timestamp, sequence number and payload from @src to @dst.
 * The stat field is deliberately left alone; the callers own it.
 * The length is clamped to the payload capacity so that a corrupt or
 * uninitialised pkt_len cannot overrun raw_data.
 */
static inline void
__copy_pkt_unit(pkt_unit *dst, pkt_unit *src)
{
    uint32 len = src->pkt_len;

    if(len > CAPTURE_PKT_MAX_SIZE){
        len = CAPTURE_PKT_MAX_SIZE;
    }
    dst->pkt_len = len;
    dst->tv_sec = src->tv_sec;
    dst->tv_usec = src->tv_usec;
    dst->seq = src->seq;
    cts_memcpy(dst->raw_data, src->raw_data, len);
}

/*
 * Receive one datagram of at most @size bytes from @fd straight into
 * dst->raw_data and stamp the slot with the current time.
 * Returns the recvfrom() result: the byte count, or a negative value on error
 * (in which case the length and timestamp are left untouched).
 */
static int
__recv_to_pkt_unit(pkt_unit *dst, int fd, uint32 size)
{
    int rc = 0;
    struct sockaddr_in fromAddr;
    /* recvfrom() expects the size of the address buffer on input */
    socklen_t from_len = sizeof(fromAddr);
    struct timeval tv;

    rc = recvfrom(fd, dst->raw_data, size, 0, (struct sockaddr *)&fromAddr, &from_len);
    if(rc < 0){
        return rc;
    }

    gettimeofday(&tv, NULL);
    dst->tv_sec = tv.tv_sec;
    dst->tv_usec = tv.tv_usec;
    dst->pkt_len = rc;
    return rc;
}

/* Advance the writer positions in the header after slot @index was filled. */
static inline void
__update_head_after_write(uint32 index, pkt_share_buffer *buf_p)
{
    buf_p->w_pos_new = index;
    buf_p->w_pos_nxt = (buf_p->w_pos_nxt + 1)%(buf_p->index_max);

    /*
     * very first write: the oldest packet is the one just written
     */
    if(buf_p->w_pos_old == PKT_SHARE_BUFFER_POS_NONE){
        buf_p->w_pos_old = index;
        return;
    }

    /*
     * the writer wrapped around and overwrote the oldest packet,
     * so the oldest one is now the following slot
     */
    if(buf_p->w_pos_old == index){
        buf_p->w_pos_old = (index + 1)%(buf_p->index_max);
    }
    return;
}

/* Move the read position to the slot after @index. */
static inline void
__update_head_after_read(uint32 index, pkt_share_buffer *buf_p)
{
    buf_p->r_pos = (index+1)%(buf_p->index_max);
    return;
}

/*
 * Try to claim @unit for writing. A slot may be taken over from READOUT,
 * NOUSE or READY (tried in that order) by atomically switching it to WRITING.
 * Returns 1 and stores the previous state in @claimed_from on success,
 * 0 when the slot is currently being written or read.
 */
static int
__claim_unit_for_write(pkt_unit *unit, uint32 *claimed_from)
{
    static const uint32 claimable[3] = {
        PKT_UNIT_STAT_READOUT,
        PKT_UNIT_STAT_NOUSE,
        PKT_UNIT_STAT_READY
    };
    uint32 i;

    for(i = 0; i < 3; i++){
        if(__sync_val_compare_and_swap(&unit->stat, claimable[i], PKT_UNIT_STAT_WRITING) == claimable[i]){
            *claimed_from = claimable[i];
            return 1;
        }
    }
    return 0;
}

/*
 * Publish a slot that was claimed with __claim_unit_for_write(): switch it
 * from WRITING to READY.
 * @covered is non-zero when the slot held unread (READY) data before.
 */
static int
__publish_written_unit(pkt_unit *unit, uint32 covered)
{
    uint32 prev = __sync_val_compare_and_swap(&unit->stat, PKT_UNIT_STAT_WRITING, PKT_UNIT_STAT_READY);

    if(prev != PKT_UNIT_STAT_WRITING){
        /*
         * somebody changed the state behind our back; this should never
         * happen. Force READY and report it.
         */
        __sync_lock_test_and_set(&unit->stat, PKT_UNIT_STAT_READY);
        return PKT_SHARE_BUFFER_ERR_OTHER;
    }
    return covered ? PKT_SHARE_BUFFER_ERR_COVER_UNREAD_DATA : PKT_SHARE_BUFFER_NOERR;
}

/*
 * Copy @src into slot @dst (index @index of @buf_p).
 * Returns NOERR, ERR_COVER_UNREAD_DATA when unread data was overwritten (the
 * new data IS stored), ERR_WRITE_BLOCKING when the slot is busy, or ERR_OTHER.
 */
static int
__sync_write_pkt_unit(pkt_unit *dst, pkt_unit *src, pkt_share_buffer *buf_p, uint32 index)
{
    uint32 claimed_from = PKT_UNIT_STAT_NOUSE;

    if(!__claim_unit_for_write(dst, &claimed_from)){
        /*
         * slot is in WRITING or READING state
         */
        return PKT_SHARE_BUFFER_ERR_WRITE_BLOCKING;
    }

    __copy_pkt_unit(dst, src);
    __update_head_after_write(index, buf_p);

    return __publish_written_unit(dst, claimed_from == PKT_UNIT_STAT_READY);
}

/*
 * Copy slot @src (index @index of @buf_p) into @dst.
 * Returns NOERR, ERR_READ_BLOCKING when the slot is being written or read,
 * ERR_READ_NODATA when it holds nothing new, or ERR_OTHER.
 */
static int
__sync_read_pkt_unit(pkt_unit *dst, pkt_unit *src, pkt_share_buffer *buf_p, uint32 index)
{
    uint32 prev;

    /*
     * only a READY slot may be read; claim it by switching to READING
     */
    prev = __sync_val_compare_and_swap(&src->stat, PKT_UNIT_STAT_READY, PKT_UNIT_STAT_READING);
    if(prev != PKT_UNIT_STAT_READY){
        if(prev == PKT_UNIT_STAT_WRITING || prev == PKT_UNIT_STAT_READING){
            return PKT_SHARE_BUFFER_ERR_READ_BLOCKING;
        }
        return PKT_SHARE_BUFFER_ERR_READ_NODATA;
    }

    __copy_pkt_unit(dst, src);
    __update_head_after_read(index, buf_p);

    /*
     * release the slot: READING -> READOUT
     */
    prev = __sync_val_compare_and_swap(&src->stat, PKT_UNIT_STAT_READING, PKT_UNIT_STAT_READOUT);
    if(prev != PKT_UNIT_STAT_READING){
        __sync_lock_test_and_set(&src->stat, PKT_UNIT_STAT_READOUT);
        return PKT_SHARE_BUFFER_ERR_OTHER;
    }
    return PKT_SHARE_BUFFER_NOERR;
}

/*
 * Receive one datagram from @fd directly into slot @dst and tag it with @seq.
 * Same return values as __sync_write_pkt_unit(), plus ERR_IOERR when
 * recvfrom() fails; in that case the slot gets its previous state back.
 */
static int
__sync_recv_to_pkt_unit(pkt_unit *dst, pkt_share_buffer *buf_p, uint32 index, int fd, uint32 size, uint32 seq)
{
    uint32 claimed_from = PKT_UNIT_STAT_NOUSE;
    uint32 prev;

    if(!__claim_unit_for_write(dst, &claimed_from)){
        /*
         * slot is in WRITING or READING state
         */
        return PKT_SHARE_BUFFER_ERR_WRITE_BLOCKING;
    }

    if(__recv_to_pkt_unit(dst, fd, size) < 0){
        /*
         * nothing was stored: restore the state the slot was claimed from
         */
        prev = __sync_val_compare_and_swap(&dst->stat, PKT_UNIT_STAT_WRITING, claimed_from);
        if(prev != PKT_UNIT_STAT_WRITING){
            /*
             * this should never happen
             */
            __sync_lock_test_and_set(&dst->stat, claimed_from);
            return PKT_SHARE_BUFFER_ERR_OTHER;
        }
        return PKT_SHARE_BUFFER_ERR_IOERR;
    }

    __update_head_after_write(index, buf_p);
    dst->seq = seq;

    return __publish_written_unit(dst, claimed_from == PKT_UNIT_STAT_READY);
}

/* Debug helper: dump the buffer header to stdout. */
static void
_print_buffer_info(void *mem)
{
    pkt_share_buffer *p = CAST_EASY(mem, pkt_share_buffer);

    /*
     * common information data
     */
    printf("| HEAD CODE = 0x%2X ", (unsigned int)p->head_code);
    printf("| INDEX_MAX = %u ", (unsigned int)p->index_max);
    printf("| TOTAL_SIZE = %u ", (unsigned int)p->total_size);
    printf("| REMAIN_SIZE = %u |\n", (unsigned int)p->remain_size);
    /*
     * write offset
     */
    printf("| WRITE_NEXT = %u ", (unsigned int)p->w_pos_nxt);
    printf("| WRITE_OLD = %u ", (unsigned int)p->w_pos_old);
    printf("| WRITE_LATEST = %u ", (unsigned int)p->w_pos_new);
    /*
     * read offset
     */
    printf("| READ_NEXT = %u |\n", (unsigned int)p->r_pos);
    return;
}

/*
 * Zero @size bytes at @mem and write a fresh buffer header into it.
 * @mem must hold at least the header plus one pkt_unit.
 * Returns NOERR or ERR_INVALIDE_PARM.
 */
int
init_pkt_share_buffer(void *mem, uint32 size)
{
    pkt_share_buffer *buf_p = NULL;

    /*
     * the smallest usable buffer holds exactly one pkt_unit
     */
    if(!mem || size < sizeof(pkt_share_buffer)+sizeof(pkt_unit)){
        return PKT_SHARE_BUFFER_ERR_INVALIDE_PARM;
    }

    /* wipe everything first; this also puts every slot into NOUSE (0) */
    memset(mem, 0x00, size);

    /*
     * write buffer head...
     */
    buf_p = CAST_EASY(mem, pkt_share_buffer);
    buf_p->total_size = size;
    buf_p->head_code = PKT_SHARE_BUFFER_HEAD_CODE;
    buf_p->index_max = (size - sizeof(pkt_share_buffer))/sizeof(pkt_unit);
    buf_p->remain_size = (size - sizeof(pkt_share_buffer))%sizeof(pkt_unit);
    buf_p->r_pos = 0;
    buf_p->w_pos_old = PKT_SHARE_BUFFER_POS_NONE;
    buf_p->w_pos_new = PKT_SHARE_BUFFER_POS_NONE;
    buf_p->w_pos_nxt = 0;

#ifdef CT_PKT_SHARE_BUFFER_DEBUG
    _print_buffer_info(buf_p);
#endif
    return PKT_SHARE_BUFFER_NOERR;
}

/*
 * Store a copy of @p in the next write slot of the buffer at @mem.
 * Returns ERR_INVALIDE_PARM for NULL arguments, ERR_NOT_INIT when the head
 * code does not match, ERR_OTHER when the write position in the header is out
 * of range, otherwise the result of __sync_write_pkt_unit().
 */
int
input_pkt_unit(pkt_unit *p, void *mem)
{
    int ret = 0;
    uint32 in_pos = 0;
    pkt_share_buffer * buf_p = NULL;
    pkt_unit *unit_p = NULL;

    if(!p || !mem){
        return PKT_SHARE_BUFFER_ERR_INVALIDE_PARM;
    }

    buf_p = CAST_EASY(mem, pkt_share_buffer);
    if(!CHECK_BUFFER(buf_p)){
        return PKT_SHARE_BUFFER_ERR_NOT_INIT;
    }

    in_pos = buf_p->w_pos_nxt;
    unit_p = index_pkt_unit(in_pos, mem);
    if(!unit_p){
        /* header position points outside the buffer */
        return PKT_SHARE_BUFFER_ERR_OTHER;
    }

    ret = __sync_write_pkt_unit(unit_p, p, buf_p, in_pos);
#ifdef CT_PKT_SHARE_BUFFER_DEBUG
    _print_buffer_info(buf_p);
#endif
    return ret;
}

/*
 * Receive one datagram from @fd directly into the next write slot of the
 * buffer at @mem. @size is capped at CAPTURE_PKT_MAX_SIZE; @seq is stored as
 * the packet's sequence number.
 * Any result other than NOERR is also printed to stdout.
 */
int
recv_to_pkt_share_buffer(int fd, uint32 size, uint32 seq, void *mem)
{
    int ret = 0;
    uint32 in_pos = 0;
    pkt_share_buffer * buf_p = NULL;
    pkt_unit *unit_p = NULL;

    /* descriptor 0 is a valid descriptor, only negative values are rejected */
    if(fd < 0 || !mem){
        return PKT_SHARE_BUFFER_ERR_INVALIDE_PARM;
    }

    buf_p = CAST_EASY(mem, pkt_share_buffer);
    if(!CHECK_BUFFER(buf_p)){
        return PKT_SHARE_BUFFER_ERR_NOT_INIT;
    }

    in_pos = buf_p->w_pos_nxt;
    unit_p = index_pkt_unit(in_pos, mem);
    if(!unit_p){
        /* header position points outside the buffer */
        return PKT_SHARE_BUFFER_ERR_OTHER;
    }

    if(size > CAPTURE_PKT_MAX_SIZE){
        size = CAPTURE_PKT_MAX_SIZE;
    }

    ret = __sync_recv_to_pkt_unit(unit_p, buf_p, in_pos, fd, size, seq);
#ifdef CT_PKT_SHARE_BUFFER_DEBUG
    _print_buffer_info(buf_p);
#endif
    if (ret != PKT_SHARE_BUFFER_NOERR) {
        printf("Failed recv_to_pkt_share_buffer, ret=%d\n", ret);
    }
    return ret;
}

/*
 * Copy the packet at the current read position of the buffer at @mem into @p
 * and advance the read position.
 * Returns ERR_INVALIDE_PARM for NULL arguments, ERR_NOT_INIT when the head
 * code does not match, ERR_OTHER when the read position in the header is out
 * of range, otherwise the result of __sync_read_pkt_unit().
 */
int
output_pkt_unit(pkt_unit *p, void *mem)
{
    int ret = 0;
    uint32 out_pos = 0;
    pkt_share_buffer * buf_p = NULL;
    pkt_unit *unit_p = NULL;

    if(!p || !mem){
        return PKT_SHARE_BUFFER_ERR_INVALIDE_PARM;
    }

    buf_p = CAST_EASY(mem, pkt_share_buffer);
    if(!CHECK_BUFFER(buf_p)){
        return PKT_SHARE_BUFFER_ERR_NOT_INIT;
    }

    out_pos = buf_p->r_pos;
    unit_p = index_pkt_unit(out_pos, mem);
    if(!unit_p){
        /* header position points outside the buffer */
        return PKT_SHARE_BUFFER_ERR_OTHER;
    }

    ret = __sync_read_pkt_unit(p, unit_p, buf_p, out_pos);
    return ret;
}

/*
 * Move the read position to the most recently written packet.
 * Returns ERR_INVALIDE_PARM for a NULL buffer and ERR_NOT_INIT when the head
 * code does not match. When nothing has been written yet (or the recorded
 * position is out of range) the read position is left unchanged and
 * ERR_READ_NODATA is returned, so that the "nothing written" marker never
 * ends up in r_pos.
 */
int
reset_output_pos(void *mem)
{
    pkt_share_buffer * buf_p = NULL;
    uint32 latest = 0;

    if(!mem){
        return PKT_SHARE_BUFFER_ERR_INVALIDE_PARM;
    }

    buf_p = CAST_EASY(mem, pkt_share_buffer);
    if(!CHECK_BUFFER(buf_p)){
        return PKT_SHARE_BUFFER_ERR_NOT_INIT;
    }

    latest = buf_p->w_pos_new;
    if(latest >= buf_p->index_max){
        return PKT_SHARE_BUFFER_ERR_READ_NODATA;
    }

    buf_p->r_pos = latest;
    return PKT_SHARE_BUFFER_NOERR;
}

#ifdef __cplusplus
}
#endif
#ifndef _PKT_SHARE_BUFFER_H_ #define _PKT_SHARE_BUFFER_H_ #include "commDef.h" #ifdef __cplusplus extern "C" { #endif #define CAPTURE_PKT_MAX_SIZE (1500) typedef enum pkt_share_buff_err { PKT_SHARE_BUFFER_NOERR = 0, PKT_SHARE_BUFFER_ERR_INVALIDE_PARM, PKT_SHARE_BUFFER_ERR_IOERR, PKT_SHARE_BUFFER_ERR_NOT_INIT, PKT_SHARE_BUFFER_ERR_COVER_UNREAD_DATA, PKT_SHARE_BUFFER_ERR_WRITE_BLOCKING, PKT_SHARE_BUFFER_ERR_READ_BLOCKING, PKT_SHARE_BUFFER_ERR_READ_NODATA, PKT_SHARE_BUFFER_ERR_OTHER, PKT_SHARE_BUFFER_ERR_LAST }pkt_share_buff_err; typedef struct pkt_unit { uint32 stat; uint32 pkt_len; uint32 tv_sec; uint32 tv_usec; uint64 seq; uint8 raw_data[CAPTURE_PKT_MAX_SIZE]; uint8 pad[12]; } pkt_unit;//total size = 1536 byte /*************************************************** /* * init function of buffer... * nomally called by the data writer * @mem: buffer pointer * @size: buffer size we malloced * @return val: 0->success other->err */ int init_pkt_share_buffer(void *mem, uint32 size); /**************************************** /* * input the pkt_unit into buffer * @p: input packet * @mem:buffer entry * @return val: 0->success other->err */ int input_pkt_unit(pkt_unit *p, void *mem); /******************************************************************** /* * use buffer space to recv data from a file descriptor * @fd: file descriptor we receive data from * @size: max size we receive * @seq: what seq we want the received data be * @mem:buffer entry * @return val: 0->success other->err */ int recv_to_pkt_share_buffer(int fd, uint32 size, uint32 seq, void *mem); /****************************************** /* * output the pkt_unit from buffer * @p: output packet unit * @mem:buffer entry * @return val: 0->success other->err */ int output_pkt_unit(pkt_unit *p, void *mem); /********************************************* /* *this function should be call when reader recved no ordered pkt unit *and this function will set the read offset with the latest pkt_unit index we have written 
*NOTICE:this function can be called only in READ thread...... */ int reset_output_pos(void *mem); #ifdef __cplusplus } #endif #endif//_PKT_SHARE_BUFFER_H_