人工智能在脑卒中监控中的应用

1. 背景介绍

脑卒中一般发病急,无征兆或者征兆不明显,大多数脑卒中患者伴有嘴歪眼斜的症状。当前的智能摄像头有人脸识别、摔倒检测、婴儿啼哭检测等功能,但是对于脑卒中的监控目前尚无相关的上市产品。于是,作者基于人工智能技术,开发了一款基于摄像头的脑卒中监控系统,充分利用摄像头的人物跟随功能来实时监控易感人群,若发现嘴歪眼斜等脑卒中症状,则立即发出警告,通知家人及时就医。

2. 核心算法模型

本系统采用2层卷积神经网络(CNN)进行图像识别。CNN通过卷积层和池化层进行特征提取,然后通过全连接层进行分类。 模型文件:mymodel.py

import torch
from torch.utils.data import Dataset
from torchvision.transforms import  Lambda
from torch import nn
import os
import pandas as pd
from torchvision.io import decode_image
from torchvision import transforms

# Pick the best available compute device. Older torch builds do not have the
# torch.accelerator API at all, so probe for it instead of crashing on import.
if hasattr(torch, "accelerator") and torch.accelerator.is_available():
    device = torch.accelerator.current_accelerator().type
else:
    device = "cpu"

#  Inverse one-hot transform: map a one-hot vector back to its class index.
def arc_one_hot(x, list=None):  # parameter kept as `list` for backward compatibility
    """Decode a one-hot tensor into class indices via a dot product.

    :param x: one-hot float tensor (or a batch of them), e.g. [0., 1.].
    :param list: optional basis vector mapping positions to class values.
        Defaults to [0, 1] built lazily on x's device — the original built
        this tensor at import time on the global device, which failed when
        the module was imported before the accelerator was usable.
    :return: tensor of class indices (x @ basis).
    """
    if list is None:
        list = torch.tensor([0, 1], dtype=torch.float, device=x.device)
    return x @ list
#  1. Custom dataset yielding (image, one-hot label, filename) triples.
class CustomImageDataset(Dataset):
    # Number of target classes used for one-hot encoding
    # (previously a magic number buried inside __getitem__).
    NUM_CLASSES = 2

    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        """
        :param annotations_file: CSV path (or file-like) with ``filename,label``
            rows and NO header row — hence header=None, otherwise pandas would
            silently swallow the first sample as a header line.
        :param img_dir: directory containing the image files.
        :param transform: optional transform applied to the decoded image.
        :param target_transform: optional transform applied to the raw label
            before one-hot encoding.
        """
        self.img_labels = pd.read_csv(annotations_file, header=None)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Return (image, one_hot_label, filename) for sample ``idx``."""
        filename = self.img_labels.iloc[idx, 0]
        img_path = os.path.join(self.img_dir, filename)
        # decode_image yields uint8; must be float in [0, 1] or training fails.
        image = decode_image(img_path).float().div(255)
        label = self.img_labels.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        # One-hot encode the integer label directly instead of constructing a
        # fresh torchvision Lambda transform on every single access.
        one_hot = torch.zeros(self.NUM_CLASSES, dtype=torch.float)
        one_hot[int(label)] = 1.0
        return image, one_hot, filename
    
class NeuralNetwork(nn.Module):
    """Two-block CNN classifier for 1x100x100 grayscale inputs, 2 classes.

    Layout: [Conv-BN-ReLU-MaxPool] x 2 -> Flatten -> FC(512) -> Dropout -> FC(2).
    """

    def __init__(self):
        super().__init__()

        # --- Convolutional block 1 ---
        # 1 -> 32 channels, 3x3 kernel; padding=1 keeps the spatial size.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        # BatchNorm between conv and activation: counters internal covariate
        # shift, tolerates larger learning rates, speeds up training.
        self.bn1 = nn.BatchNorm2d(32)
        # ReLU: keep positives, zero out negatives.
        self.relu1 = nn.ReLU()
        # 2x2 max pool, stride 2: 100x100 -> 50x50.
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # --- Convolutional block 2 ---
        # 32 -> 64 channels.
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.relu2 = nn.ReLU()
        # Second pool halves again: 50x50 -> 25x25.
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # --- Classifier head ---
        self.flatten = nn.Flatten()

        # After the two pooling stages a 100x100 input is 25x25 with 64
        # channels, so the flattened feature count is 64 * 25 * 25 = 40000.
        fc_input_dim = 64 * 25 * 25

        self.linear_relu_stack = nn.Sequential(
            nn.Linear(fc_input_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.5),  # regularisation against overfitting
            nn.Linear(512, 2),  # two output classes
        )

    def forward(self, x):
        """Map a (N, 1, 100, 100) batch to (N, 2) class logits."""
        # Run the convolutional pipeline as an ordered sequence of stages.
        stages = (
            self.conv1, self.bn1, self.relu1, self.pool1,
            self.conv2, self.bn2, self.relu2, self.pool2,
            self.flatten,
        )
        for stage in stages:
            x = stage(x)
        return self.linear_relu_stack(x)
    
# Transform pipeline: resize to 100x100 and convert to a 1-channel grayscale image.
# Note: decode_image returns a Tensor and these transforms accept Tensor input.
data_transforms = transforms.Compose([
    transforms.Resize((100, 100)),       # enforce a uniform input size
    transforms.Grayscale(num_output_channels=1), # key step: collapse to a single grayscale channel
    transforms.Normalize(mean=[0.5], std=[0.5]) # standardise to [-1, 1]; helps convergence
])

3. 训练模型

训练模型代码:mytrain.py

import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from torch import nn
import os
import pandas as pd
from torchvision.io import decode_image
from torchvision import transforms
from mymodule.mymodel import NeuralNetwork, CustomImageDataset,arc_one_hot,device,data_transforms
import torchvision.utils as vutils
from mymodule.model_visualization import VisualizeModel

from torch.utils.tensorboard import SummaryWriter

import argparse
# CLI options: where to read/write the plain weights file and the two checkpoints.
parser = argparse.ArgumentParser(description='location the model')
parser.add_argument("-m","--model", type=str, default='model.pth', help='location of the model')
parser.add_argument("-cl","--checkpoint-latest", type=str, default='checkpoint_latest.pth', help='location of the latest checkpoint')
parser.add_argument("-cb","--checkpoint-best", type=str, default='checkpoint_best.pth', help='location of the best checkpoint')

args = parser.parse_args()
# Resolved file locations used by the save/load helpers below.
model_path=args.model
checkpoint_latest_path=args.checkpoint_latest
checkpoint_best_path=args.checkpoint_best

# NOTE: the annotation CSVs must have no header row (the dataset reads them with header=None).
csv_path='cuzhong/train.csv'
img_dir='cuzhong/train/'
batch_size = 64

# Build the training dataset from the annotation CSV and image directory.
mydataset = CustomImageDataset(annotations_file=csv_path, img_dir=img_dir, transform=data_transforms, target_transform=None)

# Wrap the dataset in a DataLoader for shuffled mini-batches.
mydataloader = DataLoader(mydataset, batch_size, shuffle=True, num_workers=0) #, num_workers=4 crashes on macOS
print(len(mydataloader))
print(len(mydataloader.dataset))

csv_path_test='cuzhong/test.csv'
img_dir_test='cuzhong/test/'
# Build the test dataset the same way.
test_data = CustomImageDataset(annotations_file=csv_path_test, img_dir=img_dir_test, transform=data_transforms, target_transform=None)


test_dataloader = DataLoader(test_data, batch_size=batch_size)


# Sanity-check the tensor shapes of one test batch.
for X, y,z in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break
print(len(mydataloader))
# exit()


#  2. 可视化数据
#  2. Visualize the data
def showdata():
    """Plot a few random training samples plus one sample from a loaded batch,
    with their decoded class labels, as a manual sanity check."""
    labels_map = {
        0: "OK",
        1: "Error",
    }
    figure = plt.figure(figsize=(8, 8))
    cols, rows = 2, 1
    xxx=''
    for i in range(1, cols * rows + 1):
        # Pick a random sample index from the training set.
        sample_idx = torch.randint(len(mydataset), size=(1,)).item()
        img, label, filename = mydataset[sample_idx]
        figure.add_subplot(rows, cols, i)
        # Decode the one-hot label back to an integer class index.
        label=arc_one_hot(label.to(device)).item()
        plt.title(labels_map[label]+"  @"+filename)
        plt.axis("off")
        xxx=img
        print(img.shape)
        plt.imshow(img.squeeze(), cmap="gray")
    plt.show()
    print(xxx.shape)
    print('------')
    print(xxx.squeeze().shape)

    # Display image and label.
    train_features, train_labels, train_filenames = next(iter(mydataloader))
    print(f"Feature batch shape: {train_features.size()}")
    print(f"Labels batch shape: {train_labels.size()}")
    img = train_features[0].squeeze()
    label = train_labels[0]
    # Decode the one-hot label back to an integer class index.
    label=arc_one_hot(label.to(device)).item()
    plt.title(labels_map[label]+"  @"+train_filenames[0])
    plt.axis("off")
    plt.imshow(img, cmap="gray")
    plt.show()
    print(f"Label: {label}")
    # exit()


# 3. Define the model

print(f"Using {device} device")


model = NeuralNetwork().to(device)
print(model)

# Initialize the TensorBoard writer for visualisation logs.
writer = SummaryWriter('runs/logs/mymodule_visualize_model_runs')

# 4. Define the loss function and optimizer
loss_fn = nn.CrossEntropyLoss() # cross-entropy
# SGD is classic but needs careful learning-rate tuning and converges slowly;
# Adam adapts a per-parameter learning rate and usually converges much faster on CNNs.
#optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Switched to the Adam optimizer (weight_decay adds L2 regularisation).
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
# 5. Training: one full pass over the dataloader.
def train(dataloader, model, loss_fn, optimizer):
    """Run one training epoch.

    :param dataloader: yields (X, y, filename) batches; must expose .dataset.
    :param model: network to optimise (switched to train mode here).
    :param loss_fn: criterion comparing model(X) against y.
    :param optimizer: optimiser stepped once per batch.
    """
    size = len(dataloader.dataset)
    print("size="+str(size))
    # train() enables BatchNorm batch statistics and Dropout;
    # eval mode (used in test) disables them.
    model.train()
    for batch, (X, y, z) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        # Forward pass and loss.
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation: clear stale gradients, compute new ones, apply them.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Progress log — every batch; raise the modulus to log less often.
        if batch % 1 == 0:
            loss_val, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss_val:>7f}  [{current:>5d}/{size:>5d}]")
    # BUGFIX: the global TensorBoard writer is intentionally NOT closed here
    # anymore. The original `writer.close()` at the end of this function shut
    # the writer down after the very first epoch, silently discarding all
    # later logging. Whoever owns `writer` should close it once training is
    # fully complete.
# 6. Evaluation: average loss and accuracy over a dataloader.
def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader``; return (avg_loss, accuracy)."""
    sample_count = len(dataloader.dataset)
    batch_count = len(dataloader)
    model.eval()  # disable Dropout; BatchNorm uses running statistics
    total_loss = 0.0
    hit_count = 0.0
    with torch.no_grad():
        for X, y, z in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            total_loss += loss_fn(pred, y).item()
            # Decode the one-hot targets back to class indices, compare with
            # the argmax predictions, and accumulate the number of hits.
            target_idx = arc_one_hot(y)
            hit_count += (pred.argmax(1) == target_idx).type(torch.float).sum().item()
    avg_loss = total_loss / batch_count
    accuracy = hit_count / sample_count
    print(f"Test Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {avg_loss:>8f}\n")
    return avg_loss, accuracy

#  7. Train/evaluate loop with checkpointing.
def do_train():
    """Train for up to ``epochs`` epochs, evaluating and checkpointing each one."""
    epochs = 50
    # Initialise model/optimizer/scheduler. resume=True continues from the
    # latest checkpoint when one exists; otherwise training starts fresh.
    model, optimizer, scheduler, start_epoch, best_acc = load_model(resume=True, checkpoint_best_path=checkpoint_best_path, checkpoint_latest_path=checkpoint_latest_path, path=model_path)
    # Rebind the module-level names so helpers that read the globals
    # (e.g. __save_model0__) see the freshly loaded objects.
    globals()['model'] = model
    globals()['optimizer'] = optimizer
    globals()['scheduler'] = scheduler
    if scheduler is None:
        # Scheduler: shrink the learning rate (factor 0.1) when the
        # validation loss stops improving for `patience` epochs.
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)

    # BUGFIX: best_acc is no longer reset to 0.0 here — resetting it discarded
    # the best accuracy restored from the checkpoint, so a resumed run could
    # overwrite the true best model with a worse one.
    # BUGFIX: iterate from start_epoch so a resumed run does not redo (and
    # relabel) epochs it already completed.
    for t in range(start_epoch, epochs):
        print(f"Epoch {t+1}\n-------------------------------")
        train(mydataloader, model, loss_fn, optimizer)
        # Evaluate; test() returns (avg_loss, accuracy), and the loss drives
        # the LR scheduler below.
        test_loss, correct = test(test_dataloader, model, loss_fn)

        # Track and persist the best model so far.
        is_best = correct > best_acc
        if is_best:
            best_acc = correct
            # Keep an extra snapshot of any sufficiently good model.
            if correct >= 0.75:
                __save_model0__(f"best/best_model_{t}_{correct*100:.2f}.pth")
        # Always save the latest checkpoint (plus the best one when applicable).
        __save_model__(t + 1, model, optimizer, scheduler, best_acc, is_best)

        # Let the scheduler react to the validation loss.
        scheduler.step(test_loss)
    print("Done!")
    __save_model0__()


#  8. Save only the model weights (state_dict).
def __save_model0__(path="model.pth"):
    """Persist the global model's state_dict to ``path``.

    BUGFIX: the parent directory is created if missing — the original crashed
    when saving into a not-yet-existing folder such as ``best/``.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    torch.save(model.state_dict(), path)
    print("Saved PyTorch Model State to "+path)

def __save_model__(epoch, model, optimizer, scheduler, best_acc, is_best=False):
    """
    Save a full training checkpoint (weights + optimiser + scheduler state).

    :param epoch: 1-based number of the epoch just completed.
    :param best_acc: best accuracy observed so far (stored for resume).
    :param is_best: if True, additionally save a copy as the best checkpoint.
    """
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),      # weights
        'optimizer_state_dict': optimizer.state_dict(), # Adam's exp_avg, exp_avg_sq, step
        'scheduler_state_dict': scheduler.state_dict(), # scheduler step count and current LR
        'best_acc': best_acc,
    }

    # BUGFIX: save to the paths parsed from the -cl/-cb command-line options
    # instead of the previously hard-coded 'checkpoint_latest.pth' /
    # 'checkpoint_best.pth', which silently ignored the configured locations.
    torch.save(checkpoint, checkpoint_latest_path)

    # If this is the best model so far, keep an extra copy.
    if is_best:
        torch.save(checkpoint, checkpoint_best_path)
        print(f"Saved Best Model at Epoch {epoch} with Acc: {best_acc:.4f}")
    else:
        print(f"Saved Latest Checkpoint at Epoch {epoch}")

#  9. Load a plain weights file into a fresh network.
def load_model0(path="model.pth"):
    """Build a NeuralNetwork on the active device and load its state_dict from ``path``."""
    net = NeuralNetwork().to(device)
    state = torch.load(path, weights_only=True, map_location=device)
    net.load_state_dict(state)
    return net

def load_model(resume=False, path="model.pth", checkpoint_latest_path="checkpoint_latest.pth", checkpoint_best_path="checkpoint_best.pth"):
    """
    Build the model/optimizer/scheduler and optionally restore saved state.

    :param resume: True -> restore the full training state from the latest
        checkpoint (for continued training); False -> load the best
        checkpoint (or a legacy plain state_dict) for inference.
    :param path: legacy plain state_dict file, used as a last-resort fallback.
    :return: (model, optimizer, scheduler, start_epoch, best_acc)
    """
    model = NeuralNetwork().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)

    start_epoch = 0
    best_acc = 0.0

    if resume:
        if os.path.exists(checkpoint_latest_path):
            # BUGFIX: report the file actually being read — the message used
            # to hard-code 'checkpoint_latest.pth' regardless of the argument.
            print(f"=> Loading checkpoint '{checkpoint_latest_path}'")
            checkpoint = torch.load(checkpoint_latest_path, map_location=device)
            start_epoch = checkpoint['epoch']
            model.load_state_dict(checkpoint['model_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
            # .get keeps compatibility with checkpoints saved before best_acc existed.
            best_acc = checkpoint.get('best_acc', 0.0)
            print(f"Resumed from Epoch {start_epoch}, Best Acc: {best_acc:.4f}")
        else:
            print("=> No checkpoint found, starting from scratch.")
    else:
        # Inference: prefer the best checkpoint, fall back to a legacy state_dict.
        if os.path.exists(checkpoint_best_path):
            print(f"=> Loading best model '{checkpoint_best_path}' for testing")
            checkpoint = torch.load(checkpoint_best_path, map_location=device)
            model.load_state_dict(checkpoint['model_state_dict'])
        elif os.path.exists(path):
            # Backwards compatibility with weights saved by __save_model0__.
            model.load_state_dict(torch.load(path, weights_only=True, map_location=device))

    return model, optimizer, scheduler, start_epoch, best_acc
#  10. Test the model with visualisations
def  test_model():
    """Load the best checkpoint, visualise its filters and activations, and
    print a per-sample prediction report over the test set."""
    # model = load_model()
    # Load the best checkpoint for evaluation.
    model, _, _, _, _ = load_model(resume=False, checkpoint_best_path=checkpoint_best_path, checkpoint_latest_path=checkpoint_latest_path, path=model_path)
    classes = [
        "OK",
        "Error",
    ]

    model.eval()

    visualize_model = VisualizeModel(model)
    # Extract the learned kernels of both convolution layers for display.
    conv1_weights = model.conv1.weight.data
    conv2_weights = model.conv2.weight.data
    print("conv1_weights")
    print(conv1_weights.shape)
    visualize_model.visualize_filters(conv1_weights,"conv1")
    print("conv2_weights")
    print(conv2_weights.shape)
    visualize_model.visualize_filters(conv2_weights,"conv2")

    

    for data in test_data:
        x, y,z = data[0], data[1], data[2]
        visualize_model = VisualizeModel(model)
        handle1 = visualize_model.register_hook(model.conv1)
        handle2 = visualize_model.register_hook(model.conv2)
        with torch.no_grad():
            x = x.to(device)
            # KEY FIX: add a batch dimension: (C, H, W) -> (1, C, H, W)
            if x.dim() == 3:
                x = x.unsqueeze(0)
            print(x.shape)
            pred = model(x)
            # Decode the one-hot label; both tensors must live on the same device.
            y=arc_one_hot(y.to(device)).int()
            print(y)

            print(f"Predicted: {pred[0].argmax(0)}, Actual: {y}")
            predicted, actual = classes[pred[0].argmax(0)], classes[y]
            print(f'Predicted: "{predicted}", Actual: "{actual}"')
            if predicted != actual:
                print("----------error------------- File:"+z+"\n")

        # Remove the hooks once the activations are captured (good hygiene,
        # prevents memory leaks).
        handle1.remove()
        handle2.remove()
        visualize_model.visualize()

        visualize_model.grad_cam_visualize(x, model,classes, target_class=pred[0].argmax(0),actual_class=y.item(),filename=z,device=device)
        # Saliency map: gradient heatmap over the input, showing how sensitive
        # the model is to each input pixel.
        visualize_model.saliency_map_visualize(x, model,classes, target_class=pred[0].argmax(0),actual_class=y.item(),filename=z,device=device)
        # break  # inspect only the first image's feature maps; without this break every image is shown

def  do_test_model():
    """Entry point: evaluate only (no training)."""
    # load_model()
    test_model()
def do_train_model():
    """Entry point: train, then evaluate."""
    # showdata()
    do_train()
    test_model()
def main():
    # Toggle between training and test-only runs here.
    do_train_model()
    # do_test_model()
    

if __name__ == '__main__':
    main()

4. 模型应用

对于监控照片的获取,也费了一些周折。最初想给树莓派配个摄像头,可以实现自定义拍摄。然而树莓派摄像头拍摄效果跟智能摄像头还有较大差距,比如夜视、畸变矫正、移动跟随等功能缺失,还要单独造轮子。那为什么不直接从摄像头获取数据呢?经过研究,支持RTSP协议的摄像头,可以很方便地获取数据。可惜的是手头的小米摄像头并不支持RTSP协议,最终找到了萤石摄像头。

4.1 系统构成

  1. 萤石C7摄像头,用于图像采集。
  2. 树莓派作为服务器,定时采集摄像头数据,并运行模型进行预测。

4.2 模型预测

模型预测的代码如下:

import torch
from torch.utils.data import DataLoader
from torch import nn
from torchvision import transforms
from mymodule.mymodel import NeuralNetwork, CustomImageDataset,arc_one_hot,device,data_transforms
import time,os

import argparse
# CLI option: where to read the trained weights from.
parser = argparse.ArgumentParser(description='location the model')
parser.add_argument("-m","--model", type=str, default='model.pth', help='location of the model')
args = parser.parse_args()
model_path=args.model
batch_size = 16


# Test-set annotation CSV (no header row) and image directory.
csv_path_test='cuzhong/test.csv'
img_dir_test='cuzhong/test/'

# 3. Define the model

print(f"Using {device} device")
    
#  9. Load trained weights into a fresh network.
def load_model(path="model.pth"):
    """Build a NeuralNetwork on the active device and load its state_dict from ``path``."""
    net = NeuralNetwork().to(device)
    state = torch.load(path, weights_only=True, map_location=device)
    net.load_state_dict(state)
    return net
#  10. Test the model
def  test_model(path=model_path):
    """Run the trained model over the test set and sound an audible alarm for
    every sample where the prediction and the label disagree."""
    model = load_model(path)
    classes = [
        "OK",
        "Error",
    ]

    model.eval()
    
    # Build the test dataset from the annotation CSV and image directory.
    test_data = CustomImageDataset(annotations_file=csv_path_test, img_dir=img_dir_test, transform=data_transforms, target_transform=None)
    for data in test_data:
        x, y,z = data[0], data[1], data[2]
        with torch.no_grad():
            x = x.to(device)
            # KEY FIX: add a batch dimension: (C, H, W) -> (1, C, H, W)
            if x.dim() == 3:
                x = x.unsqueeze(0)
            pred = model(x)
            # Decode the one-hot label; both tensors must live on the same device.
            y=arc_one_hot(y.to(device)).int()
            predicted, actual = classes[pred[0].argmax(0)], classes[y]
            if predicted != actual:
                print("---Error: "+z+"---\t"+f'Predicted: "{predicted}", Actual: "{actual}"')
                # Play a warning beep through VLC when prediction and label disagree.
                cmd="cvlc --play-and-exit  700hzbeep.mp3"
                os.system(cmd)

def  do_test_model():
    """Entry point: run prediction over the test set."""
    #load_model()
    test_model()

def main():
    do_test_model()
    

if __name__ == '__main__':
    main()