XiaoYuanKouSuan
这是一个小猿口算的炸鱼脚本项目 涉及的知识包括但不限于基础编程知识、设备调试、抓包、Js等高级涨姿势
这里只简单分析Python代码实现流程和基本原理 目前本脚本已失效 只是简单的分析学习
相关链接在本文最后
首先从检查adb开始
用到了[[subprocess]]库
try:
result = subprocess.run(["adb", "devices"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
print(f"ADB 检查失败: {result.stderr}")
sys.exit(1)
except FileNotFoundError:
print("ADB 未找到,请先安装 ADB 工具。")
sys.exit(1)
用subprocess.run
来运行adb命令判断是否准备好了adb配置和设备
然后是解析命令行参数
# 解析命令行参数
parser = argparse.ArgumentParser(description="Mitmproxy script")
parser.add_argument("-P", "--port", type=int, default=8080, help="Port to listen on")
parser.add_argument("-H", "--host", type=str, default="0.0.0.0", help="Host to listen on")
parser.add_argument("-AI", "--adb-ip", type=str, help="IP and port for ADB wireless connection (e.g., 192.168.0.101:5555)")
args = parser.parse_args()
# 如果指定了 ADB IP,进行无线调试连接
if args.adb_ip:
connect_adb_wireless(args.adb_ip)
这里用了 [[argparse]] 模块
先创建了一个 ArgumentParser
对象,用于处理命令行参数,然后进行配置定义,在调用对象的parse_args()
方法来解析命令行参数。
解析参数后的无线调试可以忽略,自己连接更方便
然后是运行mitdump
# 运行mitmdump
sys.argv = ["mitmdump", "-s", __file__, "--listen-host", args.host, "--listen-port", str(args.port)]
mitmdump()
这里的sys.argv
是Python 中用来存储命令行参数的列表,通常,sys.argv[0]
是脚本的名字,后续的元素是传递给脚本的命令行参数。
这行代码模拟了命令行调用 mitmdump
时传递的一组参数,其中包括要运行的脚本文件和 mitmproxy 监听的主机和端口信息
这里两次解析命令行参数然后执行mitmdump()
mitmdump
是[[mitmproxy]]提供的命令行工具,用于以简化模式运行
mitmdump
可以实现拦截 HTTP/HTTPS 请求和响应进行分析、修改 HTTP 响应的内容(如修改网页返回的 JavaScript 代码、HTTP headers 等)、自动化处理复杂的网络交互,如模拟点击、表单提交等。
要对请求和响应进行重写就需要使用Python中的重写去重写mitmdump的钩子函数,如下图所示。
def request(flow: http.HTTPFlow) -> None:
pass
def response(flow: http.HTTPFlow) -> None:
global is_dialog_shown
url = flow.request.url
print(f"Response: {flow.response.status_code} {url}")
if is_target_url(url):
handle_target_response(flow, url)
elif "https://xyks.yuanfudao.com/leo-game-pk/android/math/pk/match/v2?" in url:
if not is_dialog_shown:
is_dialog_shown = True
threading.Thread(target=gui_answer).start()
response
中拦截了HTTP响应,然后声明了全局变量is_dialog_shown,然后判断这个HTTP响应的链接是否是目标链接,如果是的话就开始进行处理。
def handle_target_response(flow, url):
print(f"匹配到指定的 URL: {url}")
responsetext = flow.response.text
funname = extract_function_name(responsetext)
if funname:
update_response_text(flow, responsetext, funname)
else:
print("未找到匹配的函数名,无法进行替换。")
这里先调用了extract_function_name
函数来提取一个函数名
def extract_function_name(responsetext):
match = re.search(r"(?<=isRight:)[^,]*?\(.*?\).*?(?=\|)", responsetext)
return match.group() if match else None
这里的正则表达式r"(?<=isRight:)[^,]*?\(.*?\).*?(?=\|)"
较为复杂,(?<=isRight:)
是一个 正向先行断言,意思是匹配的字符串必须以 isRight:
开头,但不包括 isRight:
本身。[^,]*?
匹配任意不是逗号的字符 [^,]
,并使用 *?
(非贪婪模式)匹配尽量少的字符。\(.+?\)
匹配函数调用中的参数部分。\(.*?\)
表示匹配括号以及其中的任意字符,尽量少匹配(非贪婪模式)。match.group()
如果找到匹配项,group()
方法返回匹配的字符串。
这里就涉及稍微复杂的高级[[正则表达式]]知识
如果匹配到了函数就对该函数进行内容替换
def update_response_text(flow, responsetext, funname):
print(f"找到函数名: {funname}")
updated_text = responsetext.replace(funname, f"{funname}||true")
flow.response.text = updated_text
# 保存js到exercise.js
with open("exercise.js", "w", encoding="utf-8") as f:
f.write(updated_text)
print(f"替换后的响应: {updated_text}")
threading.Thread(target=show_message_box, args=("过滤成功", f"函数 {funname} 替换成功!")).start()
替换为强制正确后,就保存js内容到exercise.js
替换成功后利用一个非常简单的可视化弹窗来显示替换信息
def show_message_box(title, message):
root = tk.Tk()
root.withdraw()
messagebox.showinfo(title, message)
root.destroy()
现在再回到response函数中的第二个分支
def response(flow: http.HTTPFlow) -> None:
global is_dialog_shown
url = flow.request.url
print(f"Response: {flow.response.status_code} {url}")
if is_target_url(url):
handle_target_response(flow, url)
elif "https://xyks.yuanfudao.com/leo-game-pk/android/math/pk/match/v2?" in url:
if not is_dialog_shown:
is_dialog_shown = True
threading.Thread(target=gui_answer).start()
这里用了一个全局变量来保证同一时间只有一个线程在答题(一个匹配的响应在处理),这里开了一个线程应该也是因为防止阻塞其他响应处理,这里如果重复收到了响应会放弃其他响应的答题处理。
接下来就进入答题函数
def gui_answer():
# 预先准备 ADB 命令
prepared_commands = number_command.prepare_tap_commands(".", ANSWER_COUNT)
root = tk.Tk()
root.title("继续执行")
button = tk.Button(root, text="点击继续", command=lambda: answer_write(prepared_commands))
button.pack(pady=20)
# 设置定时器自动执行
threading.Timer(WAITING_TIME, auto_click_and_close, args=(root, prepared_commands)).start()
root.mainloop()
这里先预先对要输入的内容进行处理,也就是prepare_tap_commands
函数,效果是将一个配置转换为一系列的shell命令
def prepare_tap_commands(command_str: str, times: int) -> List[str]:
xy_paths = str_to_xy(command_str, *map(lambda x: get_device_resolution()[x] / BASE_RESOLUTION[x], (0, 1)))
adb_commands = []
if xy_paths:
if isinstance(xy_paths[0], tuple):
x, y = xy_paths[0]
adb_commands.extend([f"input tap {x} {y}" for _ in range(times)])
else:
for path in xy_paths:
x, y = path[0] # 假设每个路径只有一个点
adb_commands.extend([f"input tap {x} {y}" for _ in range(times)])
return adb_commands
这里稍微复杂一点的只有*map(lambda x: get_device_resolution()[x] / BASE_RESOLUTION[x], (0, 1))
这里调用了一个函数来获取设备的分辨率
@lru_cache()
def get_device_resolution():
# 获取设备的物理分辨率,并缓存
result = subprocess.run(["adb", "shell", "wm", "size"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
output = result.stdout
if "Physical size" in output:
resolution_str = output.split(":")[-1].strip()
width, height = map(int, resolution_str.split("x"))
return width, height
else:
raise Exception("无法获取设备分辨率")
@lru_cache()
是 Python 中 functools
模块提供的一个装饰器,用于对函数的返回结果进行缓存,从而避免多次调用相同参数的函数时重复执行相同的计算或操作。
@lru_cache()
会将函数调用的输入参数和对应的输出结果缓存起来。如果函数再次被调用并传入相同的参数值时,Python 不会重新执行该函数,而是直接返回缓存的结果。这减少了函数的计算时间,特别适合一些耗时的操作(如网络请求、I/O 操作等)
lru_cache
默认会缓存最多 128 个不同参数的函数调用结果。如果超过 128 个,最久未使用的缓存项将被丢弃
然后str_to_xy
等一系列函数都是数学转换,就不涉及整个流程分析,暂时不分析了。
threading.Timer(WAITING_TIME, auto_click_and_close, args=(root, prepared_commands)).start()
然后这里再创建一个定时器,倒计时结束之后一次性全部点击完
def auto_click_and_close(root, prepared_commands):
answer_write(prepared_commands)
global is_dialog_shown
is_dialog_shown = False
root.destroy()
def answer_write(prepared_commands):
start_time = time.time()
# 一次性发送准备好的 ADB 命令
number_command.run_adb_command(prepared_commands)
end_time = time.time()
print(f"点击操作耗时: {end_time - start_time:.3f}秒")
然后就结束该线程,并修改全局变量,等待下一次响应拦截
以上内容是对主分支的代码流程分析,主分支主要是采取拦截并修改响应,然后强制答案正确实现的自动答题。在dev分支中则是采取的抓包再解析答案再进行模拟滑动,大致是一样的。
def response(flow: http.HTTPFlow) -> None:
# 处理响应
print(f"Response: {flow.response.status_code} {flow.request.url}")
# 如果url中包含指定的关键字,则打印响应信息
if "https://xyks.yuanfudao.com/leo-math/android/exams?" in flow.request.url:
# 将响应信息转换为json格式
answer = json.loads(flow.response.text)
# 格式化输出
print(json.dumps(answer, indent=4))
# 保存到文件
# with open("answer.json", "w") as f:
# f.write(json.dumps(answer, indent=4))
select_answer(answer,"练习")
elif "https://xyks.yuanfudao.com/leo-game-pk/android/math/pk/match?" in flow.request.url:
# 将响应信息转换为json格式
answer = json.loads(flow.response.text)
# 格式化输出
print(json.dumps(answer, indent=4))
# 保存到文件
# with open("answer.json", "w") as f:
# f.write(json.dumps(answer, indent=4))
select_answer(answer,"pk")
def select_answer(answer, type):
# 关闭notepad
# os.system("taskkill /f /im notepad.exe")
# 并保存到txt文件
f = open("answer.txt", "w")
select_answer = []
if type == "练习":
for question in answer["questions"]:
answers=question["answers"]
for i in range(len(answers)):
if "." in answers[i]:
correct_answer = answers[i]
break
if i == len(answers)-1:
correct_answer = answers[0]
select_answer.append(correct_answer)
print(correct_answer, end=" ")
f.write(str(correct_answer) + " ")
elif type == "pk":
for question in answer["examVO"]["questions"]:
answers=question["answers"]
for i in range(len(answers)):
if "." in answers[i]:
correct_answer = answers[i]
break
if i == len(answers)-1:
correct_answer = answers[0]
select_answer.append(correct_answer)
print(correct_answer, end=" ")
f.write(str(correct_answer) + " ")
# 关闭文件
f.close()
q_num = len(select_answer)
threading.Thread(target=gui_answer, args=(select_answer,q_num,)).start()
# 用记事本打开文件
# os.system("notepad answer.txt")
# threading.Thread(target=os.system, args=("notepad answer.txt",)).start()
该项目其他的地方就很杂了,差不多都是解析数据和模拟数据了。
还有更高阶的技能就是HooK等技术了
Github https://github.com/cr4n5/XiaoYuanKouSuan
项目源代码备份