标签:down read tail eve 返回值 调用 知识 scm 第一条
转自:http://blog.csdn.net/wdscq1234/article/details/51926808
写在前面
本文主要是分析kernel-3.8的源代码,主要集中在Network的netdevice层面,来贯穿interface传输数据包的流程,kernel 博大精深,这也仅仅是一点个人愚见,作为一个笔记形式的文章,如有错误或者表述不当之处,还请大家留言批评指正,非常感谢!
主要涉及的file:kernel-3.18/net/core/dev.c
kernel-3.18/net/sched/sch_generic.c
dev_queue_xmit分析
当上层的APP试图建立一个TCP的链接,或者发送一个封包的时候,在kernel的协议栈部分,在TCP/UDP层会组成一个网络的封包,然后通过IP进行路由选择以及iptables的Hook,之后 到neighbor层查询或者询问下一跳的链路层地址,然后通过调用dev_queue_xmit这个网络设备接口层函数发送给driver,本文就来分析一下dev_queue_xmit的相关流程,了解一个包是如何发送出去的!
- int dev_queue_xmit(struct sk_buff *skb)
- {
- return __dev_queue_xmit(skb, NULL);
- }
这个是直接调用的__dev_queue_xmit 传入的参数是一个skb 数据包
需要确认的几个点就是
1.设备在调用这个函数之前,必须设置设备优先级 和缓冲区buffer
2.如果此函数发送失败,会返回一个负数的Error number,不过即使返回正数,也不一定保证发送成功,封包也许会被网络拥塞给drop掉
3.这个函数也可以从队列规则中返回error,NET_XMIT_DROP, 这个错误是一个整数,所以错误也有可能是整数,也验证了点2 ,所以在协议栈的上一层使用这个函数的时候,可能需要注意error的处理部分
4. 不管这个函数返回什么值,这个skb最终的宿命就是被consume,也就是free掉了... 所以这个时候上层不要尝试重发了... 除非有协议栈的重传, 要不skb已经被free了,再去重新去调用,不要命了..此时kernel就挂掉了...
5.在调用这个函数的时候,必须打开中断,这些东西不是特别明白原因....
* When calling this method, interrupts MUST be enabled. This is because
* the BH enable code must have IRQs enabled so that it will not deadlock.
- static int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
- {
- struct net_device *dev = skb->dev;
- struct netdev_queue *txq;
- struct Qdisc *q;
- int rc = -ENOMEM;
-
- skb_reset_mac_header(skb);
-
- if (unlikely(skb_shinfo(skb)->tx_flags & SKBTX_SCHED_TSTAMP))
- __skb_tstamp_tx(skb, NULL, skb->sk, SCM_TSTAMP_SCHED);
-
-
- rcu_read_lock_bh();
-
- skb_update_prio(skb);
-
-
-
- if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
- skb_dst_drop(skb);
- else
- skb_dst_force(skb);
-
-
- txq = netdev_pick_tx(dev, skb, accel_priv);
- q = rcu_dereference_bh(txq->qdisc);
-
- #ifdef CONFIG_NET_CLS_ACT
- skb->tc_verd = SET_TC_AT(skb->tc_verd, AT_EGRESS);
- #endif
- trace_net_dev_queue(skb);
-
-
- if (q->enqueue) {
- rc = __dev_xmit_skb(skb, q, dev, txq);
- goto out;
- }
-
-
-
-
- if (dev->flags & IFF_UP) {
- int cpu = smp_processor_id();
-
- if (txq->xmit_lock_owner != cpu) {
-
- if (__this_cpu_read(xmit_recursion) > RECURSION_LIMIT)
- goto recursion_alert;
- skb = validate_xmit_skb(skb, dev);
- if (!skb)
- goto drop;
-
- HARD_TX_LOCK(dev, txq, cpu);
-
- if (!netif_xmit_stopped(txq)) {
- __this_cpu_inc(xmit_recursion);
- skb = dev_hard_start_xmit(skb, dev, txq, &rc);
- __this_cpu_dec(xmit_recursion);
- if (dev_xmit_complete(rc)) {
- HARD_TX_UNLOCK(dev, txq);
- goto out;
- }
- }
- HARD_TX_UNLOCK(dev, txq);
- net_crit_ratelimited("Virtual device %s asks to queue packet!\n",
- dev->name);
- } else {
-
- recursion_alert:
- net_crit_ratelimited("Dead loop on virtual device %s, fix it urgently!\n",
- dev->name);
- }
- }
-
- rc = -ENETDOWN;
- drop:
- rcu_read_unlock_bh();
-
- atomic_long_inc(&dev->tx_dropped);
- kfree_skb_list(skb);
- return rc;
- out:
- rcu_read_unlock_bh();
- return rc;
- }
从对_dev_queue_xmit函数的分析来看,发送报文有2中情况:
1.有拥塞控制策略的情况,比较复杂,但是目前最常用
2.没有enqueue的状况,比较简单,直接发送到driver,如loopback等使用
先检查是否有enqueue的规则,如果有即调用__dev_xmit_skb进入拥塞控制的flow,如果没有且txq处于On的状态,那么就调用dev_hard_start_xmit直接发送到driver,好 那先分析带Qdisc策略的flow 进入__dev_xmit_skb
__dev_xmit_skb分析
- static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
- struct net_device *dev,
- struct netdev_queue *txq)
- {
- spinlock_t *root_lock = qdisc_lock(q);
- bool contended;
- int rc;
-
- qdisc_pkt_len_init(skb);
- qdisc_calculate_pkt_len(skb, q);
-
- contended = qdisc_is_running(q);
- if (unlikely(contended))
- spin_lock(&q->busylock);
-
- spin_lock(root_lock);
-
-
- if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
- kfree_skb(skb);
- rc = NET_XMIT_DROP;
- } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
- qdisc_run_begin(q)) {
-
-
- qdisc_bstats_update(q, skb);
-
-
- if (sch_direct_xmit(skb, q, dev, txq, root_lock, true)) {
- if (unlikely(contended)) {
- spin_unlock(&q->busylock);
- contended = false;
- }
- __qdisc_run(q);
- } else
- qdisc_run_end(q);
-
- rc = NET_XMIT_SUCCESS;
- } else {
-
-
- rc = q->enqueue(skb, q) & NET_XMIT_MASK;
- if (qdisc_run_begin(q)) {
- if (unlikely(contended)) {
- spin_unlock(&q->busylock);
- contended = false;
- }
- __qdisc_run(q);
- }
- }
- spin_unlock(root_lock);
- if (unlikely(contended))
- spin_unlock(&q->busylock);
- return rc;
- }
从上述分析来看,可以分成2个状况:
1. Qdisc满足上述3个条件,置位TCQ_F_CAN_BYPASS,0个包,没有running直接调用sch_direct_xmit,感觉这种状况,是一开始刚发第一个包的时候肯定是这种状况...
2.不满足上述的3个条件 一个或者多个,那就直接进行enqueue操作,然后运行qdisc
个人认为,第一种状况是在网络通畅的状况下遇到的状况,qdisc的队列基本上处于空的状态,都是直接传送给driver了,第二种情况是属于出现网络拥塞的情况,出现发送失败的状况了
Q里面还有一些待发送的数据包,为了保证Q中的数据按照Qdisc的规则发送,比如先进先出,就需要enqueue操作,然后再去dequeue发送出去!
sch_direct_xmit
下面来分析sch_direct_xmit,这个函数可能传输几个数据包,因为在不经过queue状况下和经过queue的状况下都会调通过这个函数发送,如果是queue状况,肯定是能够传输多个数据包了,本文后面也有分析,并按照需求处理return的状态,需要拿着__QDISC___STATE_RUNNING bit,只有一个CPU 可以执行这个函数, 在这里有可能会出现BUSY的状况!
- int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
- struct net_device *dev, struct netdev_queue *txq,
- spinlock_t *root_lock, bool validate)
- {
- int ret = NETDEV_TX_BUSY;
-
-
- spin_unlock(root_lock);
-
-
- if (validate)
- skb = validate_xmit_skb_list(skb, dev);
-
- if (likely(skb)) {
- HARD_TX_LOCK(dev, txq, smp_processor_id());
-
- if (!netif_xmit_frozen_or_stopped(txq))
- skb = dev_hard_start_xmit(skb, dev, txq, &ret);
-
- HARD_TX_UNLOCK(dev, txq);
- } else {
- spin_lock(root_lock);
- return qdisc_qlen(q);
- }
- spin_lock(root_lock);
-
- if (dev_xmit_complete(ret)) {
-
-
- ret = qdisc_qlen(q);
- } else if (ret == NETDEV_TX_LOCKED) {
-
- ret = handle_dev_cpu_collision(skb, txq, q);
- } else {
-
- if (unlikely(ret != NETDEV_TX_BUSY))
- net_warn_ratelimited("BUG %s code %d qlen %d\n",
- dev->name, ret, q->q.qlen);
-
- ret = dev_requeue_skb(skb, q);
- }
-
-
- if (ret && netif_xmit_frozen_or_stopped(txq))
- ret = 0;
-
- return ret;
- }
这个函数就是在txq没有被stop的状况下,直接发送给driver,如果遇到无法发送的状况,要么是fail的,要么出现Tx Busy就requeue,使用拥塞的方式进行发送。
dev_hard_start_xmit
继续看dev_hard_start_xmit,这个函数比较简单,调用xmit_one来发送一个到多个数据包了
- struct sk_buff *dev_hard_start_xmit(struct sk_buff *first, struct net_device *dev,
- struct netdev_queue *txq, int *ret)
- {
- struct sk_buff *skb = first;
- int rc = NETDEV_TX_OK;
-
- while (skb) {
-
- struct sk_buff *next = skb->next;
-
- skb->next = NULL;
-
-
- rc = xmit_one(skb, dev, txq, next != NULL);
-
-
- if (unlikely(!dev_xmit_complete(rc))) {
- skb->next = next;
- goto out;
- }
-
- skb = next;
-
-
- if (netif_xmit_stopped(txq) && skb) {
- rc = NETDEV_TX_BUSY;
- break;
- }
- }
-
- out:
- *ret = rc;
- return skb;
- }
对于xmit_one这个来讲比较简单了,下面代码中列出了xmit_one, netdev_start_xmit,__netdev_start_xmit 这个三个函数,其目的就是将封包送到driver的tx函数了..中间在送往driver之前,还会经历抓包的过程,本文不介绍抓包的流程了。
- static int xmit_one(struct sk_buff *skb, struct net_device *dev,
- struct netdev_queue *txq, bool more)
- {
- unsigned int len;
- int rc;
-
-
- if (!list_empty(&ptype_all))
- dev_queue_xmit_nit(skb, dev);
-
- len = skb->len;
- trace_net_dev_start_xmit(skb, dev);
-
- rc = netdev_start_xmit(skb, dev, txq, more);
- trace_net_dev_xmit(skb, rc, dev, len);
-
- return rc;
- }
-
- static inline netdev_tx_t netdev_start_xmit(struct sk_buff *skb, struct net_device *dev,
- struct netdev_queue *txq, bool more)
- {
- const struct net_device_ops *ops = dev->netdev_ops;
- int rc;
-
- rc = __netdev_start_xmit(ops, skb, dev, more);
-
-
- if (rc == NETDEV_TX_OK)
- txq_trans_update(txq);
-
- return rc;
- }
-
- static inline netdev_tx_t __netdev_start_xmit(const struct net_device_ops *ops,
- struct sk_buff *skb, struct net_device *dev,
- bool more)
- {
- skb->xmit_more = more ? 1 : 0;
- return ops->ndo_start_xmit(skb, dev);
- }
分析到这里已经形成了一条线,__dev_xmit_skb中对于qlen为0的状态下直接调用sch_direct_xmit去发送,这个时候sch_direct_xmit也直接调用了dev_hard_start_xmit来直接送到driver,如果发送成功会返回NETDEV_TX_OK,如果由于一些原因产生TX_BUSY的话,重新将这个包requeu到Qdisc中,再使用拥塞的方式发送
拥塞发送方式分析
Qdisc简介
接下来再分析下另外一条线,如果不满足上述的3个条件,即interface配置不允许直接发送,或者是有发送失败的包或者积累的封包等,就需要被enqueue了,进入Qdisc的操作!
从code中看,直接是q->enqueue这样的钩子函数调用,那么这个函数在哪里被赋值的!
1. 注册的flow,默认是没有Qdisc的
register_netdevice-->dev_init_scheduler--->dev->qdisc = &noop_qdisc; //默认是没有qdisc
可以看下Code中对noop_qdisc的规定...
- struct Qdisc noop_qdisc = {
- .enqueue = noop_enqueue,
- .dequeue = noop_dequeue,
- .flags = TCQ_F_BUILTIN,
- .ops = &noop_qdisc_ops,
- .list = LIST_HEAD_INIT(noop_qdisc.list),
- .q.lock = __SPIN_LOCK_UNLOCKED(noop_qdisc.q.lock),
- .dev_queue = &noop_netdev_queue,
- .busylock = __SPIN_LOCK_UNLOCKED(noop_qdisc.busylock),
- };
- static struct Qdisc_ops noqueue_qdisc_ops __read_mostly = {
- .id = "noqueue",
- .priv_size = 0,
- .enqueue = noop_enqueue,
- .dequeue = noop_dequeue,
- .peek = noop_dequeue,
- .owner = THIS_MODULE,
- };
enqueue/dequeue规则 不能使用哈!
-
- static int noop_enqueue(struct sk_buff *skb, struct Qdisc *qdisc)
- {
- kfree_skb(skb);
- return NET_XMIT_CN;
- }
-
- static struct sk_buff *noop_dequeue(struct Qdisc *qdisc)
- {
- return NULL;
- }
2.在打开设备的时候,赋予default
实际上刚开始注册的时候 就是这样 ,并没有任何可用的规则,但是在真正打开interface的时候,系统还是给赋予系统默认的!
dev_open-->__dev_open-->dev_activate
-
- f (dev->qdisc == &noop_qdisc)
- attach_default_qdiscs(dev);
- static void attach_default_qdiscs(struct net_device *dev)
- {
- struct netdev_queue *txq;
- struct Qdisc *qdisc;
-
- txq = netdev_get_tx_queue(dev, 0);
-
-
- if (!netif_is_multiqueue(dev) || dev->tx_queue_len == 0) {
- netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL);
- dev->qdisc = txq->qdisc_sleeping;
- atomic_inc(&dev->qdisc->refcnt);
- } else {
-
-
- qdisc = qdisc_create_dflt(txq, &mq_qdisc_ops, TC_H_ROOT);
- if (qdisc) {
- dev->qdisc = qdisc;
- qdisc->ops->attach(qdisc);
- }
- }
- }
我们在使用的netdevice基本上都是单队列的,默认情况下都是这个pfifo_fast_ops队列,具体Qdisc的知识这边不做深究!
- struct Qdisc_ops pfifo_fast_ops __read_mostly = {
- .id = "pfifo_fast",
- .priv_size = sizeof(struct pfifo_fast_priv),
- .enqueue = pfifo_fast_enqueue,
- .dequeue = pfifo_fast_dequeue,
- .peek = pfifo_fast_peek,
- .init = pfifo_fast_init,
- .reset = pfifo_fast_reset,
- .dump = pfifo_fast_dump,
- .owner = THIS_MODULE,
- };
在这里我们可以看到TCQ_F_CAN_BYPASS,这个flag置位,就表明数据包发送不一定非得走队列的规则,可以by pass这个规则,直接通过发送到driver,不过在一般没有阻塞的通讯状况下,有了这个flag,基本就都是直接发送出去了!
- static int pfifo_fast_init(struct Qdisc *qdisc, struct nlattr *opt)
- {
- int prio;
- struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
-
- for (prio = 0; prio < PFIFO_FAST_BANDS; prio++)
- __skb_queue_head_init(band2list(priv, prio));
-
-
- qdisc->flags |= TCQ_F_CAN_BYPASS;
- return 0;
- }
Enqueue
回归到正题,假设interface就是使用的pfifo_fast Qdisc规则,那么我们调用的enqueue直接走到pfifo_fast_enqueue,在里面就直接放到队列里,如果超出了最大的积攒数量就DROP掉了,返回NET_XMIT_DROP
- static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc)
- {
-
- if (skb_queue_len(&qdisc->q) < qdisc_dev(qdisc)->tx_queue_len) {
- int band = prio2band[skb->priority & TC_PRIO_MAX];
- struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
- struct sk_buff_head *list = band2list(priv, band);
-
- priv->bitmap |= (1 << band);
- qdisc->q.qlen++;
-
- return __qdisc_enqueue_tail(skb, qdisc, list);
- }
-
- return qdisc_drop(skb, qdisc);
- }
__qdisc_run
再回到__dev_xmit_skb 这个函数,enqueue完毕之后,如果这个Qdisc不是Running状态,就开启Running状态,然后调用了这个函数__qdisc_run(q);
在其中主要是调用了qdisc_restart来从队列中dequeue出封包,然后再调用sch_direct_xmit函数去直接发送封包,这个函数我们上面有分析过,就是直接发送给driver了
然后出现BUSY的就requeue到队列中。因为有可能从队列中取封包,所以这个函数可能发若干个包,要注意的是这些发送封包的过程都是出于process context
- void __qdisc_run(struct Qdisc *q)
- {
- int quota = weight_p;
- int packets;
-
-
- while (qdisc_restart(q, &packets)) {
-
-
- quota -= packets;
- if (quota <= 0 || need_resched()) {
- __netif_schedule(q);
- break;
- }
- }
-
- qdisc_run_end(q);
- }
-
- static inline int qdisc_restart(struct Qdisc *q, int *packets)
- {
- struct netdev_queue *txq;
- struct net_device *dev;
- spinlock_t *root_lock;
- struct sk_buff *skb;
- bool validate;
-
-
-
- skb = dequeue_skb(q, &validate, packets);
- if (unlikely(!skb))
- return 0;
-
- root_lock = qdisc_lock(q);
- dev = qdisc_dev(q);
- txq = skb_get_tx_queue(dev, skb);
-
- return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);
- }
ok,假设我们已经发送了若干个封包了,已经超过64个,那么会调用__netif_schedule去打开softirq利用软中断去发送在queue的封包
- void __netif_schedule(struct Qdisc *q)
- {
- if (!test_and_set_bit(__QDISC_STATE_SCHED, &q->state))
- __netif_reschedule(q);
- }
- static inline void __netif_reschedule(struct Qdisc *q)
- {
- struct softnet_data *sd;
- unsigned long flags;
-
- local_irq_save(flags);
-
-
- sd = this_cpu_ptr(&softnet_data);
- q->next_sched = NULL;
-
- *sd->output_queue_tailp = q;
- sd->output_queue_tailp = &q->next_sched;
- raise_softirq_irqoff(NET_TX_SOFTIRQ);
- local_irq_restore(flags);
- }
net_tx_action
net_tx_action便是软中断的执行函数,主要是做2件事情
第一件事情就是free使用完的skb,driver一般发送完数据之后,就会调用dev_kfree_skb_irq 届时这个地方就是来free的
第二件事情就是调用qdisc_run发送数据包了,重复之前上面的分析了...
- static void net_tx_action(struct softirq_action *h)
- {
- struct softnet_data *sd = this_cpu_ptr(&softnet_data);
-
-
-
- if (sd->completion_queue) {
- struct sk_buff *clist;
-
- local_irq_disable();
- clist = sd->completion_queue;
- sd->completion_queue = NULL;
- local_irq_enable();
-
- while (clist) {
- struct sk_buff *skb = clist;
- clist = clist->next;
-
- WARN_ON(atomic_read(&skb->users));
- if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
- trace_consume_skb(skb);
- else
- trace_kfree_skb(skb, net_tx_action);
- __kfree_skb(skb);
- }
- }
-
- if (sd->output_queue) {
- struct Qdisc *head;
-
- local_irq_disable();
- head = sd->output_queue;
- sd->output_queue = NULL;
- sd->output_queue_tailp = &sd->output_queue;
- local_irq_enable();
-
- while (head) {
- struct Qdisc *q = head;
- spinlock_t *root_lock;
-
- head = head->next_sched;
-
- root_lock = qdisc_lock(q);
- if (spin_trylock(root_lock)) {
- smp_mb__before_atomic();
- clear_bit(__QDISC_STATE_SCHED,
- &q->state);
- qdisc_run(q);
- spin_unlock(root_lock);
- } else {
- if (!test_bit(__QDISC_STATE_DEACTIVATED,
- &q->state)) {
- __netif_reschedule(q);
- } else {
- smp_mb__before_atomic();
- clear_bit(__QDISC_STATE_SCHED,
- &q->state);
- }
- }
- }
- }
- }
到这里,整个netdevice层面,Network的Tx flow已经基本分析完了,进行简单的总结!
总结
1.从宏观上来看,根本设备有无enqueue的方法,可以分成两种发送数据包的方式,第一就是有拥塞控制的数据传输,第二个就是什么都没有的直接传输到driver的,当然大部分的于外界沟通的interface都属于第一种,像loopback,tunnel一些设备就属于第二种没有enqueue的!
2. 对于有拥塞控制的数据传输,也有2条路径,第一条就是在满足3个前提条件下,直接发送数据包到硬件,和上述第二种case是一样的, 第二条就是出现拥塞的状况,就是有封包发送不成功,或者数据包量比较大的状况,这时候会用到enqueue,应该是保证顺序,所以一般q有包的状况 就都需要enqueue,然后再去dequeue发送到硬件,毕竟进程的上下文不会让你过多的占用时间,有一定的量的限制,限制条件到了就会中断发送,改用软中断的方式!
Linux网络之设备接口层:发送数据包流程dev_queue_xmit
标签:down read tail eve 返回值 调用 知识 scm 第一条
原文地址:http://www.cnblogs.com/x_wukong/p/6650046.html