红联Linux门户
Linux帮助

Linux内核学习:netlink的内核实现原理

发布时间:2014-12-12 15:26:22 来源:linux网站 作者:wanderer-zjhit

注:
当用户态进程发送数据时,调用sendmsg,内核中由netlink_sendmsg函数完成:新建一个sk_buff,在其cb私有缓存中保存源地址信息,再把数据拷贝到sk_buff中[nlmsghdr头部已经附在数据部分前面,作为数据的一部分],最后利用netlink_unicast发送出去。
而当内核态发送时,新建一个sk_buff,先利用nlmsg_put在头部填写nlmsghdr结构信息,其后是数据部分,并在其cb私有部分NETLINK_CB(skb_1)中填写本地信息,然后利用netlink_unicast发送出去。


netlink结构体简析
netlink的实现主要在/net/netlink/af_netlink.c文件中,结构如下:
1 内核中的netlink协议类型,AF_NETLINK为netlink协议族
/*
 * Netlink protocol numbers (the "unit" a netlink socket is created for).
 * MAX_LINKS is the exclusive upper bound for these values.
 */
#define NETLINK_ROUTE  0 /* Routing/device hook    */
#define NETLINK_UNUSED  1 /* Unused number    */
#define NETLINK_USERSOCK 2 /* Reserved for user mode socket protocols  */
#define NETLINK_FIREWALL 3 /* Firewalling hook    */
#define NETLINK_INET_DIAG 4 /* INET socket monitoring   */
#define NETLINK_NFLOG  5 /* netfilter/iptables ULOG */
#define NETLINK_XFRM  6 /* ipsec */
#define NETLINK_SELINUX  7 /* SELinux event notifications */
#define NETLINK_ISCSI  8 /* Open-iSCSI */
#define NETLINK_AUDIT  9 /* auditing */
#define NETLINK_FIB_LOOKUP 10
#define NETLINK_CONNECTOR 11
#define NETLINK_NETFILTER 12 /* netfilter subsystem */
#define NETLINK_IP6_FW  13
#define NETLINK_DNRTMSG  14 /* DECnet routing messages */
#define NETLINK_KOBJECT_UEVENT 15 /* Kernel messages to userspace */
#define NETLINK_GENERIC  16
/* leave room for NETLINK_DM (DM Events) */
#define NETLINK_SCSITRANSPORT 18 /* SCSI Transports */
#define NETLINK_ECRYPTFS 19
#define MAX_LINKS 32 
2 netlink 地址格式
/* Netlink address format. */
struct sockaddr_nl
{
 sa_family_t nl_family; /* AF_NETLINK */
 unsigned short nl_pad;  /* zero  */
 __u32  nl_pid;  /* port ID */
        __u32  nl_groups; /* multicast groups mask */
};
3 netlink消息头
/* Header that precedes the payload of every netlink message. */
struct nlmsghdr
{
 __u32  nlmsg_len; /* Length of message including header */
 __u16  nlmsg_type; /* Message content */
 __u16  nlmsg_flags; /* Additional flags */
 __u32  nlmsg_seq; /* Sequence number */
 __u32  nlmsg_pid; /* Sending process port ID */
};
4 netlink 套接字结构
struct netlink_sock {
 /* struct sock has to be the first member of netlink_sock */
 struct sock  sk;
 u32   pid; //内核自己的pid,=0
 u32   dst_pid;
 u32   dst_group;对方的组
 u32   flags;
 u32   subscriptions;
 u32   ngroups;组数量
 unsigned long  *groups;组号
 unsigned long  state;
 wait_queue_head_t wait; 进程在接收数据包时等待队列
 struct netlink_callback *cb;
 struct mutex  *cb_mutex;
 struct mutex  cb_def_mutex;
 void   (*netlink_rcv)(struct sk_buff *skb); //内核态接收到用户态信息后的处理函数
 struct module  *module;
};
5 sk_buff结构中对应的netlink相关的信息
内核态存储自己发送地址等信息,在数据报传到用户态后,用户态可能要获取其中信息
/*
 * Netlink-specific metadata kept in an skb's cb[] area; accessed through
 * the NETLINK_CB() macro defined right after the structure.
 */
struct netlink_skb_parms
{
 struct ucred  creds;    /* Skb credentials */
 __u32   pid; /* sender's port id */
 __u32   dst_group; /* destination group */
 kernel_cap_t  eff_cap;
 __u32   loginuid; /* Login (audit) uid */
 __u32   sessionid; /* Session id (audit) */
 __u32   sid;  /* SELinux security id */
};
/* View skb->cb as a struct netlink_skb_parms lvalue. */
#define NETLINK_CB(skb)  (*(struct netlink_skb_parms*)&((skb)->cb))
6 内核中所有的netlink套接字存储在一个全局的哈希表中,该结构定义如下
/*
 * Global netlink socket table. Each protocol has its own hash table, and
 * every socket of a given protocol is hashed into that protocol's table.
 */
static struct netlink_table *nl_table;
下面为一种协议所连接的哈希表结构:
/*
 * Per-protocol netlink bookkeeping; nl_table holds one of these for each
 * protocol number.
 */
struct netlink_table {
 struct nl_pid_hash hash;   /* netlink socks hashed by pid (the "client" list) */
 struct hlist_head mc_list; /* multicast sock list */
 unsigned long *listeners;  /* listener flags */
 unsigned int nl_nonroot;
 unsigned int groups;       /* group count for this protocol; netlink_kernel_create() raises it to at least 32 */
 struct mutex *cb_mutex;    /* assigned by netlink_kernel_create() when the protocol is first registered */
 struct module *module;
 int registered;            /* 0 until first registration, then counts kernel sockets created for this protocol */
};
最大可有MAX_LINKS(32)个表,处理不同协议类型的netlink套接口, 注意由于是自身的通信, 本机同时作为服务器和客户端, 服务端需要一个套接口对应, 每个客户端也要有一个套接口对应, 多个客户端的套接口形成一个链表.
/* Hash table of netlink socks keyed by pid (one per protocol). */
struct nl_pid_hash {
struct hlist_head *table; // bucket heads; each sock of the protocol is linked into the bucket its hash selects
unsigned long rehash_time; // interval between rehashes
unsigned int mask;
unsigned int shift;
unsigned int entries;     // number of list nodes
unsigned int max_shift;  // maximum shift (power of two)
u32 rnd;   // random value
};
7 一些与netlink相关的宏
/*
 * Netlink message layout helpers. A message is a struct nlmsghdr followed by
 * its payload, with every length rounded up to NLMSG_ALIGNTO bytes.
 */
#define NLMSG_ALIGNTO 4
/* Round len up to the next multiple of NLMSG_ALIGNTO. */
#define NLMSG_ALIGN(len) ( ((len)+NLMSG_ALIGNTO-1) & ~(NLMSG_ALIGNTO-1) )
/* Aligned size of the message header. */
#define NLMSG_HDRLEN  ((int) NLMSG_ALIGN(sizeof(struct nlmsghdr)))
/* Header plus len payload bytes, without trailing padding (value for nlmsg_len). */
#define NLMSG_LENGTH(len) ((len)+NLMSG_ALIGN(NLMSG_HDRLEN))
/* Total aligned size of a message carrying len payload bytes (header + payload + padding). */
#define NLMSG_SPACE(len) NLMSG_ALIGN(NLMSG_LENGTH(len))
/* Pointer to the payload that follows the header; nlh is parenthesized so expression arguments are safe. */
#define NLMSG_DATA(nlh)  ((void*)(((char*)(nlh)) + NLMSG_LENGTH(0)))
/* Advance to the next message; also subtracts the aligned message length from len. */
#define NLMSG_NEXT(nlh,len)  ((len) -= NLMSG_ALIGN((nlh)->nlmsg_len), \
      (struct nlmsghdr*)(((char*)(nlh)) + NLMSG_ALIGN((nlh)->nlmsg_len)))
/* True when len bytes starting at nlh hold a complete message. */
#define NLMSG_OK(nlh,len) ((len) >= (int)sizeof(struct nlmsghdr) && \
      (nlh)->nlmsg_len >= sizeof(struct nlmsghdr) && \
      (nlh)->nlmsg_len <= (len))
/* Payload bytes remaining after a leading sub-header of len bytes. */
#define NLMSG_PAYLOAD(nlh,len) ((nlh)->nlmsg_len - NLMSG_SPACE((len)))
8 netlink的函数操作集合[挂载于socket->ops结构中]
static const struct proto_ops netlink_ops = {
 .family = PF_NETLINK,
 .owner = THIS_MODULE,
 .release = netlink_release,
 .bind =  netlink_bind,
 .connect = netlink_connect,
 .socketpair = sock_no_socketpair,
 .accept = sock_no_accept,
 .getname = netlink_getname,
 .poll =  datagram_poll,
 .ioctl = sock_no_ioctl,
 .listen = sock_no_listen,
 .shutdown = sock_no_shutdown,
 .setsockopt = netlink_setsockopt,
 .getsockopt = netlink_getsockopt,
 .sendmsg = netlink_sendmsg,   netlink 套接字实际的发送和接受函数
 .recvmsg = netlink_recvmsg,
 .mmap =  sock_no_mmap,
 .sendpage = sock_no_sendpage,
};


函数跟踪分析
1 内核中创建netlink函数
建立socket、sock结构并初始化
/*
 * We export these functions to other modules. They provide a
 * complete set of kernel non-blocking support for message
 * queueing.
 */

struct sock *
netlink_kernel_create(struct net *net, int unit, unsigned int groups,
        void (*input)(struct sk_buff *skb),struct mutex *cb_mutex, struct module *module)
{
 struct socket *sock;
 struct sock *sk;
 struct netlink_sock *nlk;
 unsigned long *listeners = NULL;

 BUG_ON(!nl_table);

 if (unit < 0 || unit >= MAX_LINKS)
  return NULL;

 if (sock_create_lite(PF_NETLINK, SOCK_DGRAM, unit, &sock)) //创建socket结构
  return NULL;

 /*
  * We have to just have a reference on the net from sk, but don't
  * get_net it. Besides, we cannot get and then put the net here.
  * So we create one inside init_net and the move it to net.
  */

 if (__netlink_create(&init_net, sock, cb_mutex, unit) < 0)  创建sock结构,并初始化
  goto out_sock_release_nosk;

 sk = sock->sk;
 sk_change_net(sk, net);

 if (groups < 32)
  groups = 32;

 listeners = kzalloc(NLGRPSZ(groups) + sizeof(struct listeners_rcu_head),
       GFP_KERNEL);
 if (!listeners)
  goto out_sock_release;

 sk->sk_data_ready = netlink_data_ready; //什么都不做
 if (input)
  nlk_sk(sk)->netlink_rcv = input; //设置内核态的接受函数

 if (netlink_insert(sk, net, 0))
  goto out_sock_release;

 nlk = nlk_sk(sk);取得sock嵌入的netlink_sock结构体
 nlk->flags |= NETLINK_KERNEL_SOCKET;

 netlink_table_grab();
 if (!nl_table[unit].registered) {
  nl_table[unit].groups = groups;
  nl_table[unit].listeners = listeners;
  nl_table[unit].cb_mutex = cb_mutex;
  nl_table[unit].module = module;
  nl_table[unit].registered = 1; 更新netlink_table结构体信息,每中协议对应一个netlink_table结构
 } else {
  kfree(listeners);
  nl_table[unit].registered++;
 }
 netlink_table_ungrab();
 return sk;

out_sock_release:
 kfree(listeners);
 netlink_kernel_release(sk);
 return NULL;

out_sock_release_nosk:
 sock_release(sock);
 return NULL;
}
EXPORT_SYMBOL(netlink_kernel_create);
1.1 创建socket结构
/*
 * sock_create_lite - allocate a bare struct socket
 *
 * Runs the security hooks, allocates the socket and records its type.
 * *res receives the new socket (NULL on failure); returns 0 or a negative
 * errno.
 */
int sock_create_lite(int family, int type, int protocol, struct socket **res)
{
 int err;
 struct socket *sock = NULL;

 /* author's note: effectively a no-op here (presumably when no security module hooks it) */
 err = security_socket_create(family, type, protocol, 1);
 if (err)
  goto out;

 sock = sock_alloc();
 if (!sock) {
  err = -ENOMEM;
  goto out;
 }

 sock->type = type;
 /* author's note: does nothing here either */
 err = security_socket_post_create(sock, family, type, protocol, 1);
 if (err)
  goto out_release;

out:
 *res = sock;
 return err;
out_release:
 sock_release(sock);
 sock = NULL;
 goto out;
}
1.2 创建sock结构并初始化
/*
 * __netlink_create - allocate and initialise the sock behind a netlink socket
 *
 * Installs netlink_ops on @sock, allocates the sock in @net, selects the
 * callback mutex (@cb_mutex, or the socket's own cb_def_mutex when NULL),
 * and records the destructor and protocol. Returns 0 or -ENOMEM.
 */
static int __netlink_create(struct net *net, struct socket *sock,
       struct mutex *cb_mutex, int protocol)
{
 struct sock *sk;
 struct netlink_sock *nlk;

 sock->ops = &netlink_ops;

 /* author's note: the struct proto itself is not otherwise used */
 sk = sk_alloc(net, PF_NETLINK, GFP_KERNEL, &netlink_proto);
 if (!sk)
  return -ENOMEM;

 sock_init_data(sock, sk); /* initialise sk and link it with sock */

 nlk = nlk_sk(sk); /* the netlink_sock that embeds sk */
 if (cb_mutex)
  nlk->cb_mutex = cb_mutex;
 else {
  nlk->cb_mutex = &nlk->cb_def_mutex;
  mutex_init(nlk->cb_mutex);
 }
 init_waitqueue_head(&nlk->wait); /* initialise the wait queue */

 sk->sk_destruct = netlink_sock_destruct;
 sk->sk_protocol = protocol;
 return 0;
}
1.2.1 sock结构和socket结构初始化过程
/*
 * sock_init_data - set a freshly allocated sock to its default state
 * @sock: owning socket, may be NULL
 * @sk: sock to initialise
 *
 * Initialises the queues, timer, buffer sizes, default callbacks and
 * timeouts, links @sk with @sock when one is given, and finally sets the
 * reference count to 1.
 */
void sock_init_data(struct socket *sock, struct sock *sk)
{
 skb_queue_head_init(&sk->sk_receive_queue);
 skb_queue_head_init(&sk->sk_write_queue);
 skb_queue_head_init(&sk->sk_error_queue);
#ifdef CONFIG_NET_DMA
 skb_queue_head_init(&sk->sk_async_wait_queue);
#endif

 sk->sk_send_head = NULL;

 init_timer(&sk->sk_timer);

 sk->sk_allocation = GFP_KERNEL;
 sk->sk_rcvbuf  = sysctl_rmem_default;
 sk->sk_sndbuf  = sysctl_wmem_default;
 sk->sk_state  = TCP_CLOSE;
 sk_set_socket(sk, sock);

 sock_set_flag(sk, SOCK_ZAPPED);

 /* with an owning socket, inherit its type and wait queue and link back */
 if (sock) {
  sk->sk_type = sock->type;
  sk->sk_sleep = &sock->wait;
  sock->sk = sk;
 } else
  sk->sk_sleep = NULL;

 rwlock_init(&sk->sk_dst_lock);
 rwlock_init(&sk->sk_callback_lock);
 lockdep_set_class_and_name(&sk->sk_callback_lock,
   af_callback_keys + sk->sk_family,
   af_family_clock_key_strings[sk->sk_family]);

 /* default callbacks; callers may override them afterwards */
 sk->sk_state_change = sock_def_wakeup;
 sk->sk_data_ready = sock_def_readable;
 sk->sk_write_space = sock_def_write_space;
 sk->sk_error_report = sock_def_error_report;
 sk->sk_destruct  = sock_def_destruct;

 sk->sk_sndmsg_page = NULL;
 sk->sk_sndmsg_off = 0;

 sk->sk_peercred.pid  = 0;
 sk->sk_peercred.uid = -1;
 sk->sk_peercred.gid = -1;
 sk->sk_write_pending = 0;
 sk->sk_rcvlowat  = 1;
 sk->sk_rcvtimeo  = MAX_SCHEDULE_TIMEOUT;
 sk->sk_sndtimeo  = MAX_SCHEDULE_TIMEOUT;

 sk->sk_stamp = ktime_set(-1L, 0);

 /*
  * Before updating sk_refcnt, we must commit prior changes to memory
  * (Documentation/RCU/rculist_nulls.txt for details)
  */
 smp_wmb();
 atomic_set(&sk->sk_refcnt, 1);
 atomic_set(&sk->sk_drops, 0);
}
2 新建sk_buff结构,当内核态向用户态发送信息时,必须得建新的sk_buff结构
/*
 * alloc_skb - allocate an sk_buff with room for @size bytes
 *
 * Convenience wrapper around __alloc_skb(): no fclone, and node -1
 * (presumably "no NUMA node preference" - confirm against __alloc_skb).
 */
static inline struct sk_buff *alloc_skb(unsigned int size,
     gfp_t priority)
{
 const int fclone = 0;
 const int node = -1;

 return __alloc_skb(size, priority, fclone, node);
}
/**
 * __alloc_skb - allocate a network buffer
 * @size: size to allocate
 * @gfp_mask: allocation mask
 * @fclone: allocate from fclone cache instead of head cache
 *  and allocate a cloned (child) skb
 * @node: numa node to allocate memory on
 *
 * Allocate a new &sk_buff. The returned buffer has no headroom and a
 * tail room of size bytes. The object has a reference count of one.
 * The return is the buffer. On a failure the return is %NULL.
 *
 * Buffers may only be allocated from interrupts using a @gfp_mask of
 * %GFP_ATOMIC.
 */
struct sk_buff *__alloc_skb(unsigned int size, gfp_t gfp_mask,
       int fclone, int node)
{
 struct kmem_cache *cache;
 struct skb_shared_info *shinfo;
 struct sk_buff *skb;
 u8 *data;

 cache = fclone ? skbuff_fclone_cache : skbuff_head_cache;

 /* Get the HEAD */
 skb = kmem_cache_alloc_node(cache, gfp_mask & ~__GFP_DMA, node);
 if (!skb)
  goto out;

 size = SKB_DATA_ALIGN(size);
 /* data area plus room for the skb_shared_info; returns the start of the new buffer */
 data = kmalloc_node_track_caller(size + sizeof(struct skb_shared_info),
   gfp_mask, node);
 if (!data)
  goto nodata;

 /*
  * Only clear those fields we need to clear, not those that we will
  * actually initialise below. Hence, don't put any more fields after
  * the tail pointer in struct sk_buff!
  */
 memset(skb, 0, offsetof(struct sk_buff, tail));
 skb->truesize = size + sizeof(struct sk_buff);
 atomic_set(&skb->users, 1);
 skb->head = data;
 skb->data = data;
 skb_reset_tail_pointer(skb); /* initially tail == data */
 skb->end = skb->tail + size;
 kmemcheck_annotate_bitfield(skb, flags1);
 kmemcheck_annotate_bitfield(skb, flags2);
#ifdef NET_SKBUFF_DATA_USES_OFFSET
 skb->mac_header = ~0U;
#endif

 /* make sure we initialize shinfo sequentially */
 shinfo = skb_shinfo(skb); /* shared-info structure at the tail of the buffer */
 atomic_set(&shinfo->dataref, 1);
 shinfo->nr_frags  = 0;
 shinfo->gso_size = 0;
 shinfo->gso_segs = 0;
 shinfo->gso_type = 0;
 shinfo->ip6_frag_id = 0;
 shinfo->tx_flags.flags = 0;
 skb_frag_list_init(skb);
 memset(&shinfo->hwtstamps, 0, sizeof(shinfo->hwtstamps));

 if (fclone) {
  /* the fclone cache object holds this skb, a child skb and a refcount back to back */
  struct sk_buff *child = skb + 1;
  atomic_t *fclone_ref = (atomic_t *) (child + 1);

  kmemcheck_annotate_bitfield(child, flags1);
  kmemcheck_annotate_bitfield(child, flags2);
  skb->fclone = SKB_FCLONE_ORIG;
  atomic_set(fclone_ref, 1);

  child->fclone = SKB_FCLONE_UNAVAILABLE;
 }
out:
 return skb;
nodata:
 kmem_cache_free(cache, skb);
 skb = NULL;
 goto out;
}
3 释放已使用完毕的sk_buff空间
/**
 * kfree_skb - free an sk_buff
 * @skb: buffer to free
 *
 * Drop a reference to the buffer and free it if the usage count has
 * hit zero.
 */
void kfree_skb(struct sk_buff *skb)
{
 /* NULL is accepted and ignored */
 if (unlikely(!skb))
  return;
 /* sole reference: skip the atomic decrement and go straight to freeing */
 if (likely(atomic_read(&skb->users) == 1))
  smp_rmb();
 /* otherwise drop one reference and stop unless it was the last one */
 else if (likely(!atomic_dec_and_test(&skb->users)))
  return;
 trace_kfree_skb(skb, __builtin_return_address(0));
 __kfree_skb(skb);
}
4 kernel向user发送的sk_buff信息中格式为: nlmsghdr+内容,所以在内核态分配好并初始化好sk_buff之后,可以利用nlmsg_put函数来填写其nlmsghdr头部[只是一些相关协议的信息,与地址等无关]
/**
 * nlmsg_put - append a netlink message header to an skb
 * @skb: socket buffer that receives the message
 * @pid: netlink port id of the sender (0 when the kernel sends)
 * @seq: message sequence number
 * @type: message type
 * @payload: number of payload bytes the message will carry
 * @flags: message flags
 *
 * Checks that the skb still has tailroom for the header plus @payload
 * bytes and, if so, lets __nlmsg_put() write the header. Returns the new
 * header, or NULL when the tailroom is too small.
 */
static inline struct nlmsghdr *nlmsg_put(struct sk_buff *skb, u32 pid, u32 seq,
      int type, int payload, int flags)
{
 const int needed = nlmsg_total_size(payload);

 if (likely(skb_tailroom(skb) >= needed))
  return __nlmsg_put(skb, pid, seq, type, payload, flags);

 return NULL;
}
内核中填写nlmsghdr头部信息
/*
 * __nlmsg_put - reserve space in @skb and fill in a netlink message header
 *
 * Reserves NLMSG_ALIGN(NLMSG_LENGTH(len)) bytes at the tail of @skb, writes
 * the header fields and zeroes the alignment padding after the payload.
 * Does not check tailroom; nlmsg_put() does that before calling here.
 * Returns the header.
 */
static __inline__ struct nlmsghdr *
__nlmsg_put(struct sk_buff *skb, u32 pid, u32 seq, int type, int len, int flags)
{
 struct nlmsghdr *nlh;
 int size = NLMSG_LENGTH(len); /* len + aligned sizeof(struct nlmsghdr) */

 /* skb_put() advances tail by the aligned size and returns the old tail */
 nlh = (struct nlmsghdr*)skb_put(skb, NLMSG_ALIGN(size));
 nlh->nlmsg_type = type;
 nlh->nlmsg_len = size;
 nlh->nlmsg_flags = flags;
 nlh->nlmsg_pid = pid;
 nlh->nlmsg_seq = seq;
 /* clear the padding bytes between the payload end and the aligned end */
 if (!__builtin_constant_p(size) || NLMSG_ALIGN(size) - size != 0)
  memset(NLMSG_DATA(nlh) + len, 0, NLMSG_ALIGN(size) - size);
 return nlh;
}
/*
 * skb_put - extend the used data area of an skb by @len bytes
 *
 * Grows both skb->len and skb->tail by @len and returns the previous tail,
 * i.e. the first byte of the newly reserved region. Calls skb_over_panic()
 * if the new tail runs past skb->end.
 */
unsigned char *skb_put(struct sk_buff *skb, unsigned int len)
{
 unsigned char *old_tail = skb_tail_pointer(skb);

 SKB_LINEAR_ASSERT(skb);
 skb->len  += len;
 skb->tail += len;
 if (unlikely(skb->tail > skb->end))
  skb_over_panic(skb, len, __builtin_return_address(0));
 return old_tail;
}
5 内核态向用户态发送信息的函数,pid指向用户态的进程
/*
 * netlink_unicast - deliver @skb to the netlink socket bound to @pid
 * @ssk: sending sock
 * @skb: message to deliver
 * @pid: destination port id
 * @nonblock: passed to sock_sndtimeo() to choose the send timeout
 *
 * Returns the result of the delivery helper on success or a negative
 * errno. The sk_filter and lookup-failure paths free @skb here.
 */
int netlink_unicast(struct sock *ssk, struct sk_buff *skb,u32 pid, int nonblock)
{
 struct sock *sk;
 int err;
 long timeo;

 skb = netlink_trim(skb, gfp_any());

 timeo = sock_sndtimeo(ssk, nonblock);
retry:
 /*
  * Look up the destination sock for this pid in the protocol's hash
  * table. The peer may be a user-space socket or another kernel socket.
  */
 sk = netlink_getsockbypid(ssk, pid);
 if (IS_ERR(sk)) {
  kfree_skb(skb);
  return PTR_ERR(sk);
 }
 /* peer is a kernel socket: hand the skb to its input function */
 if (netlink_is_kernel(sk))
  return netlink_unicast_kernel(sk, skb);

 if (sk_filter(sk, skb)) {
  err = skb->len;
  kfree_skb(skb);
  sock_put(sk);
  return err;
 }

 err = netlink_attachskb(sk, skb, &timeo, ssk);
 if (err == 1)
  goto retry; /* reference was dropped while waiting; look the peer up again */
 if (err)
  return err;

 /* queue on the receiver's sk_receive_queue and call its sk_data_ready to wake the reader */
 return netlink_sendskb(sk, skb);
}
EXPORT_SYMBOL(netlink_unicast);
5-0 netlink_getsockbypid
/*
 * netlink_getsockbypid - find the destination sock for a unicast
 * @ssk: sending sock (supplies the namespace, protocol and sender pid)
 * @pid: destination port id
 *
 * Returns the peer with a reference held (taken by netlink_lookup()), or
 * ERR_PTR(-ECONNREFUSED) when no such socket exists or when the peer is in
 * state NETLINK_CONNECTED with a dst_pid other than the sender's pid.
 */
static struct sock *netlink_getsockbypid(struct sock *ssk, u32 pid)
{
 struct sock *peer = netlink_lookup(sock_net(ssk), ssk->sk_protocol, pid);

 if (peer == NULL)
  return ERR_PTR(-ECONNREFUSED);

 /* an unconnected peer accepts messages from anyone */
 if (peer->sk_state != NETLINK_CONNECTED)
  return peer;

 /* a connected peer only accepts messages from the pid it is connected to */
 if (nlk_sk(peer)->dst_pid == nlk_sk(ssk)->pid)
  return peer;

 sock_put(peer);
 return ERR_PTR(-ECONNREFUSED);
}
查找主函数:
/*
 * netlink_lookup - find the sock bound to @pid for @protocol in @net
 *
 * Walks the matching hash chain under nl_table_lock (read side). Returns
 * the sock with an extra reference held, or NULL when none matches.
 */
static inline struct sock *netlink_lookup(struct net *net, int protocol, /* protocol number; may be user-defined */
       u32 pid)
{
 struct nl_pid_hash *hash = &nl_table[protocol].hash; /* hash table of this protocol */
 struct hlist_head *head;
 struct sock *sk;
 struct hlist_node *node;

 read_lock(&nl_table_lock);
 head = nl_pid_hashfn(hash, pid); /* chain selected by the hash of pid */
 /* scan the chain for a sock with the same namespace and pid */
 sk_for_each(sk, node, head) {
  if (net_eq(sock_net(sk), net) && (nlk_sk(sk)->pid == pid)) {
   sock_hold(sk);
   goto found;
  }
 }
 sk = NULL;
found:
 read_unlock(&nl_table_lock);
 return sk;
}
5-1  将sock和sk_buff绑定在一起
/*
 * Attach a skb to a netlink socket.
 * The caller must hold a reference to the destination socket. On error, the
 * reference is dropped. The skb is not send to the destination, just all
 * all error checks are performed and memory in the queue is reserved.
 * Return values:
 * < 0: error. skb freed, reference to sock dropped.
 * 0: continue
 * 1: repeat lookup - reference dropped while waiting for socket memory.
 */
int netlink_attachskb(struct sock *sk, struct sk_buff *skb,
        long *timeo, struct sock *ssk)
{
 struct netlink_sock *nlk;

 nlk = nlk_sk(sk);
如果sock的接受缓冲区小于sock中接受队列已提交的字节数,或者还有数据数据未被上层处理
if (atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf ||test_bit(0, &nlk->state)) {
  DECLARE_WAITQUEUE(wait, current); 声明等待队列
  if (!*timeo) {
   if (!ssk || netlink_is_kernel(ssk))
       netlink_overrun(sk);
   sock_put(sk);
   kfree_skb(skb);
   return -EAGAIN;
  }

  __set_current_state(TASK_INTERRUPTIBLE);
  add_wait_queue(&nlk->wait, &wait);  设置自身状态,等待睡眠

  if ((atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf ||
       test_bit(0, &nlk->state)) &&
      !sock_flag(sk, SOCK_DEAD))
   *timeo = schedule_timeout(*timeo);

  __set_current_state(TASK_RUNNING);
  remove_wait_queue(&nlk->wait, &wait);
  sock_put(sk);递减sk引用次数

  if (signal_pending(current)) {
   kfree_skb(skb);
   return sock_intr_errno(*timeo);
  }
  return 1; //重新将sk_buff放在客户端sock上
 }
 skb_set_owner_r(skb, sk);
 return 0;
}
改变sock->sk_rmem_alloc大小,以及skb->sk变量
/*
 * skb_set_owner_r - make @sk the receive-side owner of @skb
 *
 * Detaches the skb from any previous owner, points it at @sk with
 * sock_rfree as destructor, and charges skb->truesize to the socket's
 * sk_rmem_alloc and memory accounting.
 */
static inline void skb_set_owner_r(struct sk_buff *skb, struct sock *sk)
{
 skb_orphan(skb);

 skb->destructor = sock_rfree;
 skb->sk = sk;

 atomic_add(skb->truesize, &sk->sk_rmem_alloc);
 sk_mem_charge(sk, skb->truesize);
}
5.2 内核态将数据发送出去
/*
 * netlink_sendskb - queue @skb on the receiving sock and notify it
 *
 * Appends the skb to sk->sk_receive_queue, calls the sock's sk_data_ready
 * callback, drops the caller's reference on @sk and returns the skb length
 * (sampled before queuing).
 */
int netlink_sendskb(struct sock *sk, struct sk_buff *skb)
{
 int len = skb->len;

 skb_queue_tail(&sk->sk_receive_queue, skb);
 sk->sk_data_ready(sk, len); /* wake the waiting task so it can receive */
 sock_put(sk);
 return len;
}
6 用户态接受消息
此时内核态将数据发送到用户态对应的sock的sk_receive_queue中,并且唤醒睡眠的进程
/*
 * netlink_recvmsg - recvmsg handler of netlink sockets
 * @kiocb: I/O control block (carries the sock_iocb)
 * @sock: receiving socket
 * @msg: destination message header (iovec, optional name buffer, flags)
 * @len: size of the caller's buffer
 * @flags: MSG_* flags; MSG_OOB is rejected with -EOPNOTSUPP
 *
 * Dequeues one datagram, copies up to @len bytes into @msg, optionally
 * fills in the sender address and ancillary data, and frees the skb.
 * Returns the number of bytes copied (the full datagram length when
 * MSG_TRUNC was passed in @flags) or a negative errno.
 */
static int netlink_recvmsg(struct kiocb *kiocb, struct socket *sock,
      struct msghdr *msg, size_t len,
      int flags)
{
 /* I/O control block of the socket */
 struct sock_iocb *siocb = kiocb_to_siocb(kiocb);
 struct scm_cookie scm;
 struct sock *sk = sock->sk; /* sock of the receiving (user-space) socket */
 struct netlink_sock *nlk = nlk_sk(sk);
 int noblock = flags&MSG_DONTWAIT;
 size_t copied;
 struct sk_buff *skb, *data_skb;
 int err;

 if (flags&MSG_OOB)
  return -EOPNOTSUPP;

 copied = 0;

 /* take one datagram off the receive queue */
 skb = skb_recv_datagram(sk, flags, noblock, &err);
 if (skb == NULL)
  goto out;

 data_skb = skb;

#ifdef CONFIG_COMPAT_NETLINK_MESSAGES
 if (unlikely(skb_shinfo(skb)->frag_list)) {
  /*
   * If this skb has a frag_list, then here that means that we
   * will have to use the frag_list skb's data for compat tasks
   * and the regular skb's data for normal (non-compat) tasks.
   *
   * If we need to send the compat skb, assign it to the
   * 'data_skb' variable so that it will be used below for data
   * copying. We keep 'skb' for everything else, including
   * freeing both later.
   */
  if (flags & MSG_CMSG_COMPAT)
   data_skb = skb_shinfo(skb)->frag_list;
 }
#endif

 msg->msg_namelen = 0;

 copied = data_skb->len; /* length of the received data */
 if (len < copied) {
  /* caller's buffer is shorter than the datagram: copy only len bytes and flag the truncation */
  msg->msg_flags |= MSG_TRUNC;
  copied = len;
 }

 skb_reset_transport_header(data_skb);
 /* copy the datagram from the sk_buff into the msghdr's iovec */
 err = skb_copy_datagram_iovec(data_skb, 0, msg->msg_iov, copied);

 /* caller supplied a name buffer: fill in the sender's struct sockaddr_nl */
 if (msg->msg_name) {
  struct sockaddr_nl *addr = (struct sockaddr_nl *)msg->msg_name;
  addr->nl_family = AF_NETLINK;
  addr->nl_pad    = 0;
  addr->nl_pid = NETLINK_CB(skb).pid; /* sender's pid from the skb control block */
  addr->nl_groups = netlink_group_mask(NETLINK_CB(skb).dst_group); /* group the message was sent to */
  msg->msg_namelen = sizeof(*addr);
 }

 /* packet-info flag set: pass the packet info to user space as ancillary data */
 if (nlk->flags & NETLINK_RECV_PKTINFO)
  netlink_cmsg_recv_pktinfo(msg, skb);

 if (NULL == siocb->scm) {
  memset(&scm, 0, sizeof(scm));
  siocb->scm = &scm;
 }
 siocb->scm->creds = *NETLINK_CREDS(skb);
 /* MSG_TRUNC requested by the caller: report the full datagram length */
 if (flags & MSG_TRUNC)
  copied = data_skb->len;

 skb_free_datagram(sk, skb); /* release the sk_buff */

 /* a dump is in progress and the receive queue is at most half full: continue it */
 if (nlk->cb && atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf / 2)
  netlink_dump(sk);

 scm_recv(sock, msg, siocb->scm, flags);
out:
 netlink_rcv_wake(sk); /* receive-side wakeup */
 return err ? : copied;
}
6.1 从sock->sk_receive_queue中摘除一个sk_buff结构
从sock的接收队列中摘除一个sk_buff结构:
/*
 * __skb_recv_datagram - take (or peek at) the first skb on the receive queue
 * @sk: socket to receive from
 * @flags: MSG_* flags; MSG_PEEK leaves the skb queued, MSG_DONTWAIT is
 *         passed to sock_rcvtimeo()
 * @peeked: set to the skb's previous "peeked" state when one is found
 * @err: receives the error code on the no_packet path
 *
 * Returns the skb, or NULL when there is a pending socket error, the
 * timeout is zero with nothing queued, or waiting failed.
 */
struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned flags,
        int *peeked, int *err)
{
 struct sk_buff *skb;
 long timeo;
 /*
  * Caller is allowed not to check sk->sk_err before skb_recv_datagram()
  */
 int error = sock_error(sk);

 if (error)
  goto no_packet;

 timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

 do {
  /* Again only user level code calls this function, so nothing
   * interrupt level will suddenly eat the receive_queue.
   *
   * Look at current nfs client by the way...
   * However, this function was correct in any case. 8)
   */
  unsigned long cpu_flags;

  spin_lock_irqsave(&sk->sk_receive_queue.lock, cpu_flags);
  skb = skb_peek(&sk->sk_receive_queue);
  if (skb) {
   *peeked = skb->peeked;
   if (flags & MSG_PEEK) {
    /* peeking: leave it queued, mark it and take an extra reference */
    skb->peeked = 1;
    atomic_inc(&skb->users);
   } else
    __skb_unlink(skb, &sk->sk_receive_queue);
  }
  spin_unlock_irqrestore(&sk->sk_receive_queue.lock, cpu_flags);

  if (skb)
   return skb;

  /* User doesn't want to wait */
  error = -EAGAIN;
  if (!timeo)
   goto no_packet;

 } while (!wait_for_packet(sk, err, &timeo));

 return NULL;

no_packet:
 *err = error;
 return NULL;
}
摘除过程:
/*
 * __skb_unlink - remove @skb from the doubly linked queue @list
 *
 * Decrements the queue length, clears the skb's own links and joins its
 * former neighbours. Takes no lock itself (the double-underscore variant);
 * __skb_recv_datagram() calls it with the queue lock held.
 */
static inline void __skb_unlink(struct sk_buff *skb, struct sk_buff_head *list)
{
 struct sk_buff *after = skb->next;
 struct sk_buff *before = skb->prev;

 list->qlen--;

 skb->prev = NULL;
 skb->next = NULL;

 before->next = after;
 after->prev = before;
}
6.2 将该sk_buff内容转换为msghdr结构
内核态采用netlink_unicast函数发送数据,数据结构存储为:nlmsghdr+data,内核发送即以这种方式组sk_buff
而用户态采用sendmsg等格式,发送数据方式不是sk_buff,而是msghdr格式,该格式定义如下:
/*
 * As we do 4.4BSD message passing we use a 4.4BSD message passing
 * system, not 4.3. Thus msg_accrights(len) are now missing. They
 * belong in an obscure libc emulation or the bin.
 */
struct msghdr {  位于linux/socket.h头文件中
 void * msg_name; /* Socket name   */  一般存储对方(发送方的地址信息,即netlink_nl信息)
 int  msg_namelen; /* Length of name  */
 struct iovec * msg_iov; /* Data blocks   */ 存储来自对方的实际数据信息
 __kernel_size_t msg_iovlen; /* Number of blocks  */
 void  * msg_control; /* Per protocol magic (eg BSD file descriptor passing) */
 __kernel_size_t msg_controllen; /* Length of cmsg list */
 unsigned msg_flags;
};
该数据分组结构单元定义如下:
/* One data block of a msghdr's msg_iov array: a user-space base pointer and its length. */
struct iovec
{
 void __user *      iov_base; /* BSD uses caddr_t (1003.1g requires void *) */
 __kernel_size_t   iov_len; /* Must be size_t (1003.1g) */
};
转换函数如下定义:
/**
 * skb_copy_datagram_iovec - Copy a datagram to an iovec.
 * @skb: buffer to copy
 * @offset: offset in the buffer to start copying from
 * @to: io vector to copy to
 * @len: amount of data to copy from buffer to iovec
 *
 * Note: the iovec is modified during the copy.
 *
 * Copies from three places in turn: the linear head, the page fragments,
 * and the skbs on the fragment list (recursively). Returns 0 once @len
 * bytes have been copied, or -EFAULT if a copy fails or the data runs out
 * first.
 */
int skb_copy_datagram_iovec(const struct sk_buff *skb, int offset,
       struct iovec *to, int len)
{
 /* start/end bracket the region being examined, in skb byte offsets */
 int start = skb_headlen(skb);
 int i, copy = start - offset;
 struct sk_buff *frag_iter;

 trace_skb_copy_datagram_iovec(skb, len);

 /* Copy header. */
 if (copy > 0) {
  if (copy > len)
   copy = len;
  if (memcpy_toiovec(to, skb->data + offset, copy))
   goto fault;
  if ((len -= copy) == 0)
   return 0;
  offset += copy;
 }

 /* Copy paged appendix. Hmm... why does this look so complicated? */
 for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
  int end;

  WARN_ON(start > offset + len);

  end = start + skb_shinfo(skb)->frags[i].size;
  /* this fragment covers bytes at or after offset: copy from it */
  if ((copy = end - offset) > 0) {
   int err;
   u8  *vaddr;
   skb_frag_t *frag = &skb_shinfo(skb)->frags[i];
   struct page *page = frag->page;

   if (copy > len)
    copy = len;
   /* map the page only for the duration of the copy */
   vaddr = kmap(page);
   err = memcpy_toiovec(to, vaddr + frag->page_offset +
          offset - start, copy);
   kunmap(page);
   if (err)
    goto fault;
   if (!(len -= copy))
    return 0;
   offset += copy;
  }
  start = end;
 }

 /* finally the chained skbs, each handled by a recursive call */
 skb_walk_frags(skb, frag_iter) {
  int end;

  WARN_ON(start > offset + len);

  end = start + frag_iter->len;
  if ((copy = end - offset) > 0) {
   if (copy > len)
    copy = len;
   if (skb_copy_datagram_iovec(frag_iter,
          offset - start,
          to, copy))
    goto fault;
   if ((len -= copy) == 0)
    return 0;
   offset += copy;
  }
  start = end;
 }
 if (!len)
  return 0;

fault:
 return -EFAULT;
}
7 内核态接收用户态发送的数据
/*
 * netlink_sendmsg - sendmsg handler of netlink sockets
 * @kiocb: I/O control block (carries the sock_iocb)
 * @sock: sending socket
 * @msg: message header: optional destination sockaddr_nl, data iovec, flags
 * @len: number of payload bytes to send
 *
 * Builds a new sk_buff from the caller's iovec, records the sender's
 * identity in the skb control block, then broadcasts it (when a destination
 * group is set) and unicasts it to the destination pid. Returns the result
 * of netlink_unicast() or a negative errno.
 */
static int netlink_sendmsg(struct kiocb *kiocb, struct socket *sock,
      struct msghdr *msg, size_t len)
{
 struct sock_iocb *siocb = kiocb_to_siocb(kiocb);
 struct sock *sk = sock->sk;
 struct netlink_sock *nlk = nlk_sk(sk);
 struct sockaddr_nl *addr = msg->msg_name; /* destination address, if supplied */
 u32 dst_pid;
 u32 dst_group;
 struct sk_buff *skb;
 int err;
 struct scm_cookie scm;

 if (msg->msg_flags&MSG_OOB)
  return -EOPNOTSUPP;

 if (NULL == siocb->scm)
  siocb->scm = &scm;
 err = scm_send(sock, msg, siocb->scm);
 if (err < 0)
  return err;

 if (msg->msg_namelen) {
  /*
   * Reject an address shorter than a sockaddr_nl before reading any
   * of its fields; otherwise nl_pid/nl_groups would be read from
   * bytes the caller never provided.
   */
  if (msg->msg_namelen < sizeof(struct sockaddr_nl))
   return -EINVAL;
  if (addr->nl_family != AF_NETLINK)
   return -EINVAL;
  dst_pid = addr->nl_pid;
  dst_group = ffs(addr->nl_groups);
  if (dst_group && !netlink_capable(sock, NL_NONROOT_SEND))
   return -EPERM;
 } else {
  /* no explicit destination: use the connected peer */
  dst_pid = nlk->dst_pid;
  dst_group = nlk->dst_group;
 }

 /* sender has no port id yet: bind one automatically */
 if (!nlk->pid) {
  err = netlink_autobind(sock);
  if (err)
   goto out;
 }

 err = -EMSGSIZE;
 if (len > sk->sk_sndbuf - 32)
  goto out;
 err = -ENOBUFS;
 /* allocate the sk_buff that will carry the msghdr's data */
 skb = alloc_skb(len, GFP_KERNEL);
 if (skb == NULL)
  goto out;

 NETLINK_CB(skb).pid = nlk->pid;  /* record the sender's own pid */
 NETLINK_CB(skb).dst_group = dst_group;
 NETLINK_CB(skb).loginuid = audit_get_loginuid(current);
 NETLINK_CB(skb).sessionid = audit_get_sessionid(current);
 security_task_getsecid(current, &(NETLINK_CB(skb).sid));
 memcpy(NETLINK_CREDS(skb), &siocb->scm->creds, sizeof(struct ucred));

 /* What can I do? Netlink is asynchronous, so that
    we will have to save current capabilities to
    check them, when this message will be delivered
    to corresponding kernel module.   --ANK (980802)
  */

 /* copy the payload from the caller's iovec into the sk_buff */
 err = -EFAULT;
 if (memcpy_fromiovec(skb_put(skb, len), msg->msg_iov, len)) {
  kfree_skb(skb);
  goto out;
 }

 err = security_netlink_send(sk, skb);
 if (err) {
  kfree_skb(skb);
  goto out;
 }

 if (dst_group) {
  /* extra reference so the skb is still usable for the unicast below
   * (presumably netlink_broadcast drops one reference - confirm) */
  atomic_inc(&skb->users);
  netlink_broadcast(sk, skb, dst_pid, dst_group, GFP_KERNEL);
 }
 err = netlink_unicast(sk, skb, dst_pid, msg->msg_flags&MSG_DONTWAIT);

out:
 return err;
}