golang 通过 netlink 获取 taskstats

by:ksh

开始

开始了解 taskstate 的目的是为了给项目hickwall的agent增加 获取io read/write最高的top进程id功能,这个信息是可以通过读取proc里面的io来获取,但是考虑到要读取所有进程proc文件同样有很大的io消耗,而初衷是为了定位io消耗大的进程以此来发现问题,agent自身的io不能太大,所以暂时放弃这种方式。后来看到centos下有一个 iotop 命令,查看其源码 发现是使用netlink来获取的,于是决定采用netlink来获取。

Netlink是一种特殊的socket,用来实现用户态进程与内核的信息通信。可以通过AF_NETLINK来创建一个netlink套接字(socket),netlink协议由多个子协议组成目前包括:

  • NETLINK_ROUTE
  • NETLINK_W1
  • NETLINK_USERSOCK
  • NETLINK_FIREWALL
  • NETLINK_INET_DIAG
  • NETLINK_NFLOG
  • NETLINK_XFRM
  • NETLINK_SELINUX
  • NETLINK_ISCSI
  • NETLINK_AUDIT
  • NETLINK_FIB_LOOKUP
  • NETLINK_CONNECTOR
  • NETLINK_IP6_FW
  • NETLINK_DNRTMSG
  • NETLINK_KOBJECT_UEVENT
  • NETLINK_GENERIC
  • NETLINK_CRYPTO

taskstats属于NETLINK_GENERIC子协议。NETLINK_GENERIC有特殊的请求头 GenlMsghdr

type GenlMsghdr struct {
	Cmd      uint8
	Version  uint8
	Reserved uint16
}

/usr/include/linux/taskstats.h 有关于 taskstate的定义,首先先将其中的定义转换为go的定义

const (
	TS_COMM_LEN = 32
)

type Taskstats struct {

						    //1) Common and basic accounting fields:
						    /* The version number of this struct. This field is always set to
						     * TAKSTATS_VERSION, which is defined in <linux/taskstats.h>.
						     * Each time the struct is changed, the value should be incremented.
						     */
	Version uint16;

						    /* The exit code of a task. */
	Ac_exitcode uint32;	/* Exit status */

							/* The accounting flags of a task as defined in <linux/acct.h>
							* Defined values are AFORK, ASU, ACOMPAT, ACORE, and AXSIG.
							*/
	Ac_flag uint8;		/* Record flags */

							/* The value of task_nice() of a task. */
	Ac_nice uint8;		/* task_nice */

							//2) Delay accounting fields:
							/* Delay accounting fields start
							 *
							 * All values, until the comment "Delay accounting fields end" are
							 * available only if delay accounting is enabled, even though the last
							 * few fields are not delays
							 *
							 * xxx_count is the number of delay values recorded
							 * xxx_delay_total is the corresponding cumulative delay in nanoseconds
							 *
							 * xxx_delay_total wraps around to zero on overflow
							 * xxx_count incremented regardless of overflow
							 */

							/* Delay waiting for cpu, while runnable
							 * count, delay_total NOT updated atomically
							 */
	Cpu_count uint64;
	Cpu_delay_total uint64;

							/* Following four fields atomically updated using task->delays->lock */

							/* Delay waiting for synchronous block I/O to complete
							 * does not account for delays in I/O submission
							 */
	Blkio_count uint64;
	Blkio_delay_total uint64;

							/* Delay waiting for page fault I/O (swap in only) */
	Swapin_count uint64;
	Swapin_delay_total uint64;

							/* cpu "wall-clock" running time
							 * On some architectures, value will adjust for cpu time stolen
							 * from the kernel in involuntary waits due to virtualization.
							 * Value is cumulative, in nanoseconds, without a corresponding count
							 * and wraps around to zero silently on overflow
							 */
	Cpu_run_real_total uint64;

							/* cpu "virtual" running time
							 * Uses time intervals seen by the kernel i.e. no adjustment
							 * for kernel's involuntary waits due to virtualization.
							 * Value is cumulative, in nanoseconds, without a corresponding count
							 * and wraps around to zero silently on overflow
							 */
	Cpu_run_virtual_total uint64;
							/* Delay accounting fields end */
							/* version 1 ends here */

							/* The name of the command that started this task. */
	Ac_comm [TS_COMM_LEN]byte;	/* Command name */

							/* The scheduling discipline as set in task->policy field. */
	Ac_sched uint64;		/* Scheduling discipline */

	Ac_pad [3]uint8;
	Ac_uid uint32;			/* User ID */
	Ac_gid uint32;			/* Group ID */
	Ac_pid uint32;			/* Process ID */
	Ac_ppid uint32;		/* Parent process ID */

							/* The time when a task begins, in [secs] since 1970. */
	Ac_btime uint32;		/* Begin time [sec since 1970] */

							/* The elapsed time of a task, in [usec]. */
	Ac_etime uint64;		/* Elapsed time [usec] */

							/* The user CPU time of a task, in [usec]. */
	Ac_utime uint64;		/* User CPU time [usec] */

							/* The system CPU time of a task, in [usec]. */
	Ac_stime uint64;		/* System CPU time [usec] */

							/* The minor page fault count of a task, as set in task->min_flt. */
	Ac_minflt uint64;		/* Minor Page Fault Count */

							/* The major page fault count of a task, as set in task->maj_flt. */
	Ac_majflt uint64;		/* Major Page Fault Count */

							//3) Extended accounting fields
							/* Extended accounting fields start */

							/* Accumulated RSS usage in duration of a task, in MBytes-usecs.
							 * The current rss usage is added to this counter every time
							 * a tick is charged to a task's system time. So, at the end we
							 * will have memory usage multiplied by system time. Thus an
							 * average usage per system time unit can be calculated.
							 */
	Coremem uint64;		/* accumulated RSS usage in MB-usec */

							/* Accumulated virtual memory usage in duration of a task.
							* Same as acct_rss_mem1 above except that we keep track of VM usage.
							*/
	Virtmem uint64;		/* accumulated VM usage in MB-usec */

							/* High watermark of RSS usage in duration of a task, in KBytes. */
	Hiwater_rss uint64;		/* High-watermark of RSS usage */

							* High watermark of VM  usage in duration of a task, in KBytes. */
	Hiwater_vm uint64;		/* High-water virtual memory usage */

							/* The following four fields are I/O statistics of a task. */
	Read_char uint64;		/* bytes read */
	Write_char uint64;		/* bytes written */
	Read_syscalls uint64;		/* read syscalls */
	Write_syscalls uint64;		/* write syscalls */

							/* Extended accounting fields end */

							/4) Per-task and per-thread statistics
	Read_bytes uint64;             /* bytes of read I/O */
	Write_bytes uint64;            /* bytes of write I/O */
	Cancelled_write_bytes uint64;  /* bytes of cancelled write I/O */
	Nvcsw uint64;			/* Context voluntary switch counter */
	Nivcsw uint64;			/* Context involuntary switch counter */

							//5) Time accounting for SMT machines
	Ac_utimescaled uint64;		/* utime scaled on frequency etc */
	Ac_stimescaled uint64;		/* stime scaled on frequency etc */
	Cpu_scaled_run_real_total uint64; /* scaled cpu_run_real_total */

							//6) Extended delay accounting fields for memory reclaim
							/* Delay waiting for memory reclaim */
	Freepages_count uint64;
	Freepages_delay_total uint64;
}

const (
	TASKSTATS_CMD_UNSPEC = iota /* Reserved */
	TASKSTATS_CMD_GET           /* user->kernel request/get-response */
	TASKSTATS_CMD_NEW           /* kernel->user event */
	__TASKSTATS_CMD_MAX
)

const (
	TASKSTATS_TYPE_UNSPEC    = iota /* Reserved */
	TASKSTATS_TYPE_PID              /* Process id */
	TASKSTATS_TYPE_TGID             /* Thread group id */
	TASKSTATS_TYPE_STATS            /* taskstats structure */
	TASKSTATS_TYPE_AGGR_PID         /* contains pid + stats */
	TASKSTATS_TYPE_AGGR_TGID        /* contains tgid + stats */
	__TASKSTATS_TYPE_MAX
)

const (
	TASKSTATS_CMD_ATTR_UNSPEC = iota
	TASKSTATS_CMD_ATTR_PID
	TASKSTATS_CMD_ATTR_TGID
	TASKSTATS_CMD_ATTR_REGISTER_CPUMASK
	TASKSTATS_CMD_ATTR_DEREGISTER_CPUMASK
	__TASKSTATS_CMD_ATTR_MAX
)

实现

有了taskstate的golang版定义,就可以通过 syscall.socket 来获取taskstate信息了

import (
	"encoding/binary"
	"github.com/hkwi/nlgo"
	"unsafe"
	"syscall"
	"fmt"
)

type MSG struct {
	Len  uint16
	Type uint16
	Pid  uint32
}

type Attr struct {
	Type int
	Data []byte
}

func GetTaskStats(nlsk *nlgo.NlSock, p int) (t *Taskstats) {
	nlsk.Flags |= nlgo.NL_NO_AUTO_ACK
	const familyID  =  22
	m := &MSG{
		Len:  8,
		Type: TASKSTATS_CMD_ATTR_PID,
		Pid:  uint32(p),
	}

	hdr := (*[nlgo.SizeofGenlMsghdr]byte)(unsafe.Pointer(&nlgo.GenlMsghdr{
		Cmd:     TASKSTATS_CMD_GET,
		Version: 0,
	}))[:]
	req := ((*[8]byte)(unsafe.Pointer(m)))[:]
	length := 4
	pad := ((length + 4 - 1) & (^3)) - length
	for i := 0; i < pad; i++ {
		req = append(req, 0)
	}
	hdr = append(hdr, req...)
	nlgo.NlSendSimple(nlsk, familyID, 1, hdr[:])

	func() error {
		for {
			buf := make([]byte, 16384)
			if nn, _, err := syscall.Recvfrom(nlsk.Fd, buf, 0); err != nil {
				return err
			} else if nn > len(buf) {
				return nlgo.NLE_MSG_TRUNC
			} else {
				buf = buf[:nn]
			}
			if msgs, err := syscall.ParseNetlinkMessage(buf); err != nil {
				return err
			} else {
				for _, msg := range msgs {

					switch msg.Header.Type {
					case syscall.NLMSG_DONE:
						return nil
					case syscall.NLMSG_ERROR:
						return fmt.Errorf("NlMsgerr=%s", nlgo.NlMsgerr(msg))
					case 22:
						genl := (*nlgo.GenlMsghdr)(unsafe.Pointer(&msg.Data[0]))
						_ = genl
						attrs := parse_attributes(msg.Data[nlgo.GENL_HDRLEN:])
						for _, attr := range attrs {
							if attr.Type == TASKSTATS_TYPE_AGGR_PID {
								attrs = parse_attributes(attr.Data)
								break
							}
						}
						for _, attr := range attrs {
							if attr.Type == TASKSTATS_TYPE_STATS {
								_ = uint32(*(*uint32)(unsafe.Pointer(&attr.Data[248])))
								t = (*Taskstats)(unsafe.Pointer(&attr.Data[0]))
								break
							}
						}
						return nil
					default:
						return fmt.Errorf("unexpected NlMsghdr=%s", msg.Header)
					}
				}
			}
		}
	}()
	return
}


func parse_attributes(data []byte) map[int]Attr {
	attrs := make(map[int]Attr, 0)

	for len(data) != 0 {

		attr_len := binary.LittleEndian.Uint16(data[0:2])

		attr_type := binary.LittleEndian.Uint16(data[2:4])

		attrs[int(attr_type)] = Attr{
			int(attr_type),
			data[4:attr_len],
		}

		data = data[nlgo.NLMSG_ALIGN(int(attr_len)):]
	}
	return attrs
}

这里借用了 github.com/hkwi/nlgo 库的创建套接字和请求头的定义,这部分完全自己实现其实也很容易。github.com/hkwi/nlgo 中没有和taskstats相关的定义,所以这部分需要自已做。

golang便捷的兼容C结构体的encode和decode

(*Taskstats)(unsafe.Pointer(&attr.Data[0]))

将一个 []byte 数值的地址 转换为 unsafe.Pointer, 之后便可以强制转换成任意类型

package main
import (
	"fmt"
	"unsafe"
)

type S1 struct {
	a uint8
	b uint8
	c uint8
	d uint8
	i int32
}

func main() {
	data := []byte{0x01, 0x02, 0x03, 0x04, 0x10, 0x00, 0x00, 0x00}
	s1 := (*S1)(unsafe.Pointer(&data[0]))
	fmt.Println(s1)
}

输出:

&{1 2 3 4 16}

S1的每个属性都按顺序从字节数组中读取了数据

如果去掉一个 uint8 会怎么样呢

package main
import (
	"fmt"
	"unsafe"
)

type S1 struct {
	a uint8
	b uint8
	c uint8
	i int32
}

func main() {
	data := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x00, 0x00, 0x00}
	s1 := (*S1)(unsafe.Pointer(&data[0]))
	fmt.Println(s1)
}
&{1 2 3 5}

可以发现 第4个字节的数据(0x04) 并没有被读取而是直接从第5个字节开始读取了4个字节载入 i int32中,这其实是由于golang中的结构体属性的字节对齐导致的。

C中的 attribute((aligned(8)));

taskstate.h 的结构体定义里出现了几处 __attribute__((aligned(8))),这个的意思是以8字节来进行地址对齐,