Python 连接mysql脚本

Python 连接mysql脚本

单个连接配置

import pymysql.cursors

host = "ip"
port = "3306"
user = "user"
password = "pass"

try:
    connection = pymysql.connect(
        host=host,
        port=int(port),
        user=user,
        password=password,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    print(f"Connection to {host}:{port} as {user} successful.")
    connection.close()
except pymysql.Error as e:
    print(f"Connection to {host}:{port} as {user} failed. Error: {e}")

python3 testsql.py

图片[1]-Python 连接mysql脚本-tess-wiki连接成功

 

多个mysql数据库测试连接

account.txt存有多个mysql账号

account.txt必须以json格式,每段都要以,结尾

除了最后一行

{
	'url': '192.168.1.1',
	'host': '192.168.1.1',
	'port': '10306',
	'user': 'root',
	'password': '123'
}
{
	'url': 'baidu.com',
	'host': 'localhost',
	'port': '10306',
	'user': 'root',
	'password': 'abc'
}
{
	'url': 'ssssss.com',
	'host': 'localhost',
	'port': '3306',
	'user': 'bbbb666',
	'password': 'AAA'
}

这个里面有主要看host,吐过host是的值localhost的话,需要解析url的域名成ip,在传参给host

如果 con_db_host 是 “localhost“,
则通过解析 “url” 获取对应的 IP 地址,并将其赋值给 host。

sql.py

import pymysql
import socket
import json
from concurrent.futures import ThreadPoolExecutor

def resolve_hostname(hostname):
    try:
        addr_info = socket.getaddrinfo(hostname, None)
        ipv4_addresses = [info[4][0] for info in addr_info if info[1] == socket.AF_INET]
        if ipv4_addresses:
            return ipv4_addresses[0]
    except socket.error:
        return None

def execute_connection(data):
    host = data.get("con_db_host", "")
    port = data.get("con_db_port", "")
    user = data.get("con_db_id", "")
    password = data.get("con_db_pass", "")
    url = data.get("url", "")

    # If host is localhost, resolve URL to IP
    if host.lower() == "localhost":
        if url:
            ip_address = resolve_hostname(url)
            if ip_address:
                host = ip_address

    if not host or not port or not user or not password:
        return None

    try:
        connection = pymysql.connect(
            host=host,
            port=int(port),
            user=user,
            password=password,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        print(f"Connection to {host}:{port} as {user} successful.")

        # Save to ookk.txt
        with open("ookk.txt", "a") as ookk_file:
            ookk_file.write(f"{{\n\t'url': '{url}',\n\t'host': '{host}',\n\t'port': '{port}',\n\t'user': '{user}',\n\t'password': '{password}'\n}}\n")

        connection.close()
        return True
    except pymysql.Error as e:
        print(f"Connection to {host}:{port} as {user} failed. Error: {e}")

        # Save to error.txt
        with open("error.txt", "a") as error_file:
            error_file.write(f"{{\n\t'url': '{url}',\n\t'host': '{host}',\n\t'port': '{port}',\n\t'user': '{user}',\n\t'password': '{password}'\n}}\n")

        return False

def process_data(data):
    try:
        if not data.startswith("{"):
            data = "{" + data
        if not data.endswith("}"):
            data += "}"
        data = json.loads(data)
        success = execute_connection(data)
        if not success:
            return None
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        return None

def main():
    with open("zzz.txt", "r") as file:
        content = file.read().split("}\n{")

    with ThreadPoolExecutor(max_workers=10) as executor:
        executor.map(process_data, content)

if __name__ == "__main__":
    main()

 

详解

  • 读取 account.txt 文件:

with open("account.txt", "r") as file:
    content = file.read().split("}\n{")

这段代码打开 account.txt 文件,使用 read 方法读取文件内容,然后使用 split("}\n{") 按照 }\n{ 分割,以确保每个块都是一个完整的 JSON 对象

  • 处理每个 JSON 块:

for block in content:
    process_data(block)

这个循环遍历每个 JSON 块,并将其传递给 process_data 函数。

  • 解析 JSON 块:

def process_data(data):
    try:
        if not data.startswith("{"):
            data = "{" + data
        if not data.endswith("}"):
            data += "}"
        data = json.loads(data)
        success = execute_connection(data)
        if not success:
            return None
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        return None

process_data 函数尝试解析 JSON 数据。如果 JSON 数据块不以 { 开头,就在开头添加 {,如果不以 } 结尾,就在末尾添加 }。然后,使用 json.loads 解析 JSON 数据,将其转换为 Python 字典。接着,调用 execute_connection 函数,传递解析得到的数据字典。

  • 执行数据库连接操作:

def execute_connection(data):
    host = data.get("con_db_host", "")
    port = data.get("con_db_port", "")
    user = data.get("con_db_id", "")
    password = data.get("con_db_pass", "")
    url = data.get("url", "")

    if host.lower() == "localhost":
        if url:
            ip_address = resolve_hostname(url)
            if ip_address:
                host = ip_address

    if not host or not port or not user or not password:
        return None

    try:
        connection = pymysql.connect(
            host=host,
            port=int(port),
            user=user,
            password=password,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        print(f"Connection to {host}:{port} as {user} successful.")

        # Save to ookk.txt
        with open("ookk.txt", "a") as ookk_file:
            ookk_file.write(f"{{\n\t'url': '{url}',\n\t'host': '{host}',\n\t'port': '{port}',\n\t'user': '{user}',\n\t'password': '{password}'\n}}\n")

        connection.close()
        return True
    except pymysql.Error as e:
        print(f"Connection to {host}:{port} as {user} failed. Error: {e}")

        # Save to error.txt
        with open("error.txt", "a") as error_file:
            error_file.write(f"{{\n\t'url': '{url}',\n\t'host': '{host}',\n\t'port': '{port}',\n\t'user': '{user}',\n\t'password': '{password}'\n}}\n")

        return False

execute_connection 函数首先获取连接信息,如 host, port, user, password。如果 con_db_host 是 “localhost”,则通过解析 “url” 获取对应的 IP 地址,并将其赋值给 host。接着,尝试使用 pymysql.connect 建立数据库连接。如果连接成功,输出连接信息并保存到 ookk.txt 文件。如果连接失败,输出错误信息并保存到 error.txt 文件。

这样,整个处理流程就是读取 xxx.txt 文件,解析每个 JSON 块,执行数据库连接操作,并根据连接结果保存到对应文件。

 

© 版权声明
THE END
喜欢就支持一下吧
点赞8 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称

    暂无评论内容