Python 连接mysql脚本
单个连接配置
import pymysql.cursors
host = "ip"
port = "3306"
user = "user"
password = "pass"
try:
connection = pymysql.connect(
host=host,
port=int(port),
user=user,
password=password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
print(f"Connection to {host}:{port} as {user} successful.")
connection.close()
except pymysql.Error as e:
print(f"Connection to {host}:{port} as {user} failed. Error: {e}")
python3 testsql.py
连接成功
多个mysql数据库测试连接
account.txt存有多个mysql账号
account.txt必须以json格式,每段都要以,结尾
除了最后一行
{
'url': '192.168.1.1',
'host': '192.168.1.1',
'port': '10306',
'user': 'root',
'password': '123'
}
{
'url': 'baidu.com',
'host': 'localhost',
'port': '10306',
'user': 'root',
'password': 'abc'
}
{
'url': 'ssssss.com',
'host': 'localhost',
'port': '3306',
'user': 'bbbb666',
'password': 'AAA'
}
这个里面有主要看host,吐过host是的值localhost的话,需要解析url的域名成ip,在传参给host
如果 con_db_host 是 “localhost“,
则通过解析 “url” 获取对应的 IP 地址,并将其赋值给 host。
sql.py
import pymysql
import socket
import json
from concurrent.futures import ThreadPoolExecutor
def resolve_hostname(hostname):
try:
addr_info = socket.getaddrinfo(hostname, None)
ipv4_addresses = [info[4][0] for info in addr_info if info[1] == socket.AF_INET]
if ipv4_addresses:
return ipv4_addresses[0]
except socket.error:
return None
def execute_connection(data):
host = data.get("con_db_host", "")
port = data.get("con_db_port", "")
user = data.get("con_db_id", "")
password = data.get("con_db_pass", "")
url = data.get("url", "")
# If host is localhost, resolve URL to IP
if host.lower() == "localhost":
if url:
ip_address = resolve_hostname(url)
if ip_address:
host = ip_address
if not host or not port or not user or not password:
return None
try:
connection = pymysql.connect(
host=host,
port=int(port),
user=user,
password=password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
print(f"Connection to {host}:{port} as {user} successful.")
# Save to ookk.txt
with open("ookk.txt", "a") as ookk_file:
ookk_file.write(f"{{\n\t'url': '{url}',\n\t'host': '{host}',\n\t'port': '{port}',\n\t'user': '{user}',\n\t'password': '{password}'\n}}\n")
connection.close()
return True
except pymysql.Error as e:
print(f"Connection to {host}:{port} as {user} failed. Error: {e}")
# Save to error.txt
with open("error.txt", "a") as error_file:
error_file.write(f"{{\n\t'url': '{url}',\n\t'host': '{host}',\n\t'port': '{port}',\n\t'user': '{user}',\n\t'password': '{password}'\n}}\n")
return False
def process_data(data):
try:
if not data.startswith("{"):
data = "{" + data
if not data.endswith("}"):
data += "}"
data = json.loads(data)
success = execute_connection(data)
if not success:
return None
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
return None
def main():
with open("zzz.txt", "r") as file:
content = file.read().split("}\n{")
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(process_data, content)
if __name__ == "__main__":
main()
详解
-
读取 account.txt 文件:
with open("account.txt", "r") as file:
content = file.read().split("}\n{")
这段代码打开 account.txt
文件,使用 read
方法读取文件内容,然后使用 split("}\n{")
按照 }\n{
分割,以确保每个块都是一个完整的 JSON 对象
-
处理每个 JSON 块:
for block in content:
process_data(block)
这个循环遍历每个 JSON 块,并将其传递给 process_data
函数。
-
解析 JSON 块:
def process_data(data):
try:
if not data.startswith("{"):
data = "{" + data
if not data.endswith("}"):
data += "}"
data = json.loads(data)
success = execute_connection(data)
if not success:
return None
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
return None
process_data
函数尝试解析 JSON 数据。如果 JSON 数据块不以 {
开头,就在开头添加 {
,如果不以 }
结尾,就在末尾添加 }
。然后,使用 json.loads
解析 JSON 数据,将其转换为 Python 字典。接着,调用 execute_connection
函数,传递解析得到的数据字典。
-
执行数据库连接操作:
def execute_connection(data):
host = data.get("con_db_host", "")
port = data.get("con_db_port", "")
user = data.get("con_db_id", "")
password = data.get("con_db_pass", "")
url = data.get("url", "")
if host.lower() == "localhost":
if url:
ip_address = resolve_hostname(url)
if ip_address:
host = ip_address
if not host or not port or not user or not password:
return None
try:
connection = pymysql.connect(
host=host,
port=int(port),
user=user,
password=password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
print(f"Connection to {host}:{port} as {user} successful.")
# Save to ookk.txt
with open("ookk.txt", "a") as ookk_file:
ookk_file.write(f"{{\n\t'url': '{url}',\n\t'host': '{host}',\n\t'port': '{port}',\n\t'user': '{user}',\n\t'password': '{password}'\n}}\n")
connection.close()
return True
except pymysql.Error as e:
print(f"Connection to {host}:{port} as {user} failed. Error: {e}")
# Save to error.txt
with open("error.txt", "a") as error_file:
error_file.write(f"{{\n\t'url': '{url}',\n\t'host': '{host}',\n\t'port': '{port}',\n\t'user': '{user}',\n\t'password': '{password}'\n}}\n")
return False
execute_connection
函数首先获取连接信息,如 host
, port
, user
, password
。如果 con_db_host
是 “localhost”,则通过解析 “url” 获取对应的 IP 地址,并将其赋值给 host
。接着,尝试使用 pymysql.connect
建立数据库连接。如果连接成功,输出连接信息并保存到 ookk.txt
文件。如果连接失败,输出错误信息并保存到 error.txt
文件。
这样,整个处理流程就是读取 xxx.txt
文件,解析每个 JSON 块,执行数据库连接操作,并根据连接结果保存到对应文件。
暂无评论内容