Post

python-telegram-bot定义指令并异步执行shell不卡顿交互

前言

在上一篇文章里做了同步执行脚本,但是在直接脚本期间,其他命令不会再接受处理,体验很不好,特研究做异步处理 所使用的技术主要是asyncio

小飞机接收指令并执行shell脚本ios_bot_async.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
#!/usr/bin/env python

"""
Don't forget to enable inline mode with @BotFather

1、创建机器人
2、设置开启机器人的inline mode
3、将机器人拉入到群里,并且设置为管理员

Usage:
停止执行代码:  Ctrl-C
"""

import logging
from uuid import uuid4
import asyncio

from telegram import InlineQueryResultArticle, InputTextMessageContent, Update
from telegram.ext import Application, CommandHandler, ContextTypes, InlineQueryHandler, MessageHandler, filters

# 开启日志
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
# set higher logging level for httpx to avoid all GET and POST requests being logged
logging.getLogger("httpx").setLevel(logging.WARNING)

logger = logging.getLogger(__name__)

# 定义支持的App列表
suportAppList = [
    ("aa", "scheme1", "xx.xx.x1", "https://xxx/xxx/xx1.png"),
    ("bb", "scheme2", "xx.xx.x1", "https://xxx/xxx/xx2.png"),
    ("cc", "scheme3", "xx.xx.x1", "https://xxx/xxx/xx3.png"),
    ]

# 定义响应/start的指令
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
   
    message = update.message
    chatId = message.chat.id
   
    text = message.text
    # 如果输入的是模版,不处理
    if "/start scheme bundleId" in text:
        await update.message.reply_text("输入的是demo,需要真实的内容")
        return

    # 去掉多余的空格,方便后续使用
    while "  " in text:
        text = text.replace("  ", "")
    print(f"text2: {text}")

    # 查找/start命令的位置
    fromIdx = text.find('/start')
    if fromIdx >= 0:
        # 截取字符串,只去命令之外的内容
        realText = text[fromIdx+7:len(text)]
        print(f"realText: {realText}")
        findTxtList = realText.split(" ")
        print(f"paramsCount: {len(findTxtList)}")

        # 判断参数个数是否正确
        if len(findTxtList) != 2:
            await update.message.reply_text("输入的内容不对,必须是/start scheme bundleId 的格式")
            return
        scheme = findTxtList[0]
        bundleId = findTxtList[1]
        
        # 判断scheme是否正确
        fullSchemeList = []
        demoStr = ""
        # 查找支持的所有scheme
        for x in suportAppList:
            fullSchemeList.append(x[1])
            demoStr = f"{demoStr}\n{x[1]}"

        # 比较scheme是否正确
        if scheme not in fullSchemeList:
            await update.message.reply_text(f"输入的scheme内容: {scheme}不对,必须是如下的一个:\n{demoStr}")
            return
        
        # 找到App的名称
        appName = ""
        for x in suportAppList:
            if scheme in x[1]:
                appName = x[0]

        print(f"scheme: {scheme}")
        print(f"bundleId: {bundleId}")
        print(f"chatId: {chatId}")

        # 提示参数正确,开始准备执行
        await update.message.reply_text(f"Hi! 开始打包,大概耗时15分钟\nApp名称:{appName}\nscheme: {scheme} \nbundleId: {bundleId} \n默认打包分支:develop")
        commandStr = f"sh /xxx/xxx/xxx.sh {scheme} {bundleId} {chatId}"
        # 异步执行脚本,不阻塞流程
        asyncio.create_task(run_command_realtime(commandStr, update))

# 异步执行命令,并实时获取输出
async def run_command_realtime(command: str, update: Update):
    process = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # 创建两个异步任务,用于分别读取 stdout 和 stderr 的输出
    await asyncio.gather(
        stream_output(process.stdout, update, "输出"),
        stream_output(process.stderr, update, "错误"),
    )

    # 等待进程结束,并获取退出状态码
    # returncode = await process.wait()
    # if returncode == 0:
    #     await update.message.reply_text("命令执行完成 ✅")
    # else:
    #     await update.message.reply_text(f"命令执行失败 ❌,退出码: {returncode}")

# 实时读取流(stdout/stderr)并逐行发送到 Telegram
async def stream_output(stream, update: Update, stream_name: str):
   while True:
        chunk = await stream.read(1024)  # 每次读取 1024 字节
        if not chunk:  # 如果没有更多数据,跳出循环
            break
        # 解码时忽略解码错误
        text = chunk.decode('utf-8', errors='ignore').strip()
        if text:
            print(f"[{stream_name}] `{text}`")

# 响应一遍会话
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Echo the user message."""
    message = update.message
    chat = message.chat
    # if chat.id > 0:
    #     print("群")
    # elif chat.id < 0:
    #     print("单聊")
   
    text = message.text
    print(chat.id)
    print(text)

# 响应help指令
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send a message when the command /help is issued."""
    await update.message.reply_text("Help! \n方式一:/start scheme名 bundleId名,如\n/start qSportProduce com.p.xncsp\n\n方式二:在群里输入@xxxx_bot 选择打包对象")

# 响应inlinemode指令, @机器人的名字时出来
async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the inline query. This is run when you type: @botusername <query>"""

    print("inline_query")
    query = update.inline_query.query

    findList = []
    print(f"query: {query}")
    if not query:
        # 没有搜索内容,就展示全部内容
        findList = suportAppList
    else:
        # 搜索出名称、scheme、bundleId任何一个包含的结果
        str1 = query.lower()
        for x in suportAppList:
            name = x[0]
            scheme = x[1]
            bundleId = x[2]
            
            # print(cmdStr)
            if (str1 in name) or (str1 in scheme) or (str1 in bundleId):
                findList.append(x)

    # 没有搜索出内容,则使用全部内容
    if len(findList) == 0:
        findList = suportAppList
    
    # 将搜索出来的内容转换为InlineQueryResultArticle
    results = []
    for x in findList:
        name = x[0]
        scheme = x[1]
        bundleId = x[2]
        url = x[3]
        cmdStr = f"/start {scheme} {bundleId}"
        fullTitle=f"{name} {scheme} {bundleId}"

        print(f"url: {url}")
        print(f"fullTitle: {fullTitle}")
        results.append(
            InlineQueryResultArticle(
                id=str(uuid4()),
                thumbnail_url=url,
                title=fullTitle,
                input_message_content=InputTextMessageContent(cmdStr),
            )
        )
    
    await update.inline_query.answer(results)

# 入口主函数
def main() -> None:
    """Run the bot."""
    # Create the Application and pass it your bot's token.
    current_file_path = __file__
    tokenStr = "xxxxxxxxxx"
    
    application = Application.builder().token(tokenStr).build()

    # on different commands - answer in Telegram
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))

    # on inline queries - show corresponding inline results
    application.add_handler(InlineQueryHandler(inline_query))
    # application.add_handler(ChosenInlineResultHandler(chosen_result))

     # on non command i.e message - echo the message on Telegram
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))

    # Run the bot until the user presses Ctrl-C
    application.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    main()

# python3 /xxx/xxx/ios_bot_async.py
This post is licensed under CC BY 4.0 by the author.