Erlo

Python调用千问2.5或deepseek-r1自然语言模型,实现串口输出AT命令控制ESP32S3NANO单片机GPIO数字输出或PWM输出

2025-02-01 00:29:10 发布   20 浏览  
页面报错/反馈
收藏 点赞

初版随笔,后面有时间再完善细节,先把重要的放上来

近期deepseek-r1大火,而我在2025-1-30才去关注它,既然它这么高效,我们不妨部署一个deepseek-r1或是qwen2.5语言大模型在某些性能强大的Linux单板机上玩玩。不过家里的香橙派zero3性能实在拉垮,笔者最后还是在自己的win11笔记本电脑上部署的。

笔者也是从零开始,花了将近一下午的时间去实现部署模型、给提示词、代码调用、添加语音转文本,编写ESP32接收AT指令代码。令人欣慰的是时间没有被浪费,整体效果还是不错的。

1 安装所需环境

首先需要去安装ollama,python3.10。ollama和python的安装教程网上有很多,这里就不多赘述了。

别忘了安装好ollama和python后在windows终端执行pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/,目的是访问阿里云镜像,加速pip安装

以管理员身份打开终端,分别执行ollama run qwen2.5:1.5bollama run deepseek-r1:1.5b来安装模型

打开vscode,新建一个python文件,按照vscode的提示选择python3.10 环境,然后在vscode中打开终端器,分别在终端输入pip install ollamapip install pyserial,安装ollama的python包和串口库

2 python端代码

先展示不调用google在线语音转文的代码,毕竟要FQ

# -*- coding: utf-8 -*-
import ollama
import re
from typing import Generator
import serial

ser = serial.Serial("COM3", baudrate=115200, timeout=1)  # COM3 /dev/ttyAS5

GPIO_prompt = """
# 角色定义
你是一个多模态硬件控制指令转换助手,支持GPIO和PWM指令生成
专门将自然语言转换为标准AT指令格式

# 指令生成规范
## GPIO模式
参数要求:
- 引脚号:1-100
- 状态:0(关闭)/1(开启)
关键词:开/关/高电平/低电平

## PWM模式
参数要求:
- 引脚号:1-100(需支持PWM)
- 占空比:0-100(百分比)
关键词:占空比/亮度/百分比

# 参数转换表
| 参数类型 | 自然语言表达          | 转换规则          |
|---------|----------------------|-------------------|
| GPIO状态 | 开/高电平/启动        | 1                 |
| {state} | 关/低电平/停止        | 0                 |

| 占空比   | 数字+% (如"30%")      | 直接取值 ,不能带%号  |
|  {duty}  | 分数 (如"三分之1")    | 转换为百分比      |
|          | 模糊描述 (如"最大")   | 映射为100         |

# 输出模板
GPIO:AT+GPIO={pin},{state}
PWM:AT+PWM={pin},{duty}

# 规则
1. 占空比自动修正到0-100范围
2. 禁止同一引脚同时设置GPIO/PWM
3. GPIO状态只能是0或1
4. 当用户文本中包含‘占空比’三个字时,务必要使用PWM指令,不能使用GPIO指令
5. 必须使用标签包裹指令

# 典型示例
用户:帮我点亮48号引脚的LED
你:AT+GPIO=48,1

用户:打开48
你:AT+GPIO=48,1

用户:给48设置高电平
你:AT+GPIO=48,1

用户:给48设置低电瓶
你:AT+GPIO=48,0

用户:给48设置占空比20
你:AT+PWM=48,20

用户:48设置占空比100
你:AT+PWM=48,100

用户:设置PWM5为40%
你:AT+PWM=5,40

用户:把3号PWM调到七成
你:AT+PWM=3,70

接下来你就需要举一反三了
"""

class AiChat:
    def __init__(self, system_prompt: str):
        self.history = [{
            "role": "system",
            "content": f"{system_prompt}"
        }]

    def _extract_at_commands(self, text: str) -> list:
        pattern = r'(AT+w+=d+(?:,d+)?)'
        return re.findall(pattern, text)

    def chat(self, message: str, stream: bool = False) -> Generator[str, None, None]:
        self.history.append({"role": "user", "content": message})

        response = ollama.chat(
            model="qwen2.5:1.5b",  # deepseek-r1:1.5b qwen2.5:1.5b
            messages=self.history,
            stream=stream,
            options={
                "temperature": 0.1,
                "max_tokens": 1200,
                "top_p": 0.85
            }
        )

        full_response = ""
        if stream:
            for chunk in response:
                content = chunk["message"]["content"]
                full_response += content
                yield content
        else:
            full_response = response["message"]["content"]
            yield full_response

        self.history.append({"role": "assistant", "content": full_response})

def main():
    GPIObot = AiChat(GPIO_prompt)
    text = "给48设置占空比10" # 或者说:给48低电平

    while True:
        all_chunks = []
        for chunk in GPIObot.chat(text, stream=True):
            print(chunk, end="", flush=True)
            all_chunks.append(chunk)

        full_response = ''.join(all_chunks)
        commands = GPIObot._extract_at_commands(full_response)
        for cmd in commands:
            print(f"n[找到] {cmd}")
            if ser.is_open:
                print("n串口打开")
                try:
                    ser.write((cmd + 'rn').encode('utf-8'))  # 将字符串转换为字节
                except Exception as e:
                    print(f"串口写入失败: {e}")
                    print("猜测是Ai输出指令不匹配")
            ser.close()
            print("串口关闭")
        # 询问用户是否继续
        break

if __name__ == "__main__":
    main()

接下来展示调用调用google在线语音转文的代码,别忘了FQ,最好是规则模式

在开梯子之前,先在终端中执行pip install speechrecognition来安装语音识别库

# -*- coding: utf-8 -*-
import ollama
import re
from typing import Generator
import speech_recognition as sr
import serial

ser = serial.Serial("COM3", baudrate=115200, timeout=1)  # COM3 /dev/ttyAS5

GPIO_prompt = """
# 角色定义
你是一个多模态硬件控制指令转换助手,支持GPIO和PWM指令生成
专门将自然语言转换为标准AT指令格式

# 指令生成规范
## GPIO模式
参数要求:
- 引脚号:1-100
- 状态:0(关闭)/1(开启)
关键词:开/关/高电平/低电平

## PWM模式
参数要求:
- 引脚号:1-100(需支持PWM)
- 占空比:0-100(百分比)
关键词:占空比/亮度/百分比

# 参数转换表
| 参数类型 | 自然语言表达          | 转换规则          |
|---------|----------------------|-------------------|
| GPIO状态 | 开/高电平/启动        | 1                 |
| {state} | 关/低电平/停止        | 0                 |

| 占空比   | 数字+% (如"30%")      | 直接取值 ,不能带%号  |
|  {duty}  | 分数 (如"三分之1")    | 转换为百分比      |
|          | 模糊描述 (如"最大")   | 映射为100         |

# 输出模板
GPIO:AT+GPIO={pin},{state}
PWM:AT+PWM={pin},{duty}

# 规则
1. 占空比自动修正到0-100范围
2. 禁止同一引脚同时设置GPIO/PWM
3. GPIO状态只能是0或1
4. 当用户文本中包含‘占空比’三个字时,务必要使用PWM指令,不能使用GPIO指令
5. 必须使用标签包裹指令

# 典型示例
用户:帮我点亮48号引脚的LED
你:AT+GPIO=48,1

用户:打开48
你:AT+GPIO=48,1

用户:给48设置高电平
你:AT+GPIO=48,1

用户:给48设置低电瓶
你:AT+GPIO=48,0

用户:给48设置占空比20
你:AT+PWM=48,20

用户:48设置占空比100
你:AT+PWM=48,100

用户:设置PWM5为40%
你:AT+PWM=5,40

用户:把3号PWM调到七成
你:AT+PWM=3,70

接下来你就需要举一反三了
"""

class AiChat:
    def __init__(self, system_prompt: str):
        self.history = [{
            "role": "system",
            "content": f"{system_prompt}"
        }]

    def _extract_at_commands(self, text: str) -> list:
        pattern = r'(AT+w+=d+(?:,d+)?)'
        return re.findall(pattern, text)

    def chat(self, message: str, stream: bool = False) -> Generator[str, None, None]:
        self.history.append({"role": "user", "content": message})

        response = ollama.chat(
            model="qwen2.5:1.5b",  # deepseek-r1:1.5b qwen2.5:1.5b
            messages=self.history,
            stream=stream,
            options={
                "temperature": 0.1,
                "max_tokens": 1200,
                "top_p": 0.85
            }
        )

        full_response = ""
        if stream:
            for chunk in response:
                content = chunk["message"]["content"]
                full_response += content
                yield content
        else:
            full_response = response["message"]["content"]
            yield full_response

        self.history.append({"role": "assistant", "content": full_response})

def main():
    r = sr.Recognizer()
    GPIObot = AiChat(GPIO_prompt)

    while True:
        # 使用默认麦克风作为音频来源
        with sr.Microphone() as source:
            print("请说些什么吧...")
            audio = r.listen(source, phrase_time_limit=10)

            try:
                # 使用Google Web Speech API进行识别
                print("Google 语音识别认为你说的是:")
                text = r.recognize_google(audio, language='zh-CN')
                print(text)
            except sr.UnknownValueError:
                print("Google 语音识别无法理解音频")
                continue
            except sr.RequestError as e:
                print(f"无法从Google 语音识别服务请求结果; {e}")
                continue

        all_chunks = []
        for chunk in GPIObot.chat(text, stream=True):
            print(chunk, end="", flush=True)
            all_chunks.append(chunk)

        full_response = ''.join(all_chunks)
        commands = GPIObot._extract_at_commands(full_response)
        for cmd in commands:
            print(f"n[找到] {cmd}")
            if ser.is_open:
                print("n串口打开")
                try:
                    ser.write((cmd + 'rn').encode('utf-8'))  # 将字符串转换为字节
                except Exception as e:
                    print(f"串口写入失败: {e}")
                    print("猜测是Ai输出指令不匹配")
            ser.close()
            print("串口关闭")
        # 询问用户是否继续
        user_input = input("是否继续?(y/n): ").strip().lower()
        if user_input != 'y':
            break


if __name__ == "__main__":
    main()

3 ESP32S3NANO端代码

#include 

void parseATCommand(String command);

void setup()
{
	Serial1.begin(115200, SERIAL_8N1, 8, 7); // 初始化串口通信,波特率为 115200
	Serial.begin(115200, SERIAL_8N1, 38, 39); // 用于调试信息输出
}

void loop()
{
	if (Serial1.available())
	{
		String cmd = Serial1.readStringUntil('n');
		if (cmd.endsWith("r"))
			cmd.trim();
		parseATCommand(cmd);
	}
	delay(50);
}
/*
	## 解析AT指令并执行操作
  示例展示了使用了sscanf函数解析AT指令
  也展示了使用了String对象的indexOf和substring等方法来解析AT指令。
*/
void parseATCommand(String command)
{
	if (command.startsWith("AT+GPIO="))
	{
		int pin, state;
		if (sscanf(command.c_str()+8, "%d,%d", &pin, &state))
		{
			// 确保引脚号和状态是有效的
			if (pin >= 0 && (state == 0 || state == 1))
			{
				// 设置引脚模式为输出
				pinMode(pin, OUTPUT);
				// 根据状态设置引脚电平
				digitalWrite(pin, state);
				// 打印调试信息
				Serial.print("设置引脚 ");
				Serial.print(pin);
				Serial.print(" 为 ");
				Serial.println(state == 1 ? "高电平" : "低电平");
			}
			else
				Serial.println("无效的引脚号或状态");
		}
		else
			Serial.println("无法解析引脚号和状态");
	}
	else if (command.startsWith("AT+PWM="))
	{
		// 提取引脚号和占空比
		int equalIndex = command.indexOf('=');
		int commaIndex = command.indexOf(',');
		if (equalIndex != -1 && commaIndex != -1)
		{
			String pinStr = command.substring(equalIndex + 1, commaIndex);
			String dutyStr = command.substring(commaIndex + 1);

			int pin = pinStr.toInt();
			int duty = dutyStr.toInt();
			// 确保引脚号和占空比是有效的
			if (pin >= 0 && duty >= 0 && duty 

登录查看全部

参与评论

评论留言

还没有评论留言,赶紧来抢楼吧~~

手机查看

返回顶部

给这篇文章打个标签吧~

棒极了 糟糕透顶 好文章 PHP JAVA JS 小程序 Python SEO MySql 确认