AI + Wireless Communication Competition Summary: Preliminary Round Algorithm Implementation (Top 37)


Code first; I'll fill in the rest of the write-up when I have time. My final preliminary-round ranking was 37, but training never finished: with limited compute I could only run 50 epochs, and a complete run should have reached the top 20. Consider this a brick thrown out to attract jade; I hope better solutions follow.

Full code:

Model_define_pytorch.py

#!/usr/bin/env python3
import numpy as np
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset
from collections import OrderedDict
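# Base number of feature maps used by every convolutional block below.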
channelNum = 64
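# Mish activation: f(x) = x * tanh(softplus(x)), a smooth alternative to ReLU.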
class Mish(torch.nn.Module):
    def __init__(self):
        super().__init__()
    def forward(self, x):
        x = x * (torch.tanh(torch.nn.functional.softplus(x)))
        return x
# This part implements the quantization and dequantization operations.
# The encoder output must be a bitstream.
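# Worked example with B = 4: x = 0.30 quantizes to round(0.30 * 16 - 0.5) = 4,
# i.e. the bit pattern 0100; dequantization returns (4 + 0.5) / 16 = 0.28125.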
def Num2Bit(Num, B):
    Num_ = Num.type(torch.uint8)
    def integer2bit(integer, num_bits=B * 2):
        dtype = integer.type()
        exponent_bits = -torch.arange(-(num_bits - 1), 1).type(dtype)
        exponent_bits = exponent_bits.repeat(integer.shape + (1,))
        out = integer.unsqueeze(-1) // 2 ** exponent_bits
        return (out - (out % 1)) % 2
    bit = integer2bit(Num_)
    bit = (bit[:, :, B:]).reshape(-1, Num_.shape[1] * B)
    return bit.type(torch.float32)
def Bit2Num(Bit, B):
    Bit_ = Bit.type(torch.float32)
    Bit_ = torch.reshape(Bit_, [-1, int(Bit_.shape[1] / B), B])
    num = torch.zeros(Bit_[:, :, 0].shape, device=Bit_.device)
    for i in range(B):
        num = num + Bit_[:, :, i] * 2 ** (B - 1 - i)
    return num
class Quantization(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x, B):
        ctx.constant = B
        step = 2 ** B
        out = torch.round(x * step - 0.5).clamp(0, step - 1)  # clamp guards the x -> 1.0 edge case
        out = Num2Bit(out, B)
        return out
    @staticmethod
    def backward(ctx, grad_output):
        # return as many input gradients as there were arguments.
        # Gradients of constant arguments to forward must be None.
        # The gradient w.r.t. each number is the sum of the gradients of its B bits.
        b, _ = grad_output.shape
        grad_num = torch.sum(grad_output.reshape(b, -1, ctx.constant), dim=2)
        return grad_num, None
class Dequantization(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x, B):
        ctx.constant = B
        step = 2 ** B
        out = Bit2Num(x, B)
        out = (out + 0.5) / step
        return out
    @staticmethod
    def backward(ctx, grad_output):
        # return as many input gradients as there were arguments.
        # Gradients of non-Tensor arguments to forward must be None.
        # Repeat the gradient of each number B times.
        b, c = grad_output.shape
        grad_output = grad_output.unsqueeze(2) / ctx.constant
        grad_bit = grad_output.expand(b, c, ctx.constant)
        return torch.reshape(grad_bit, (-1, c * ctx.constant)), None
class QuantizationLayer(nn.Module):
    def __init__(self, B):
        super(QuantizationLayer, self).__init__()
        self.B = B
    def forward(self, x):
        out = Quantization.apply(x, self.B)
        return out
class ResBlock(nn.Module):
    """
    Sequential residual blocks each of which consists of \
    two convolution layers.
    Args:
        ch (int): number of input and output channels.
        nblocks (int): number of residual blocks.
        shortcut (bool): if True, residual tensor addition is enabled.
    """
    def __init__(self, ch, nblocks=1, shortcut=True):
        super().__init__()
        self.shortcut = shortcut
        self.module_list = nn.ModuleList()
        for i in range(nblocks):
            resblock_one = nn.ModuleList()
            resblock_one.append(ConvBN(ch, ch, 1))
            resblock_one.append(Mish())
            resblock_one.append(ConvBN(ch, ch, 3))
            resblock_one.append(Mish())
            self.module_list.append(resblock_one)
    def forward(self, x):
        for module in self.module_list:
            h = x
            for res in module:
                h = res(h)
            x = x + h if self.shortcut else h
        return x
class DequantizationLayer(nn.Module):
    def __init__(self, B):
        super(DequantizationLayer, self).__init__()
        self.B = B
    def forward(self, x):
        out = Dequantization.apply(x, self.B)
        return out
def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=True)
class ConvBN(nn.Sequential):
    def __init__(self, in_planes, out_planes, kernel_size, stride=1, groups=1):
        if not isinstance(kernel_size, int):
            padding = [(i - 1) // 2 for i in kernel_size]
        else:
            padding = (kernel_size - 1) // 2
        super(ConvBN, self).__init__(OrderedDict([
            ('conv', nn.Conv2d(in_planes, out_planes, kernel_size, stride,
                               padding=padding, groups=groups, bias=False)),
            ('bn', nn.BatchNorm2d(out_planes)),
            ('Mish', Mish())
        ]))
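# CRNet-style two-branch refinement blocks: a deep path and a multi-kernel path
# are concatenated and fused by 1x1 convolutions around a residual connection.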
class CRBlock64(nn.Module):
    def __init__(self):
        super(CRBlock64, self).__init__()
        self.convbncrb = ConvBN(channelNum, channelNum * 2, 3)
        self.path1 = Encoder_conv(channelNum * 2, 4)
        self.path2 = nn.Sequential(OrderedDict([
            # NB: OrderedDict keys must be unique; duplicate keys silently
            # replace earlier layers, so each conv gets its own name here.
            ('conv1x5', ConvBN(channelNum * 2, channelNum * 2, [1, 5])),
            ('conv5x1', ConvBN(channelNum * 2, channelNum * 2, [5, 1])),
            ('conv1x1', ConvBN(channelNum * 2, channelNum * 2, 1)),
            ('conv3x3', ConvBN(channelNum * 2, channelNum * 2, 3)),
        ]))
        self.encoder_conv = Encoder_conv(channelNum * 4, 4)
        self.encoder_conv1 = ConvBN(channelNum * 4, channelNum, 1)
        self.identity = nn.Identity()
        self.relu = Mish()
    def forward(self, x):
        identity = self.identity(x)
        x = self.convbncrb(x)
        out1 = self.path1(x)
        out2 = self.path2(x)
        out = torch.cat((out1, out2), dim=1)
        out = self.relu(out)
        out = self.encoder_conv(out)
        out = self.encoder_conv1(out)
        out = self.relu(out + identity)
        return out
class CRBlock(nn.Module):
    def __init__(self):
        super(CRBlock, self).__init__()
        self.convban = nn.Sequential(OrderedDict([
            ("conv3x3_bn", ConvBN(channelNum, channelNum, 3)),
        ]))
        self.path1 = Encoder_conv(channelNum, 4)
        self.path2 = nn.Sequential(OrderedDict([
            ('conv1x5', ConvBN(channelNum, channelNum, [1, 5])),
            ('conv5x1', ConvBN(channelNum, channelNum, [5, 1])),
            ("conv9x1_bn", ConvBN(channelNum, channelNum, 1)),
        ]))
        self.encoder_conv = Encoder_conv(channelNum * 2)
        self.encoder_conv1 = ConvBN(channelNum * 2, channelNum, 1)
        self.identity = nn.Identity()
        self.relu = Mish()
    def forward(self, x):
        identity = self.identity(x)
        x = self.convban(x)
        out1 = self.path1(x)
        out2 = self.path2(x)
        out = torch.cat((out1, out2), dim=1)
        out = self.relu(out)
        out = self.encoder_conv(out)
        out = self.encoder_conv1(out)
        out = self.relu(out + identity)
        return out
class Encoder_conv(nn.Module):
    def __init__(self, in_planes=128, blocks=2):
        super().__init__()
        self.conv2 = ConvBN(in_planes, in_planes, [1, 9])
        self.conv3 = ConvBN(in_planes, in_planes, [9, 1])
        self.conv4 = ConvBN(in_planes, in_planes, 1)
        self.resBlock = ResBlock(ch=in_planes, nblocks=blocks)
        self.conv5 = ConvBN(in_planes, in_planes, [1, 7])
        self.conv6 = ConvBN(in_planes, in_planes, [7, 1])
        self.conv7 = ConvBN(in_planes, in_planes, 1)
        self.relu = Mish()
    def forward(self, input):
        x2 = self.conv2(input)
        x3 = self.conv3(x2)
        x4 = self.conv4(x3)
        r1 = self.resBlock(x4)
        x5 = self.conv5(r1)
        x6 = self.conv6(x5)
        x7 = self.conv7(x6)
        x7 = self.relu(x7 + x4)
        return x7
class Encoder(nn.Module):
    B = 4
    def __init__(self, feedback_bits, quantization=True):
        super(Encoder, self).__init__()
        self.convban = nn.Sequential(OrderedDict([
            ("conv3x3_bn", ConvBN(2, channelNum, 3)),
        ]))
        self.encoder1 = Encoder_conv(channelNum)
        self.encoder2 = nn.Sequential(OrderedDict([
            ('conv1x5', ConvBN(channelNum, channelNum, [1, 5])),
            ('conv5x1', ConvBN(channelNum, channelNum, [5, 1])),
            ("conv9x1_bn", ConvBN(channelNum, channelNum, 3)),
        ]))
        self.encoder_conv = Encoder_conv(channelNum * 2)
        self.encoder_conv1 = nn.Sequential(OrderedDict([
            ("conv1x1_bn", ConvBN(channelNum * 2, 2, 1)),
        ]))
        self.fc = nn.Linear(1024, int(feedback_bits / self.B))
        self.sig = nn.Sigmoid()
        self.quantize = QuantizationLayer(self.B)
        self.quantization = quantization
    def forward(self, x):
        x = self.convban(x)
        encode1 = self.encoder1(x)
        encode2 = self.encoder2(x)
        out = torch.cat((encode1, encode2), dim=1)
        out = self.encoder_conv(out)
        out = self.encoder_conv1(out)
        out = out.view(-1, 1024)
        out = self.fc(out)
        out = self.sig(out)
        if self.quantization:
            out = self.quantize(out)
        return out
class Decoder(nn.Module):
    B = 4
    def __init__(self, feedback_bits, quantization=True):
        super(Decoder, self).__init__()
        self.feedback_bits = feedback_bits
        self.dequantize = DequantizationLayer(self.B)
        self.fc = nn.Linear(int(feedback_bits / self.B), 1024)
        decoder = OrderedDict([
            ("conv3x3_bn", ConvBN(2, channelNum, 3)),
            ("CRBlock1", CRBlock64()),
            ("CRBlock2", CRBlock()),
        ])
        self.decoder_feature = nn.Sequential(decoder)
        self.out_conv = conv3x3(channelNum, 2)
        self.sig = nn.Sigmoid()
        self.quantization = quantization
    def forward(self, x):
        if self.quantization:
            out = self.dequantize(x)
        else:
            out = x
        out = out.view(-1, int(self.feedback_bits / self.B))
        out = self.fc(out)
        out = out.view(-1, 2, 16, 32)
        out = self.decoder_feature(out)
        out = self.out_conv(out)
        out = self.sig(out)
        return out
# Note: Do not modify the following class; keep it in your submission.
# feedback_bits is 128 by default.
class AutoEncoder(nn.Module):
    def __init__(self, feedback_bits):
        super(AutoEncoder, self).__init__()
        self.encoder = Encoder(feedback_bits)
        self.decoder = Decoder(feedback_bits)
    def forward(self, x):
        feature = self.encoder(x)
        out = self.decoder(feature)
        return out
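# NMSE = mean over samples of ||x - x_hat||^2 / ||x||^2. The channel entries
# are stored in [0, 1], so 0.5 is subtracted to recover zero-mean complex
# values. Note: the numpy version expects channel-last arrays (N, H, W, 2),
# while NMSE_cuda expects channel-first tensors (N, 2, H, W).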
def NMSE(x, x_hat):
    x_real = np.reshape(x[:, :, :, 0], (len(x), -1))
    x_imag = np.reshape(x[:, :, :, 1], (len(x), -1))
    x_hat_real = np.reshape(x_hat[:, :, :, 0], (len(x_hat), -1))
    x_hat_imag = np.reshape(x_hat[:, :, :, 1], (len(x_hat), -1))
    x_C = x_real - 0.5 + 1j * (x_imag - 0.5)
    x_hat_C = x_hat_real - 0.5 + 1j * (x_hat_imag - 0.5)
    power = np.sum(abs(x_C) ** 2, axis=1)
    mse = np.sum(abs(x_C - x_hat_C) ** 2, axis=1)
    nmse = np.mean(mse / power)
    return nmse
def NMSE_cuda(x, x_hat):
    x_real = x[:, 0, :, :].view(len(x), -1) - 0.5
    x_imag = x[:, 1, :, :].view(len(x), -1) - 0.5
    x_hat_real = x_hat[:, 0, :, :].view(len(x_hat), -1) - 0.5
    x_hat_imag = x_hat[:, 1, :, :].view(len(x_hat), -1) - 0.5
    power = torch.sum(x_real ** 2 + x_imag ** 2, axis=1)
    mse = torch.sum((x_real - x_hat_real) ** 2 + (x_imag - x_hat_imag) ** 2, axis=1)
    nmse = mse / power
    return nmse
class NMSELoss(nn.Module):
    def __init__(self, reduction='sum'):
        super(NMSELoss, self).__init__()
        self.reduction = reduction
    def forward(self, x_hat, x):
        nmse = NMSE_cuda(x, x_hat)
        if self.reduction == 'mean':
            nmse = torch.mean(nmse)
        else:
            nmse = torch.sum(nmse)
        return nmse
def Score(NMSE):
    score = 1 - NMSE
    return score
# dataLoader
class DatasetFolder(Dataset):
    def __init__(self, matData):
        self.matdata = matData
    def __len__(self):
        return self.matdata.shape[0]
    def __getitem__(self, index):
        return self.matdata[index]  # , self.matdata[index]
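
A quick shape sanity check (my own snippet, not part of the required submission files): it round-trips a random batch through the autoencoder and confirms the 128-bit bitstream and the reconstructed channel shape.

import torch
from Model_define_pytorch import AutoEncoder

model = AutoEncoder(feedback_bits=128).cuda()
x = torch.rand(4, 2, 16, 32).cuda()  # (batch, 2, 16, 32), values in [0, 1]
bits = model.encoder(x)              # quantized bitstream
print(bits.shape)                    # torch.Size([4, 128])
x_hat = model.decoder(bits)
print(x_hat.shape)                   # torch.Size([4, 2, 16, 32])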

Model_train.py

'''
seefun . Aug 2020.
github.com/seefun | kaggle.com/seefun
'''
import numpy as np
import h5py
import torch
import os
import torch.nn as nn
import random
from Model_define_pytorch import AutoEncoder, DatasetFolder, NMSE_cuda, NMSELoss
# Parameters for training
gpu_list = '0'
os.environ["CUDA_VISIBLE_DEVICES"] = gpu_list
def seed_everything(seed=42):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
SEED = 42
seed_everything(SEED)
batch_size = 32
epochs = 150
learning_rate = 1e-3  # relatively large initial LR; decayed by the schedule below
num_workers = 0
print_freq = 100
train_test_ratio = 0.8
# parameters for data
feedback_bits = 128
img_height = 16
img_width = 32
img_channels = 2
# Model construction
model = AutoEncoder(feedback_bits)
model.encoder.quantization = False
model.decoder.quantization = False
if len(gpu_list.split(',')) > 1:
    model = torch.nn.DataParallel(model).cuda()  # model.module
else:
    model = model.cuda()
criterion = NMSELoss(reduction='mean')  # nn.MSELoss()
criterion_test = NMSELoss(reduction='sum')
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
# Data loading
data_load_address = './data'
mat = h5py.File(data_load_address + '/Hdata.mat', 'r')
data = np.transpose(mat['H_train'])  # shape=(320000, 1024)
data = data.astype('float32')
data = np.reshape(data, [len(data), img_channels, img_height, img_width])
# split data for training(80%) and validation(20%)
np.random.shuffle(data)
start = int(data.shape[0] * train_test_ratio)
x_train, x_test = data[:start], data[start:]
# dataLoader for training
train_dataset = DatasetFolder(x_train)
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=True, drop_last=True)
# dataLoader for validation
test_dataset = DatasetFolder(x_test)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=True)
best_loss = 100
for epoch in range(epochs):
    print('========================')
    print('lr:%.4e' % optimizer.param_groups[0]['lr'])
    # model training
    model.train()
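    # Warm-up: run the first 10% of epochs without quantization, then switch it on.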
    if epoch < epochs // 10:
        try:
            model.encoder.quantization = False
            model.decoder.quantization = False
        except AttributeError:  # model is wrapped by DataParallel
            model.module.encoder.quantization = False
            model.module.decoder.quantization = False
    else:
        try:
            model.encoder.quantization = True
            model.decoder.quantization = True
        except AttributeError:
            model.module.encoder.quantization = True
            model.module.decoder.quantization = True
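    # Step decay: shrink the learning rate by 10x every 50 epochs.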
    if epoch % 50 == 0 and epoch > 0:
        optimizer.param_groups[0]['lr'] = optimizer.param_groups[0]['lr'] * 0.1
    for i, input in enumerate(train_loader):
        input = input.cuda()
        output = model(input)
        loss = criterion(output, input)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        if i % print_freq == 0:
            print('Epoch: [{0}][{1}/{2}]\t'
                  'Loss {loss:.4f}\t'.format(
                epoch, i, len(train_loader), loss=loss.item()))
    model.eval()
    try:
        model.encoder.quantization = True
        model.decoder.quantization = True
    except AttributeError:
        model.module.encoder.quantization = True
        model.module.decoder.quantization = True
    total_loss = 0
    with torch.no_grad():
        for i, input in enumerate(test_loader):
            # convert numpy to Tensor
            input = input.cuda()
            output = model(input)
            total_loss += criterion_test(output, input).item()
        average_loss = total_loss / len(test_dataset)
        print('NMSE %.4f' % average_loss)
        if average_loss < best_loss:
            # model save
            os.makedirs('./Modelsave', exist_ok=True)
            # save encoder
            modelSave1 = './Modelsave/encoder.pth.tar'
            try:
                torch.save({'state_dict': model.encoder.state_dict(), }, modelSave1)
            except AttributeError:
                torch.save({'state_dict': model.module.encoder.state_dict(), }, modelSave1)
            # save decoder
            modelSave2 = './Modelsave/decoder.pth.tar'
            try:
                torch.save({'state_dict': model.decoder.state_dict(), }, modelSave2)
            except AttributeError:
                torch.save({'state_dict': model.module.decoder.state_dict(), }, modelSave2)
            print('Model saved!')
            best_loss = average_loss
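
For completeness, here is a minimal offline evaluation sketch. It is my own addition, not one of the original scripts; it assumes the checkpoints saved above exist and reuses the test_loader / test_dataset split from Model_train.py.

import torch
from Model_define_pytorch import AutoEncoder, NMSELoss

model = AutoEncoder(feedback_bits=128).cuda()
model.encoder.load_state_dict(torch.load('./Modelsave/encoder.pth.tar')['state_dict'])
model.decoder.load_state_dict(torch.load('./Modelsave/decoder.pth.tar')['state_dict'])
model.eval()

criterion = NMSELoss(reduction='sum')
total = 0.0
with torch.no_grad():
    for batch in test_loader:  # validation loader from Model_train.py
        batch = batch.cuda()
        total += criterion(model(batch), batch).item()
nmse = total / len(test_dataset)
print('NMSE: %.4f  Score: %.4f' % (nmse, 1 - nmse))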