I recently spent my time preparing for the software engineer qualification exam, and then unfortunately got sick right after it, so I haven't written any DPDK posts for a while. Today I'm picking up where the previous architecture optimization left off and adding the TCP protocol stack flow.
What is TCP
Baidu Baike: TCP (Transmission Control Protocol) is a connection-oriented, reliable, byte-stream-based transport-layer communication protocol.
The part that deserves the most attention here is "byte stream". When we create a TCP socket with the Linux POSIX API, we usually write:
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
The SOCK_STREAM argument means we are creating a stream socket. With UDP it is enough to send individual datagrams one by one, because UDP is packet-oriented. TCP is considerably more complex: every TCP stream needs a corresponding TCP control block (TCB) to manage that stream.
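For contrast, this is roughly what the kernel's POSIX API gives us for a connection-oriented byte stream; a minimal sketch with error handling omitted (the function name and port are placeholders). The user-space stack in this series has to reproduce this behaviour on top of DPDK:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

static int tcp_echo_once(void) {
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);     // stream socket

    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8889);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    bind(listenfd, (struct sockaddr *)&addr, sizeof(addr));

    listen(listenfd, 10);                               // passive open: LISTEN state
    int connfd = accept(listenfd, NULL, NULL);          // returns once the three-way handshake completes

    char buf[128];
    ssize_t n = read(connfd, buf, sizeof(buf));         // byte stream: no message boundaries
    if (n > 0)
        write(connfd, buf, (size_t)n);

    close(connfd);
    close(listenfd);
    return 0;
}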
Data structures
TCP states
typedef enum _LN_TCP_STATUS {
    LN_TCP_STATUS_CLOSED = 0,
    LN_TCP_STATUS_LISTEN,
    LN_TCP_STATUS_SYN_RECV,
    LN_TCP_STATUS_SYN_SEND,
    LN_TCP_STATUS_ESTABLELISTEN,
    LN_TCP_STATUS_FIN_WAIT_1,
    LN_TCP_STATUS_FIN_WAIT_2,
    LN_TCP_STATUS_CLOSEING,
    LN_TCP_STATUS_TIME_WAIT,
    LN_TCP_STATUS_CLOSE_WAIT,
    LN_TCP_STATUS_LAST_ACK,
} LN_TCP_STATUS;
This defines the 11 TCP states. The LN prefix has no special meaning; it's just the abbreviation of my name, lenn. (LN_TCP_STATUS_ESTABLELISTEN and LN_TCP_STATUS_CLOSEING are this code's spellings of the standard ESTABLISHED and CLOSING states.)
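Since the handlers later print the current state with bare printf calls, a small helper that maps a state to a readable name can be handy for debugging. This is purely an illustrative sketch of mine, not part of the original code:

// Illustrative helper (not in the original code): map a state to a printable name.
static const char* ln_tcp_status_name(LN_TCP_STATUS status) {
    // Names follow the order of the enum above; ESTABLELISTEN/CLOSEING are
    // printed with their conventional spellings.
    static const char* names[] = {
        "CLOSED", "LISTEN", "SYN_RECV", "SYN_SENT", "ESTABLISHED",
        "FIN_WAIT_1", "FIN_WAIT_2", "CLOSING", "TIME_WAIT",
        "CLOSE_WAIT", "LAST_ACK",
    };
    if ((unsigned)status < sizeof(names) / sizeof(names[0]))
        return names[status];
    return "UNKNOWN";
}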
TCP control block
struct ln_tcp_stream {
    int fd;

    uint32_t sip;
    uint32_t dip;
    uint16_t sport;
    uint16_t dport;
    uint16_t proto;
    uint8_t localmac[RTE_ETHER_ADDR_LEN];

    uint32_t snd_nxt;
    uint32_t rev_nxt;

    LN_TCP_STATUS status;

    struct rte_ring* snd_buf;
    struct rte_ring* rev_buf;

    struct ln_tcp_stream* prev;
    struct ln_tcp_stream* next;
};
- fd: the socket descriptor
- sip, dip: source IP and destination IP
- proto: protocol type
- localmac: local MAC address
- snd_nxt: the next sequence number to send (SEQ)
- rev_nxt: the next sequence number expected from the peer (ACK)
- snd_buf: send queue
- rev_buf: receive queue
- prev, next: pointers that chain all TCP control blocks into a linked list (via the LL_ADD/LL_REMOVE macros sketched below)
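The LL_ADD and LL_REMOVE macros used for this list come from arp.h in the earlier posts of this series and are not repeated in this article. As a rough sketch (the exact definition in arp.h may differ), they are the usual head-insertion doubly-linked-list helpers:

// Sketch of the LL_ADD / LL_REMOVE list macros used below; the real definitions
// live in arp.h from the earlier posts and may differ in detail.
#define LL_ADD(item, list) do {                                   \
    (item)->prev = NULL;                                          \
    (item)->next = (list);                                        \
    if ((list) != NULL) (list)->prev = (item);                    \
    (list) = (item);                                              \
} while (0)

#define LL_REMOVE(item, list) do {                                \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;  \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;  \
    if ((list) == (item)) (list) = (item)->next;                  \
    (item)->prev = (item)->next = NULL;                           \
} while (0)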
TCP data fragment
struct ln_tcp_fragment {
    uint16_t sport;
    uint16_t dport;
    uint32_t seqnum;
    uint32_t acknum;
    uint8_t  hdrlen_off;
    uint8_t  tcp_flags;
    uint16_t windows;
    uint16_t cksum;
    uint16_t tcp_urp;

    int optlen;
    uint32_t option[TCP_OPTION_LENGTH];

    uint8_t* data;
    int length;
};
The parameters of a TCP packet are collected in the fragment structure, including the payload data and its length.
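The hdrlen_off field mirrors data_off in rte_tcp_hdr: its upper 4 bits hold the TCP header length in 32-bit words, so a 20-byte header with no options gives the 0x50 value used later in the handshake code. A small sketch (my own helper, not in the original code) of how it could be derived from optlen:

/* Illustrative helper: upper nibble = header length in 32-bit words.
 * 20-byte base header = 5 words -> 0x50; each option word adds one. */
static inline uint8_t ln_tcp_hdrlen_off(int optlen_words) {
    return (uint8_t)((sizeof(struct rte_tcp_hdr) / 4 + optlen_words) << 4);
}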
TCP control block list
struct ln_tcp_table {
    int count;
    struct ln_tcp_stream* streams;
};

struct ln_tcp_table* tcpt = NULL;

static struct ln_tcp_table* ln_tcp_instance(void) {
    if(tcpt == NULL) {
        tcpt = rte_malloc("tcpt", sizeof(struct ln_tcp_table), 0);
        if(!tcpt) {
            rte_exit(EXIT_FAILURE, "Error with malloc tcpt");
        }
        memset(tcpt, 0, sizeof(struct ln_tcp_table));
    }
    return tcpt;
}

static struct ln_tcp_stream* ln_tcp_stream_search(uint32_t sip, uint32_t dip, uint16_t sport, uint16_t dport) {
    struct ln_tcp_table* table = ln_tcp_instance();
    struct ln_tcp_stream* iter;
    for(iter = table->streams; iter != NULL; iter = iter->next) {
        if(iter->dip == dip && iter->sip == sip && iter->sport == sport && iter->dport == dport) {
            return iter;
        }
    }
    return NULL;
}

static struct ln_tcp_stream* ln_tcp_stream_create(uint32_t sip, uint32_t dip, uint32_t sport, uint32_t dport) {
    struct ln_tcp_stream* stream = rte_malloc("ln_tcp_stream", sizeof(struct ln_tcp_stream), 0);
    if(!stream) return NULL;
    memset(stream, 0, sizeof(struct ln_tcp_stream));    // rte_malloc does not zero the memory

    stream->sip = sip;
    stream->dip = dip;
    stream->sport = sport;
    stream->dport = dport;
    stream->proto = IPPROTO_TCP;
    stream->status = LN_TCP_STATUS_LISTEN;

    // pick a random initial sequence number (ISS)
    uint32_t next_seed = time(NULL);
    stream->snd_nxt = rand_r(&next_seed) % TCP_MAX_SEQ;

    // per-stream receive/send rings for this stream's fragments
    stream->rev_buf = rte_ring_create("tcp_rev_ring", RING_SIZE, rte_socket_id(), 0);
    stream->snd_buf = rte_ring_create("tcp_snd_ring", RING_SIZE, rte_socket_id(), 0);

    rte_memcpy(stream->localmac, gSrcMac, RTE_ETHER_ADDR_LEN);

    struct ln_tcp_table* table = ln_tcp_instance();
    LL_ADD(stream, table->streams);
    table->count++;

    return stream;
}
This is a singleton: all TCP control blocks are stored in one linked list, and the table also counts how many control blocks exist. The list is searched by source port, destination port, source IP and destination IP; if no matching control block is found, a new one is created and inserted into the list. Note that every TCP control block has its own receive and send rings to manage its stream of fragments.
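One caveat worth flagging: rte_ring_create() requires a globally unique name, so creating a second stream with the fixed names "tcp_rev_ring" and "tcp_snd_ring" will fail. A minimal sketch of one way to avoid that, using a hypothetical helper that derives the ring names from the ports (not part of the current code):

// Illustration only: create a stream's rx/tx rings with unique names so a
// second connection does not collide with the first (rte_ring_create() fails
// if the name is already taken).
static int ln_tcp_stream_create_rings(struct ln_tcp_stream *stream,
                                      uint16_t sport, uint16_t dport) {
    char name[RTE_RING_NAMESIZE];

    snprintf(name, sizeof(name), "tcp_rev_ring_%u_%u", ntohs(sport), ntohs(dport));
    stream->rev_buf = rte_ring_create(name, RING_SIZE, rte_socket_id(), 0);

    snprintf(name, sizeof(name), "tcp_snd_ring_%u_%u", ntohs(sport), ntohs(dport));
    stream->snd_buf = rte_ring_create(name, RING_SIZE, rte_socket_id(), 0);

    return (stream->rev_buf && stream->snd_buf) ? 0 : -1;
}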
Protocol stack functions
TCP processing flow
static int ln_tcp_process(struct rte_mbuf* tcpmbuf) {
    printf("ln_tcp_process\n");
    struct rte_ipv4_hdr* iphdr = rte_pktmbuf_mtod_offset(tcpmbuf, struct rte_ipv4_hdr*, sizeof(struct rte_ether_hdr));
    struct rte_tcp_hdr* tcphdr = (struct rte_tcp_hdr*)(iphdr + 1);

#if 1
    // verify the TCP checksum; drop the packet if it does not match
    uint16_t tcpcksum = tcphdr->cksum;
    tcphdr->cksum = 0;
    uint16_t cksum = rte_ipv4_udptcp_cksum(iphdr, tcphdr);
    if(tcpcksum != cksum) {
        printf("cksum: %x, tcp cksum: %x\n", cksum, tcpcksum);
        return -1;
    }
#endif

    // find the stream for this 4-tuple, or create one in the LISTEN state
    struct ln_tcp_stream* stream = ln_tcp_stream_search(iphdr->src_addr, iphdr->dst_addr, tcphdr->src_port, tcphdr->dst_port);
    if(stream == NULL) {
        stream = ln_tcp_stream_create(iphdr->src_addr, iphdr->dst_addr, tcphdr->src_port, tcphdr->dst_port);
        if(stream == NULL) return -2;
    }

    switch(stream->status) {
        case LN_TCP_STATUS_CLOSED:
            break;
        case LN_TCP_STATUS_LISTEN:
            printf("listen\n");
            ln_tcp_handle_listen(stream, tcphdr);
            break;
        case LN_TCP_STATUS_SYN_RECV:
            printf("recv\n");
            ln_tcp_handle_syn_recv(stream, tcphdr);
            break;
        case LN_TCP_STATUS_SYN_SEND:
            break;
        case LN_TCP_STATUS_ESTABLELISTEN: {
            printf("establelisten\n");
            // data_off: high nibble = TCP header length in 32-bit words (>= 5)
            uint8_t hdrlen = (tcphdr->data_off >> 4);
            uint8_t* offload = (uint8_t*)tcphdr + hdrlen * 4;   // start of the payload
            printf("offload: %s\n", offload);
            break;
        }
        case LN_TCP_STATUS_FIN_WAIT_1:
            break;
        case LN_TCP_STATUS_FIN_WAIT_2:
            break;
        case LN_TCP_STATUS_CLOSEING:
            break;
        case LN_TCP_STATUS_TIME_WAIT:
            break;
        case LN_TCP_STATUS_CLOSE_WAIT:
            break;
        case LN_TCP_STATUS_LAST_ACK:
            break;
    }
    return 0;
}
This is the main TCP processing function. What has been completed so far is only the TCP three-way handshake; in practical terms, clicking "connect" in a network assistant tool now succeeds.
First we verify the checksum of every incoming TCP packet; if it doesn't match, the packet data is corrupt and we return immediately. Strictly speaking, ln_tcp_handle_syn_recv is not essential here: once the stream enters the ESTABLELISTEN state, the connection is considered established.
Building the TCP packet
static int ln_encode_tcp_pkt(uint8_t* msg, uint32_t sip, uint32_t dip, uint8_t* smac, uint8_t* dmac, struct ln_tcp_fragment* fragment) {
    printf("ln_encode_tcp_pkt\n");
    uint16_t hdr_len = sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_tcp_hdr);
    uint16_t total_len = fragment->length + hdr_len + fragment->optlen * sizeof(uint32_t);

    // 1. Ethernet header
    struct rte_ether_hdr* ethhdr = (struct rte_ether_hdr*)msg;
    rte_memcpy(ethhdr->s_addr.addr_bytes, smac, RTE_ETHER_ADDR_LEN);
    rte_memcpy(ethhdr->d_addr.addr_bytes, dmac, RTE_ETHER_ADDR_LEN);
    ethhdr->ether_type = htons(RTE_ETHER_TYPE_IPV4);

    // 2. IPv4 header
    struct rte_ipv4_hdr* iphdr = (struct rte_ipv4_hdr*)(ethhdr + 1);
    iphdr->version_ihl = 0x45;
    iphdr->time_to_live = 64;
    iphdr->src_addr = sip;
    iphdr->dst_addr = dip;
    iphdr->next_proto_id = IPPROTO_TCP;
    iphdr->fragment_offset = 0;
    iphdr->total_length = htons(total_len - sizeof(struct rte_ether_hdr));
    iphdr->packet_id = 0;
    iphdr->type_of_service = 0;
    iphdr->hdr_checksum = 0;
    iphdr->hdr_checksum = rte_ipv4_cksum(iphdr);

    // 3. TCP header: ports/seq/ack/flags come from the fragment
    struct rte_tcp_hdr* tcphdr = (struct rte_tcp_hdr*)(iphdr + 1);
    tcphdr->src_port = fragment->sport;
    tcphdr->dst_port = fragment->dport;
    tcphdr->recv_ack = htonl(fragment->acknum);
    tcphdr->sent_seq = htonl(fragment->seqnum);
    tcphdr->data_off = fragment->hdrlen_off;
    tcphdr->rx_win = htons(fragment->windows);   // the window is kept in host order in the fragment
    tcphdr->tcp_flags = fragment->tcp_flags;
    tcphdr->tcp_urp = fragment->tcp_urp;

    // 4. payload (if any) goes after the option words
    if(fragment->data != NULL) {
        uint8_t* offload = (uint8_t*)(tcphdr + 1) + fragment->optlen * sizeof(uint32_t);
        rte_memcpy(offload, fragment->data, fragment->length);
    }

    tcphdr->cksum = 0;
    tcphdr->cksum = rte_ipv4_udptcp_cksum(iphdr, tcphdr);
    return 0;
}

static struct rte_mbuf* ln_send_tcp(struct rte_mempool* mbuf_pool, uint32_t sip, uint32_t dip, uint8_t* smac, uint8_t* dmac, struct ln_tcp_fragment* fragment) {
    struct rte_mbuf* mbuf = rte_pktmbuf_alloc(mbuf_pool);
    if(!mbuf) {
        rte_exit(EXIT_FAILURE, "rte_pktmbuf_alloc tcp\n");
    }

    uint16_t total_len = fragment->length + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) +
                         sizeof(struct rte_tcp_hdr) + fragment->optlen * sizeof(uint32_t);
    mbuf->pkt_len = total_len;
    mbuf->data_len = total_len;

    uint8_t* pktdata = rte_pktmbuf_mtod(mbuf, uint8_t*);
    ln_encode_tcp_pkt(pktdata, sip, dip, smac, dmac, fragment);
    return mbuf;
}
No explanation needed here; it follows the same pattern we've been using all along, so what could possibly go wrong [doge].
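One small gap to keep in mind: ln_encode_tcp_pkt accounts for optlen * 4 option bytes in the lengths and in the payload offset, but never copies fragment->option into the header. That is harmless while optlen is always 0, but if options such as MSS are added later, the missing copy would look roughly like this (a hedged sketch of an addition, not existing code):

/* Hypothetical addition to ln_encode_tcp_pkt: copy the option words that the
 * length calculation already accounts for. Not needed while optlen == 0. */
if (fragment->optlen > 0) {
    rte_memcpy((uint8_t *)(tcphdr + 1), fragment->option,
               fragment->optlen * sizeof(uint32_t));
}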
TCP state transitions
The three-way handshake
This part references another article; I'll just quote its written summary here.
The three-way handshake is how a TCP connection is established. Before the handshake, the client (the side that actively opens the connection) leaves the CLOSED state, and the server (the side that passively opens) also leaves CLOSED and enters the LISTEN state. The handshake then proceeds:
- First, the client sends a SYN packet to the server and waits for the server's acknowledgement, in which:
  - the flag is SYN, meaning a request to establish a connection
  - the sequence number is Seq = x (x is usually a random number)
  - the client then enters the SYN-SENT state
- After receiving the client's SYN, the server acknowledges it, leaves the LISTEN state, and returns a TCP segment in which:
  - the flags are SYN and ACK, confirming that the client's Seq is valid, that the server can receive the client's data, and that it agrees to create the connection
  - the sequence number is Seq = y
  - the acknowledgement number is Ack = x + 1, i.e. the client's sequence number plus one; the server then enters the SYN-RECV state
- After receiving the SYN + ACK, the client knows that the client-to-server direction works, leaves the SYN-SENT state, and returns a final segment in which:
  - the flag is ACK, confirming the server's agreement to connect
  - the sequence number is Seq = x + 1, i.e. the acknowledgement number received from the server, used as its own sequence number
  - the acknowledgement number is Ack = y + 1, i.e. the server's sequence number plus one
- The client then enters ESTABLISHED.
When the server receives this final acknowledgement from the client, it knows that the server-to-client direction also works, leaves the SYN-RECV state, and enters ESTABLISHED, completing the three-way handshake. In this implementation, the server side of that exchange is handled by ln_tcp_handle_listen (LISTEN to SYN_RECV, which queues the SYN+ACK) and ln_tcp_handle_syn_recv (SYN_RECV to ESTABLELISTEN).
Server LISTEN state
static int ln_tcp_handle_listen(struct ln_tcp_stream* stream, struct rte_tcp_hdr* hdr) {
    if(hdr->tcp_flags & RTE_TCP_SYN_FLAG) {
        if(stream->status == LN_TCP_STATUS_LISTEN) {
            // build the SYN+ACK that answers the client's SYN
            struct ln_tcp_fragment* fragment = rte_malloc("tcp_fragment", sizeof(struct ln_tcp_fragment), 0);
            if(!fragment) {
                return -1;
            }
            memset(fragment, 0, sizeof(struct ln_tcp_fragment));

            // swap the ports: the reply goes back to the client
            fragment->sport = hdr->dst_port;
            fragment->dport = hdr->src_port;

            struct in_addr addr;
            addr.s_addr = stream->sip;
            printf("tcp --> src: %s:%d ", inet_ntoa(addr), ntohs(hdr->src_port));
            addr.s_addr = stream->dip;
            printf(" --> dst: %s:%d\n", inet_ntoa(addr), ntohs(hdr->dst_port));

            fragment->seqnum = stream->snd_nxt;             // our initial sequence number
            printf("before get ack\n");
            fragment->acknum = ntohl(hdr->sent_seq) + 1;    // client ISS + 1
            printf("before get flags\n");
            fragment->tcp_flags = (RTE_TCP_ACK_FLAG | RTE_TCP_SYN_FLAG);
            fragment->windows = TCP_INITIAL_WINDOW;
            fragment->hdrlen_off = 0x50;                    // 5 * 4 = 20-byte header, no options
            fragment->data = NULL;
            fragment->length = 0;

            rte_ring_mp_enqueue(stream->snd_buf, fragment);

            stream->status = LN_TCP_STATUS_SYN_RECV;
        }
    }
    return 0;
}
Server SYN_RECV state
static int ln_tcp_handle_syn_recv(struct ln_tcp_stream* stream, struct rte_tcp_hdr* hdr) {
    if(hdr->tcp_flags & RTE_TCP_ACK_FLAG) {
        if(stream->status == LN_TCP_STATUS_SYN_RECV) {
            uint32_t ack = ntohl(hdr->recv_ack);
            if(ack == stream->snd_nxt + 1) {
                // nothing is done with the ACK number yet
            }
            stream->status = LN_TCP_STATUS_ESTABLELISTEN;
        }
    }
    return 0;
}
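The empty if block above is where a more complete implementation would actually validate the third packet of the handshake before switching state. A minimal sketch of what that might look like (my assumption, not the original code):

/* Hypothetical replacement for the empty check above: only move to
 * ESTABLELISTEN once the client has really ACKed our SYN. */
uint32_t ack = ntohl(hdr->recv_ack);
if (ack == stream->snd_nxt + 1) {
    stream->snd_nxt = ack;                    /* our SYN consumed one sequence number */
    stream->rev_nxt = ntohl(hdr->sent_seq);   /* next byte expected from the client */
    stream->status  = LN_TCP_STATUS_ESTABLELISTEN;
}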
Complete code
#include <rte_eal.h> #include <rte_ethdev.h> #include <rte_mbuf.h> #include <rte_malloc.h> #include <rte_timer.h> #include <rte_ring.h> #include <stdio.h> #include <stdlib.h> #include <arpa/inet.h> #include "arp.h" #define ENABLE_SEND 1 #define ENABLE_ARP 1 #define ENABLE_ICMP 1 #define ENABLE_ARP_REPLY 1 #define ENABLE_DEBUG 1 #define ENABLE_TIMER 1 #define NUM_MBUFS (4096-1) #define BURST_SIZE 32 #define RING_SIZE 1024 #define UDP_APP_RECV_BUFFER_SIZE 128 #define TIMER_RESOLUTION_CYCLES 120000000000ULL // 10ms * 1000 = 10s * 6 struct inout_ring { struct rte_ring* in; struct rte_ring* out; }; static struct inout_ring* ioInst = NULL; static struct inout_ring* inout_ring_instance(void) { if(ioInst == NULL) { ioInst = rte_malloc("inout ring", sizeof(struct inout_ring), 0); memset(ioInst, 0, sizeof(struct inout_ring)); } return ioInst; } #if ENABLE_SEND #define MAKE_IPV4_ADDR(a, b, c, d) (a + (b<<8) + (c<<16) + (d<<24)) static uint32_t gLocalIp = MAKE_IPV4_ADDR(172, 26, 34, 243); static uint32_t gSrcIp; // static uint32_t gDstIp; static uint8_t gSrcMac[RTE_ETHER_ADDR_LEN]; //static uint8_t gDstMac[RTE_ETHER_ADDR_LEN]; static uint16_t gSrcPort; static uint16_t gDstPort; #endif #if ENABLE_ARP_REPLY static uint8_t gDefaultArpMac[RTE_ETHER_ADDR_LEN] = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}; #endif int gDpdkPortId = 0; static const struct rte_eth_conf port_conf_default = { .rxmode = {.max_rx_pkt_len = RTE_ETHER_MAX_LEN } }; int udp_process(struct rte_mbuf* udpmbuf); static void ng_init_port(struct rte_mempool *mbuf_pool) { uint16_t nb_sys_ports= rte_eth_dev_count_avail(); // if (nb_sys_ports == 0) { rte_exit(EXIT_FAILURE, "No Supported eth found\n"); } struct rte_eth_dev_info dev_info; rte_eth_dev_info_get(gDpdkPortId, &dev_info); // const int num_rx_queues = 1; const int num_tx_queues = 1; struct rte_eth_conf port_conf = port_conf_default; rte_eth_dev_configure(gDpdkPortId, num_rx_queues, num_tx_queues, &port_conf); if (rte_eth_rx_queue_setup(gDpdkPortId, 0 , 1024, rte_eth_dev_socket_id(gDpdkPortId),NULL, mbuf_pool) < 0) { rte_exit(EXIT_FAILURE, "Could not setup RX queue\n"); } #if ENABLE_SEND struct rte_eth_txconf txq_conf = dev_info.default_txconf; txq_conf.offloads = port_conf.rxmode.offloads; if (rte_eth_tx_queue_setup(gDpdkPortId, 0 , 1024, rte_eth_dev_socket_id(gDpdkPortId), &txq_conf) < 0) { rte_exit(EXIT_FAILURE, "Could not setup TX queue\n"); } #endif if (rte_eth_dev_start(gDpdkPortId) < 0 ) { rte_exit(EXIT_FAILURE, "Could not start\n"); } } static int ng_encode_udp_pkt(uint8_t *msg, uint32_t sip, uint32_t dip, uint16_t sport, uint16_t dport, uint8_t* smac, uint8_t* dmac, unsigned char *data, uint16_t total_len) { // encode // 1 ethhdr struct rte_ether_hdr *eth = (struct rte_ether_hdr *)msg; rte_memcpy(eth->s_addr.addr_bytes, smac, RTE_ETHER_ADDR_LEN); rte_memcpy(eth->d_addr.addr_bytes, dmac, RTE_ETHER_ADDR_LEN); eth->ether_type = htons(RTE_ETHER_TYPE_IPV4); // 2 iphdr struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(msg + sizeof(struct rte_ether_hdr)); ip->version_ihl = 0x45; ip->type_of_service = 0; ip->total_length = htons(total_len - sizeof(struct rte_ether_hdr)); ip->packet_id = 0; ip->fragment_offset = 0; ip->time_to_live = 64; // ttl = 64 ip->next_proto_id = IPPROTO_UDP; ip->src_addr = sip; ip->dst_addr = dip; ip->hdr_checksum = 0; ip->hdr_checksum = rte_ipv4_cksum(ip); // 3 udphdr struct rte_udp_hdr *udp = (struct rte_udp_hdr *)(msg + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)); udp->src_port = sport; udp->dst_port = dport; uint16_t udplen = total_len - 
sizeof(struct rte_ether_hdr) - sizeof(struct rte_ipv4_hdr); udp->dgram_len = htons(udplen); rte_memcpy((uint8_t*)(udp+1), data, udplen); udp->dgram_cksum = 0; udp->dgram_cksum = rte_ipv4_udptcp_cksum(ip, udp); struct in_addr addr; addr.s_addr = gSrcIp; printf(" --> src: %s:%d, ", inet_ntoa(addr), ntohs(gSrcPort)); addr.s_addr = gDstIp; printf("dst: %s:%d\n", inet_ntoa(addr), ntohs(gDstPort)); return 0; } static struct rte_mbuf * ng_send_udp(struct rte_mempool *mbuf_pool, uint32_t sip, uint32_t dip, uint16_t sport, uint16_t dport, uint8_t* smac, uint8_t* dmac, uint8_t *data, uint16_t length) { // mempool --> mbuf const unsigned total_len = length + 42; struct rte_mbuf *mbuf = rte_pktmbuf_alloc(mbuf_pool); if (!mbuf) { rte_exit(EXIT_FAILURE, "rte_pktmbuf_alloc udp\n"); } mbuf->pkt_len = total_len; mbuf->data_len = total_len; uint8_t *pktdata = rte_pktmbuf_mtod(mbuf, uint8_t*); ng_encode_udp_pkt(pktdata, sip, dip, sport, dport, smac, dmac, data, total_len); return mbuf; } #if ENABLE_ARP static int ng_encode_arp_pkt(uint8_t *msg, uint16_t opcode, uint8_t *dst_mac, uint32_t sip, uint32_t dip) { // 1 ethhdr struct rte_ether_hdr *eth = (struct rte_ether_hdr *)msg; rte_memcpy(eth->s_addr.addr_bytes, gSrcMac, RTE_ETHER_ADDR_LEN); if (!strncmp((const char *)dst_mac, (const char *)gDefaultArpMac, RTE_ETHER_ADDR_LEN)) { uint8_t mac[RTE_ETHER_ADDR_LEN] = {0x0}; rte_memcpy(eth->d_addr.addr_bytes, mac, RTE_ETHER_ADDR_LEN); } else { rte_memcpy(eth->d_addr.addr_bytes, dst_mac, RTE_ETHER_ADDR_LEN); } eth->ether_type = htons(RTE_ETHER_TYPE_ARP); // 2 arp struct rte_arp_hdr *arp = (struct rte_arp_hdr *)(eth + 1); arp->arp_hardware = htons(1); arp->arp_protocol = htons(RTE_ETHER_TYPE_IPV4); arp->arp_hlen = RTE_ETHER_ADDR_LEN; arp->arp_plen = sizeof(uint32_t); arp->arp_opcode = htons(opcode); rte_memcpy(arp->arp_data.arp_sha.addr_bytes, gSrcMac, RTE_ETHER_ADDR_LEN); rte_memcpy( arp->arp_data.arp_tha.addr_bytes, dst_mac, RTE_ETHER_ADDR_LEN); arp->arp_data.arp_sip = sip; arp->arp_data.arp_tip = dip; return 0; } static struct rte_mbuf *ng_send_arp(struct rte_mempool *mbuf_pool, uint16_t opcode, uint8_t *dst_mac, uint32_t sip, uint32_t dip) { const unsigned total_length = sizeof(struct rte_ether_hdr) + sizeof(struct rte_arp_hdr); struct rte_mbuf *mbuf = rte_pktmbuf_alloc(mbuf_pool); if (!mbuf) { rte_exit(EXIT_FAILURE, "rte_pktmbuf_alloc arp\n"); } mbuf->pkt_len = total_length; mbuf->data_len = total_length; uint8_t *pkt_data = rte_pktmbuf_mtod(mbuf, uint8_t *); ng_encode_arp_pkt(pkt_data, opcode, dst_mac, sip, dip); return mbuf; } #endif #if ENABLE_ICMP static uint16_t ng_checksum(uint16_t *addr, int count) { register long sum = 0; while (count > 1) { sum += *(unsigned short*)addr++; count -= 2; } if (count > 0) { sum += *(unsigned char *)addr; } while (sum >> 16) { sum = (sum & 0xffff) + (sum >> 16); } return ~sum; } static int ng_encode_icmp_pkt(uint8_t *msg, uint8_t *dst_mac, uint32_t sip, uint32_t dip, uint16_t id, uint16_t seqnb) { // 1 ether struct rte_ether_hdr *eth = (struct rte_ether_hdr *)msg; rte_memcpy(eth->s_addr.addr_bytes, gSrcMac, RTE_ETHER_ADDR_LEN); rte_memcpy(eth->d_addr.addr_bytes, dst_mac, RTE_ETHER_ADDR_LEN); eth->ether_type = htons(RTE_ETHER_TYPE_IPV4); // 2 ip struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(msg + sizeof(struct rte_ether_hdr)); ip->version_ihl = 0x45; ip->type_of_service = 0; ip->total_length = htons(sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_icmp_hdr)); ip->packet_id = 0; ip->fragment_offset = 0; ip->time_to_live = 64; // ttl = 64 ip->next_proto_id = IPPROTO_ICMP; 
ip->src_addr = sip; ip->dst_addr = dip; ip->hdr_checksum = 0; ip->hdr_checksum = rte_ipv4_cksum(ip); // 3 icmp struct rte_icmp_hdr *icmp = (struct rte_icmp_hdr *)(msg + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)); icmp->icmp_type = RTE_IP_ICMP_ECHO_REPLY; icmp->icmp_code = 0; icmp->icmp_ident = id; icmp->icmp_seq_nb = seqnb; icmp->icmp_cksum = 0; icmp->icmp_cksum = ng_checksum((uint16_t*)icmp, sizeof(struct rte_icmp_hdr)); return 0; } static struct rte_mbuf *ng_send_icmp(struct rte_mempool *mbuf_pool, uint8_t *dst_mac, uint32_t sip, uint32_t dip, uint16_t id, uint16_t seqnb) { const unsigned total_length = sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_icmp_hdr); struct rte_mbuf *mbuf = rte_pktmbuf_alloc(mbuf_pool); if (!mbuf) { rte_exit(EXIT_FAILURE, "rte_pktmbuf_alloc icmp\n"); } mbuf->pkt_len = total_length; mbuf->data_len = total_length; uint8_t *pkt_data = rte_pktmbuf_mtod(mbuf, uint8_t *); ng_encode_icmp_pkt(pkt_data, dst_mac, sip, dip, id, seqnb); return mbuf; } #endif static void print_ethaddr(const char *name, const struct rte_ether_addr *eth_addr) { char buf[RTE_ETHER_ADDR_FMT_SIZE]; rte_ether_format_addr(buf, RTE_ETHER_ADDR_FMT_SIZE, eth_addr); printf("%s%s", name, buf); } #if ENABLE_TIMER static void arp_request_timer_cb(__attribute__((unused)) struct rte_timer *tim, void *arg) { struct rte_mempool *mbuf_pool = (struct rte_mempool *)arg; #if 0 struct rte_mbuf *arpbuf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST, ahdr->arp_data.arp_sha.addr_bytes, ahdr->arp_data.arp_tip, ahdr->arp_data.arp_sip); rte_eth_tx_burst(gDpdkPortId, 0, &arpbuf, 1); rte_pktmbuf_free(arpbuf); #endif int i = 0; for (i = 1;i <= 254;i ++) { uint32_t dstip = (gLocalIp & 0x00FFFFFF) | (0xFF000000 & (i << 24)); struct in_addr addr; addr.s_addr = dstip; printf("arp ---> src: %s \n", inet_ntoa(addr)); struct rte_mbuf *arpbuf = NULL; uint8_t *dstmac = ng_get_dst_macaddr(dstip); if (dstmac == NULL) { arpbuf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST, gDefaultArpMac, gLocalIp, dstip); } else { arpbuf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST, dstmac, gLocalIp, dstip); } rte_eth_tx_burst(gDpdkPortId, 0, &arpbuf, 1); rte_pktmbuf_free(arpbuf); } } #endif static int udp_out(struct rte_mempool* mbuf_pool); static int ln_tcp_out(struct rte_mempool* mbuf_pool); static int ln_tcp_process(struct rte_mbuf* tcpmbuf); static int pkt_process(void* arg) { struct rte_mempool* mbuf_pool = (struct rte_mempool*)arg; struct inout_ring* ring = inout_ring_instance(); while(1) { struct rte_mbuf *mbufs[BURST_SIZE]; unsigned num_recvd = rte_ring_mc_dequeue_burst(ring->in, (void**)mbufs, BURST_SIZE, NULL); unsigned i = 0; for (i = 0;i < num_recvd;i++) { struct rte_ether_hdr *ehdr = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr*); #if ENABLE_ARP if (ehdr->ether_type == rte_cpu_to_be_16(RTE_ETHER_TYPE_ARP)) { struct rte_arp_hdr *ahdr = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_arp_hdr *, sizeof(struct rte_ether_hdr)); struct in_addr addr; addr.s_addr = ahdr->arp_data.arp_tip; printf("arp ---> src: %s ", inet_ntoa(addr)); addr.s_addr = gLocalIp; printf(" local: %s \n", inet_ntoa(addr)); if (ahdr->arp_data.arp_tip == gLocalIp) { if (ahdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REQUEST)) { printf("arp --> request\n"); struct rte_mbuf *arpbuf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REPLY, ahdr->arp_data.arp_sha.addr_bytes, ahdr->arp_data.arp_tip, ahdr->arp_data.arp_sip); //rte_eth_tx_burst(gDpdkPortId, 0, &arpbuf, 1); //rte_pktmbuf_free(arpbuf); rte_ring_mp_enqueue_burst(ring->out, 
(void**)&arpbuf, 1, NULL); } else if (ahdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REPLY)) { printf("arp --> reply\n"); struct arp_table *table = arp_table_instance(); uint8_t *hwaddr = ng_get_dst_macaddr(ahdr->arp_data.arp_sip); if (hwaddr == NULL) { struct arp_entry *entry = rte_malloc("arp_entry",sizeof(struct arp_entry), 0); if (entry) { memset(entry, 0, sizeof(struct arp_entry)); entry->ip = ahdr->arp_data.arp_sip; rte_memcpy(entry->hwaddr, ahdr->arp_data.arp_sha.addr_bytes, RTE_ETHER_ADDR_LEN); entry->type = 0; LL_ADD(entry, table->entries); table->count ++; } } #if ENABLE_DEBUG struct arp_entry *iter; for (iter = table->entries; iter != NULL; iter = iter->next) { struct in_addr addr; addr.s_addr = iter->ip; print_ethaddr("arp table --> mac: ", (struct rte_ether_addr *)iter->hwaddr); printf(" ip: %s \n", inet_ntoa(addr)); } #endif rte_pktmbuf_free(mbufs[i]); } continue; } } #endif if (ehdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4)) { continue; } struct rte_ipv4_hdr *iphdr = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr)); if (iphdr->next_proto_id == IPPROTO_UDP) { udp_process(mbufs[i]); } if(iphdr->next_proto_id == IPPROTO_TCP) { ln_tcp_process(mbufs[i]); } #if ENABLE_ICMP if (iphdr->next_proto_id == IPPROTO_ICMP) { struct rte_icmp_hdr *icmphdr = (struct rte_icmp_hdr *)(iphdr + 1); struct in_addr addr; addr.s_addr = iphdr->src_addr; printf("icmp ---> src: %s ", inet_ntoa(addr)); if (icmphdr->icmp_type == RTE_IP_ICMP_ECHO_REQUEST) { addr.s_addr = iphdr->dst_addr; printf(" local: %s , type : %d\n", inet_ntoa(addr), icmphdr->icmp_type); struct rte_mbuf *txbuf = ng_send_icmp(mbuf_pool, ehdr->s_addr.addr_bytes, iphdr->dst_addr, iphdr->src_addr, icmphdr->icmp_ident, icmphdr->icmp_seq_nb); //rte_eth_tx_burst(gDpdkPortId, 0, &txbuf, 1); //rte_pktmbuf_free(txbuf); rte_ring_mp_enqueue_burst(ring->out, (void**)&txbuf, 1, NULL); rte_pktmbuf_free(mbufs[i]); } } #endif } udp_out(mbuf_pool); ln_tcp_out(mbuf_pool); } return 0; } struct localhost { int fd; uint32_t localip; uint8_t localmac[RTE_ETHER_ADDR_LEN]; uint16_t localport; uint8_t proto; struct rte_ring* recv_buf; struct rte_ring* send_buf; struct localhost* prev; struct localhost* next; pthread_mutex_t mutex; pthread_cond_t cond; }; struct localhost* lhost = NULL; #define DEFAULT_FD 3 static int get_fd_frombitmap(void) { int fd = DEFAULT_FD; return fd; } static struct localhost* get_host_fromfd(int fd) { struct localhost* host = lhost; for(host = lhost; host != NULL; host = host->next) { if(host->fd == fd) return host; } return NULL; }; static struct localhost* get_host_fromport(uint32_t dip, uint16_t port, uint8_t proto) { struct localhost* host = lhost; for(host = lhost; host != NULL; host = host->next) { if(host->localip == dip && host->localport == port && host->proto == proto) return host; } return NULL; } struct offload { uint32_t sip; uint32_t dip; uint16_t sport; uint16_t dport; uint8_t proto; uint8_t* data; uint16_t length; }; int udp_process(struct rte_mbuf* udpmbuf) { struct rte_ipv4_hdr* ip = rte_pktmbuf_mtod_offset(udpmbuf, struct rte_ipv4_hdr*, sizeof(struct rte_ether_hdr)); struct rte_udp_hdr* udp = (struct rte_udp_hdr*)(ip + 1); struct localhost* host = get_host_fromport(ip->dst_addr, udp->dst_port, ip->next_proto_id); if(host == NULL) { rte_pktmbuf_free(udpmbuf); return -3; } struct offload* ol = rte_malloc("udp ol", sizeof(struct offload), 0); if(ol == NULL) { rte_pktmbuf_free(udpmbuf); return -2; } ol->sip = ip->src_addr; ol->dip = ip->dst_addr; ol->sport = 
udp->src_port; ol->dport = udp->dst_port; ol->proto = IPPROTO_UDP; ol->length = ntohs(udp->dgram_len); ol->data = rte_malloc("ol data", ol->length - sizeof(struct rte_udp_hdr), 0); if(ol->data == NULL) { rte_pktmbuf_free(udpmbuf); rte_free(ol); return -1; } rte_memcpy(ol->data, (uint8_t*)(udp + 1), ol->length - sizeof(struct rte_udp_hdr)); rte_ring_mp_enqueue(host->recv_buf, ol); pthread_mutex_lock(&host->mutex); pthread_cond_signal(&host->cond); pthread_mutex_unlock(&host->mutex); return 0; } static int udp_out(struct rte_mempool* mbuf_pool) { struct localhost* host; for(host = lhost; host != NULL; host = host->next) { struct offload* ol; int nb_send = rte_ring_mc_dequeue(host->send_buf, (void**)&ol); if(nb_send < 0) continue; struct in_addr addr; addr.s_addr = ol->dip; printf("udp_out --> src: %s:%d\n", inet_ntoa(addr), ntohs(ol->dport)); uint8_t* dstmac = ng_get_dst_macaddr(ol->dip); if(dstmac == NULL) { struct rte_mbuf* arpbuf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST, gDefaultArpMac, ol->sip, ol->dip); struct inout_ring* ring = inout_ring_instance(); rte_ring_mp_enqueue_burst(ring->out, (void**)&arpbuf, 1, NULL); rte_ring_mp_enqueue(host->send_buf, ol); } else { struct rte_mbuf* udpbuf = ng_send_udp(mbuf_pool, ol->sip, ol->dip, ol->sport, ol->dport, host->localmac, dstmac, ol->data, ol->length); struct inout_ring* ring = inout_ring_instance(); rte_ring_mp_enqueue_burst(ring->out, (void**)&udpbuf, 1, NULL); } } return 0; } static int nsocket(__attribute__((unused)) int domain, int type, __attribute__((unused)) int protocol) { int fd = get_fd_frombitmap(); struct localhost* host = rte_malloc("localhost", sizeof(struct localhost), 0); if(host == NULL) { return -1; } memset(host, 0, sizeof(struct localhost)); host->fd = fd; if(type == SOCK_DGRAM) host->proto = IPPROTO_UDP; host->send_buf = rte_ring_create("send buffer", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); if(host->send_buf == NULL) { rte_free(host); return -1; } host->recv_buf = rte_ring_create("recv buffer", RING_SIZE, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); if(host->recv_buf == NULL) { rte_ring_free(host->send_buf); rte_free(host); return -1; } pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; rte_memcpy(&host->cond, &blank_cond, sizeof(pthread_cond_t)); rte_memcpy(&host->mutex, &blank_mutex, sizeof(pthread_mutex_t)); LL_ADD(host, lhost); return fd; } static int nbind(int sockfd, const struct sockaddr *addr, __attribute__((unused))socklen_t addrlen) { struct localhost* host = get_host_fromfd(sockfd); if(host == NULL) { return -1; } const struct sockaddr_in* addr_in = (const struct sockaddr_in*)addr; host->localport = addr_in->sin_port; rte_memcpy(&host->localip, &addr_in->sin_addr.s_addr, sizeof(uint32_t)); rte_memcpy(host->localmac, gSrcMac, RTE_ETHER_ADDR_LEN); return 0; } static ssize_t nrecvfrom(int sockfd, void *buf, size_t len, __attribute__((unused))int flags, struct sockaddr *src_addr, __attribute__((unused))socklen_t *addrlen) { struct localhost* host = get_host_fromfd(sockfd); if(host == NULL) { return -1; } struct offload* ol = NULL; uint8_t* ptr = NULL; struct sockaddr_in* addr_in = (struct sockaddr_in*)src_addr; int nb = -1; pthread_mutex_lock(&host->mutex); while((nb = rte_ring_mc_dequeue(host->recv_buf, (void**)&ol)) < 0) { pthread_cond_wait(&host->cond, &host->mutex); } pthread_mutex_unlock(&host->mutex); addr_in->sin_port = ol->sport; rte_memcpy(&addr_in->sin_addr.s_addr, &ol->sip, sizeof(uint32_t)); if(len < ol->length) { 
rte_memcpy(buf, ol->data, len); ptr = rte_malloc("ptr", ol->length - len, 0); rte_memcpy(ptr, ol->data + len, ol->length - len); ol->length -= len; rte_free(ol->data); ol->data = ptr; rte_ring_mp_enqueue(host->recv_buf, ol); return len; } else { uint16_t length = ol->length; rte_memcpy(buf, ol->data, ol->length); rte_free(ol->data); rte_free(ol); return length; } } static ssize_t nsendto(int sockfd, const void *buf, size_t len, __attribute__((unused))int flags, const struct sockaddr *dest_addr, __attribute__((unused))socklen_t addrlen) { struct localhost* host = get_host_fromfd(sockfd); if(host == NULL) { return -1; } const struct sockaddr_in* addr_in = (const struct sockaddr_in*)dest_addr; struct offload* ol = rte_malloc("ol", sizeof(struct offload), 0); if(ol == NULL) { return -1; } ol->dport = addr_in->sin_port; ol->sport = host->localport; ol->dip = addr_in->sin_addr.s_addr; ol->sip = host->localip; ol->length = len; ol->data = rte_malloc("data", len, 0); if(ol->data == NULL) { rte_free(ol); return -1; } rte_memcpy(ol->data, buf, len); rte_ring_mp_enqueue(host->send_buf, ol); return len; } static int nclose(int fd) { struct localhost* host = get_host_fromfd(fd); if(host == NULL) { return -1; } LL_REMOVE(host, lhost); if(host->send_buf) { rte_ring_free(host->send_buf); } if(host->recv_buf) { rte_ring_free(host->recv_buf); } rte_free(host); return 0; } static int udp_server_entry(__attribute__((unused)) void *arg) { int connfd = nsocket(AF_INET, SOCK_DGRAM, 0); if (connfd == -1) { printf("sockfd failed\n"); return -1; } struct sockaddr_in localaddr, clientaddr; // struct sockaddr memset(&localaddr, 0, sizeof(struct sockaddr_in)); localaddr.sin_port = htons(8889); localaddr.sin_family = AF_INET; localaddr.sin_addr.s_addr = inet_addr("192.168.1.184"); // 0.0.0.0 nbind(connfd, (struct sockaddr*)&localaddr, sizeof(localaddr)); char buffer[UDP_APP_RECV_BUFFER_SIZE] = {0}; socklen_t addrlen = sizeof(clientaddr); while (1) { if (nrecvfrom(connfd, buffer, UDP_APP_RECV_BUFFER_SIZE, 0, (struct sockaddr*)&clientaddr, &addrlen) < 0) { continue; } else { printf("recv from %s:%d, data:%s\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port), buffer); nsendto(connfd, buffer, strlen(buffer), 0, (struct sockaddr*)&clientaddr, sizeof(clientaddr)); } } nclose(connfd); } #define TCP_OPTION_LENGTH 10 #define TCP_MAX_SEQ 4294967295 #define TCP_INITIAL_WINDOW 14600 typedef enum _LN_TCP_STATUS { LN_TCP_STATUS_CLOSED = 0, LN_TCP_STATUS_LISTEN, LN_TCP_STATUS_SYN_RECV, LN_TCP_STATUS_SYN_SEND, LN_TCP_STATUS_ESTABLELISTEN, LN_TCP_STATUS_FIN_WAIT_1, LN_TCP_STATUS_FIN_WAIT_2, LN_TCP_STATUS_CLOSEING, LN_TCP_STATUS_TIME_WAIT, LN_TCP_STATUS_CLOSE_WAIT, LN_TCP_STATUS_LAST_ACK, } LN_TCP_STATUS; struct ln_tcp_stream { int fd; uint32_t sip; uint32_t dip; uint16_t sport; uint16_t dport; uint16_t proto; uint8_t localmac[RTE_ETHER_ADDR_LEN]; uint32_t snd_nxt; uint32_t rev_nxt; LN_TCP_STATUS status; struct rte_ring* snd_buf; struct rte_ring* rev_buf; struct ln_tcp_stream* prev; struct ln_tcp_stream* next; }; struct ln_tcp_table { int count; struct ln_tcp_stream* streams; }; struct ln_tcp_fragment { uint16_t sport; uint16_t dport; uint32_t seqnum; uint32_t acknum; uint8_t hdrlen_off; uint8_t tcp_flags; uint16_t windows; uint16_t cksum; uint16_t tcp_urp; int optlen; uint32_t option[TCP_OPTION_LENGTH]; uint8_t* data; int length; }; struct ln_tcp_table* tcpt = NULL; static struct ln_tcp_table* ln_tcp_instance(void) { if(tcpt == NULL) { tcpt = rte_malloc("tcpt", sizeof(struct ln_tcp_table), 0); if(!tcpt) { 
rte_exit(EXIT_FAILURE, "Error with malloc tcpt"); } memset(tcpt, 0, sizeof(struct ln_tcp_table)); } return tcpt; } static struct ln_tcp_stream* ln_tcp_stream_search(uint32_t sip, uint32_t dip, uint16_t sport, uint16_t dport) { struct ln_tcp_table* table = ln_tcp_instance(); struct ln_tcp_stream* iter; for(iter = table->streams; iter != NULL; iter = iter->next) { if(iter->dip == dip && iter->sip == sip && iter->sport == sport && iter->dport == dport) { return iter; } } return NULL; } static struct ln_tcp_stream* ln_tcp_stream_create(uint32_t sip, uint32_t dip, uint32_t sport, uint32_t dport) { struct ln_tcp_stream* stream = rte_malloc("ln_tcp_stream", sizeof(struct ln_tcp_stream), 0); if(!stream) return NULL; stream->sip = sip; stream->dip = dip; stream->sport = sport; stream->dport = dport; stream->proto = IPPROTO_TCP; stream->status = LN_TCP_STATUS_LISTEN; uint32_t next_seed = time(NULL); stream->snd_nxt = rand_r(&next_seed) % TCP_MAX_SEQ; stream->rev_buf = rte_ring_create("tcp_rev_ring", RING_SIZE, rte_socket_id(), 0); stream->snd_buf = rte_ring_create("tcp_snd_ring", RING_SIZE, rte_socket_id(), 0); rte_memcpy(stream->localmac, gSrcMac, RTE_ETHER_ADDR_LEN); struct ln_tcp_table* table = ln_tcp_instance(); LL_ADD(stream, table->streams); table->count++; return stream; } static int ln_tcp_handle_listen(struct ln_tcp_stream* stream, struct rte_tcp_hdr* hdr) { if(hdr->tcp_flags & RTE_TCP_SYN_FLAG) { if(stream->status == LN_TCP_STATUS_LISTEN) { struct ln_tcp_fragment* fragment = rte_malloc("tcp_fragment", sizeof(struct ln_tcp_fragment), 0); if(!fragment) { return -1; } memset(fragment, 0, sizeof(struct ln_tcp_fragment)); fragment->sport = hdr->dst_port; fragment->dport = hdr->src_port; struct in_addr addr; addr.s_addr = stream->sip; printf("tcp --> src: %s:%d ", inet_ntoa(addr), ntohs(hdr->src_port)); addr.s_addr = stream->dip; printf(" --> dst: %s:%d\n", inet_ntoa(addr), ntohs(hdr->dst_port)); fragment->seqnum = stream->snd_nxt; printf("before get ack\n"); fragment->acknum = ntohl(hdr->sent_seq) + 1; printf("before get flags\n"); fragment->tcp_flags = (RTE_TCP_ACK_FLAG | RTE_TCP_SYN_FLAG); fragment->windows = TCP_INITIAL_WINDOW; fragment->hdrlen_off = 0x50; fragment->data = NULL; fragment->length = 0; rte_ring_mp_enqueue(stream->snd_buf, fragment); stream->status = LN_TCP_STATUS_SYN_RECV; } } return 0; } static int ln_tcp_handle_syn_recv(struct ln_tcp_stream* stream, struct rte_tcp_hdr* hdr) { if(hdr->tcp_flags & RTE_TCP_ACK_FLAG) { if(stream->status == LN_TCP_STATUS_SYN_RECV) { uint32_t ack = ntohl(hdr->recv_ack); if(ack == stream->snd_nxt + 1) { } stream->status = LN_TCP_STATUS_ESTABLELISTEN; } } return 0; } static int ln_tcp_process(struct rte_mbuf* tcpmbuf) { printf("ln_tcp_process\n"); struct rte_ipv4_hdr* iphdr = rte_pktmbuf_mtod_offset(tcpmbuf, struct rte_ipv4_hdr*, sizeof(struct rte_ether_hdr)); struct rte_tcp_hdr* tcphdr = (struct rte_tcp_hdr*)(iphdr + 1); #if 1 uint16_t tcpcksum = tcphdr->cksum; tcphdr->cksum = 0; uint16_t cksum = rte_ipv4_udptcp_cksum(iphdr, tcphdr); if(tcpcksum != cksum) { printf("cksum: %x, tcp cksum: %x\n", cksum, tcpcksum); return -1; } #endif struct ln_tcp_stream* stream = ln_tcp_stream_search(iphdr->src_addr, iphdr->dst_addr, tcphdr->src_port, tcphdr->dst_port); if(stream == NULL) { stream = ln_tcp_stream_create(iphdr->src_addr, iphdr->dst_addr, tcphdr->src_port, tcphdr->dst_port); if(stream == NULL) return -2; } switch(stream->status) { case LN_TCP_STATUS_CLOSED: break; case LN_TCP_STATUS_LISTEN: printf("listen\n"); ln_tcp_handle_listen(stream, tcphdr); 
break; case LN_TCP_STATUS_SYN_RECV: printf("recv\n"); ln_tcp_handle_syn_recv(stream, tcphdr); break; case LN_TCP_STATUS_SYN_SEND: break; case LN_TCP_STATUS_ESTABLELISTEN: { printf("establelisten\n"); uint8_t hdrlen = (tcphdr->data_off & 0xF0); //hdrlen >= 4; uint8_t* offload = (uint8_t*)(tcphdr + 1) + hdrlen * 4; printf("offload: %s\n", offload); break; } case LN_TCP_STATUS_FIN_WAIT_1: break; case LN_TCP_STATUS_FIN_WAIT_2: break; case LN_TCP_STATUS_CLOSEING: break; case LN_TCP_STATUS_TIME_WAIT: break; case LN_TCP_STATUS_CLOSE_WAIT: break; case LN_TCP_STATUS_LAST_ACK: break; } return 0; } static int ln_encode_tcp_pkt(uint8_t* msg, uint32_t sip, uint32_t dip, uint8_t* smac, uint8_t* dmac, struct ln_tcp_fragment* fragment) { printf("ln_encode_tcp_pkt\n"); uint16_t hdr_len = sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_tcp_hdr); uint16_t total_len = fragment->length + hdr_len + fragment->optlen * sizeof(uint32_t); struct rte_ether_hdr* ethhdr = (struct rte_ether_hdr*)msg; rte_memcpy(ethhdr->s_addr.addr_bytes, smac, RTE_ETHER_ADDR_LEN); rte_memcpy(ethhdr->d_addr.addr_bytes, dmac, RTE_ETHER_ADDR_LEN); ethhdr->ether_type = htons(RTE_ETHER_TYPE_IPV4); struct rte_ipv4_hdr* iphdr = (struct rte_ipv4_hdr*)(ethhdr + 1); iphdr->version_ihl = 0x45; iphdr->time_to_live = 64; iphdr->src_addr = sip; iphdr->dst_addr = dip; iphdr->next_proto_id = IPPROTO_TCP; iphdr->fragment_offset = 0; iphdr->total_length = htons(total_len - sizeof(struct rte_ether_hdr)); iphdr->packet_id = 0; iphdr->type_of_service = 0; iphdr->hdr_checksum = 0; iphdr->hdr_checksum = rte_ipv4_cksum(iphdr); struct rte_tcp_hdr* tcphdr = (struct rte_tcp_hdr*)(iphdr + 1); tcphdr->src_port = fragment->sport; tcphdr->dst_port = fragment->dport; tcphdr->recv_ack = htonl(fragment->acknum); tcphdr->sent_seq = htonl(fragment->seqnum); tcphdr->data_off = fragment->hdrlen_off; tcphdr->rx_win = fragment->windows; tcphdr->tcp_flags = fragment->tcp_flags; tcphdr->tcp_urp = fragment->tcp_urp; if(fragment->data != NULL) { uint8_t* offload = (uint8_t*)(tcphdr + 1) + fragment->optlen * sizeof(uint32_t); rte_memcpy(offload, fragment->data, fragment->length); } tcphdr->cksum = 0; tcphdr->cksum = rte_ipv4_udptcp_cksum(iphdr, tcphdr); return 0; } static struct rte_mbuf* ln_send_tcp(struct rte_mempool* mbuf_pool, uint32_t sip, uint32_t dip, uint8_t* smac, uint8_t* dmac, struct ln_tcp_fragment* fragment) { struct rte_mbuf* mbuf = rte_pktmbuf_alloc(mbuf_pool); if(!mbuf) { rte_exit(EXIT_FAILURE, "rte_pktmbuf_alloc tcp\n"); } uint16_t total_len = fragment->length + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_tcp_hdr) + fragment->optlen * sizeof(uint32_t); mbuf->pkt_len = total_len; mbuf->data_len = total_len; uint8_t* pktdata = rte_pktmbuf_mtod(mbuf, uint8_t*); ln_encode_tcp_pkt(pktdata, sip, dip, smac, dmac, fragment); return mbuf; } static int ln_tcp_out(struct rte_mempool* mbuf_pool) { struct ln_tcp_table* table = ln_tcp_instance(); struct ln_tcp_stream* stream = NULL; for(stream = table->streams; stream != NULL; stream = stream->next) { struct ln_tcp_fragment* fragment = NULL; int nb_snd = rte_ring_mc_dequeue(stream->snd_buf, (void**)&fragment); if(nb_snd < 0) continue; uint8_t* dmac = ng_get_dst_macaddr(stream->sip); if(dmac == NULL) { struct rte_mbuf* arp_buf = ng_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST, gDefaultArpMac, stream->dip, stream->sip); struct inout_ring* ring = inout_ring_instance(); rte_ring_mp_enqueue_burst(ring->out, (void**)&arp_buf, 1, NULL); rte_ring_mp_enqueue(stream->snd_buf, 
fragment); } else { struct rte_mbuf* tcp_buf = ln_send_tcp(mbuf_pool, stream->dip, stream->sip, stream->localmac, dmac, fragment); struct inout_ring* ring = inout_ring_instance(); rte_ring_mp_enqueue_burst(ring->out, (void**)&tcp_buf, 1, NULL); rte_free(fragment); } } return 0; } int main(int argc, char *argv[]) { if (rte_eal_init(argc, argv) < 0) { rte_exit(EXIT_FAILURE, "Error with EAL init\n"); } struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("mbuf pool", NUM_MBUFS, 0, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); if (mbuf_pool == NULL) { rte_exit(EXIT_FAILURE, "Could not create mbuf pool\n"); } ng_init_port(mbuf_pool); rte_eth_macaddr_get(gDpdkPortId, (struct rte_ether_addr *)gSrcMac); #if ENABLE_TIMER rte_timer_subsystem_init(); struct rte_timer arp_timer; rte_timer_init(&arp_timer); uint64_t hz = rte_get_timer_hz(); unsigned lcore_id = rte_lcore_id(); rte_timer_reset(&arp_timer, hz, PERIODICAL, lcore_id, arp_request_timer_cb, mbuf_pool); #endif struct inout_ring* ring = inout_ring_instance(); if(ring == NULL) rte_exit(EXIT_FAILURE, "Could not init ioInst\n"); if(ring->in == NULL) ring->in = rte_ring_create("ring in", RING_SIZE, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); if(ring->out == NULL) ring->out = rte_ring_create("ring out", RING_SIZE, rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ); lcore_id = rte_get_next_lcore(lcore_id, 1, 0); rte_eal_remote_launch(pkt_process, mbuf_pool, lcore_id); lcore_id = rte_get_next_lcore(lcore_id, 1, 0); rte_eal_remote_launch(udp_server_entry, mbuf_pool, lcore_id); while (1) { struct rte_mbuf* rx[BURST_SIZE]; unsigned nb_recv = rte_eth_rx_burst(gDpdkPortId, 0, rx, BURST_SIZE); if(nb_recv > BURST_SIZE) { rte_exit(EXIT_FAILURE, "Error receiving from eth\n"); } else if(nb_recv > 0){ rte_ring_sp_enqueue_burst(ring->in, (void**)rx, nb_recv, NULL); } struct rte_mbuf* tx[BURST_SIZE]; unsigned nb_send = rte_ring_sc_dequeue_burst(ring->out, (void**)tx, BURST_SIZE, NULL); if(nb_send > 0) { rte_eth_tx_burst(gDpdkPortId, 0, tx, nb_send); unsigned i = 0; for(i = 0; i < nb_send; i++) { rte_pktmbuf_free(tx[i]); } } #if ENABLE_TIMER static uint64_t prev_tsc = 0, cur_tsc; uint64_t diff_tsc; cur_tsc = rte_rdtsc(); diff_tsc = cur_tsc - prev_tsc; if (diff_tsc > TIMER_RESOLUTION_CYCLES) { rte_timer_manage(); prev_tsc = cur_tsc; } #endif } }