#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021-3-5 16:54
# @Author : FreezeJ
# @File : ftp_client.py
# @Software: PyCharm
# @Comment : FTP客户端脚本
import json
import os
import queue
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from ftplib import FTP as _FTP
# 重写ftp 修复同网段内 ip返回异常
class FTP(_FTP):
def connect(self, host='', port=0, timeout=-999, *args):
_FTP.connect(self, host, port, timeout)
self.host = host
return self.welcome
def makepasv(self):
host, port = _FTP.makepasv(self)
return self.host, port
def dir(self, *args): # 复写方法,添加返回值
cmd = 'LIST'
temp_list = []
if args[-1:] and type(args[-1]) != type(''):
args, func = args[:-1], args[-1]
for arg in args:
if arg:
cmd = cmd + (' ' + arg)
self.retrlines(cmd, temp_list.append)
return temp_list
class FtpOps(object):
""" FTP操作
ftp_info = {
"ip": 'xxxx', # 主机ip
"user": 'xxxx', # 用户名
"passwd": 'xxxx', # 密码
"base_path": 'xxxx', # ftp根目录,可选
}
my_ftp = FtpOps(**ftp_info) # 普通操作,带状态
my_ftp.print_usage() # 查看用法
"""
__slots__ = ['_ftp', 'ip', 'port', 'user', 'passwd', 'base_path']
usage = {
"mkdir": ("path", "递归创建目录"),
"chdir": ("path", "切换目录"),
"rmdir": ("path", "删除空目录"),
"rename": ("path, new_path", "重命名文件或目录"),
"dir": ("path", "列出路径文件信息"),
"nlst": ("path", "列出路径文件"),
"delete": ("path", "删除文件"),
"file_size": ("path", "查看文件大小"),
"upload": ("local_path, remote_path", "上传文件"),
"download": ("local_path, remote_path", "下载文件"),
}
@classmethod
def print_usage(cls):
for k, v in cls.usage.items():
print(f'my_ftp.{k}({v[0]}) # {v[1]}')
print(f'my_ftp.pwd # 当前路径')
def __init__(self, ip, user, passwd, port=21, base_path=None):
"""
Ftp操作类
:param ip: ip地址
:param port: 端口
:param user: 用户
:param base_path: 操作根目录
"""
self._ftp = None
self.ip = ip
self.port = port
self.user = user
self.passwd = passwd
self.base_path = base_path
if self.base_path:
code, result = self.chdir(self.base_path)
if code != 0:
raise Exception(f'无法切换到目录{self.base_path}')
@property
def ftp(self):
"""
单例ftp连接
:return:
"""
if not self._ftp:
try:
ftp = FTP()
ftp.connect(self.ip, self.port)
ftp.login(self.user, self.passwd)
ftp.encoding = 'utf-8'
self._ftp = ftp
except Exception as e:
print(e)
return None
return self._ftp
def mkdir(self, path):
"""
创建目录
:param path: 目录路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
result = f"{path}创建失败!"
try:
dir_list = path.strip('/').split('/')
assert path, "目录路径为空"
for i in range(len(dir_list)):
parent_dir = dir_list[0: i + 1] # 父目录
if not parent_dir:
continue
dir_path = '/'.join(parent_dir)
if not self.is_exist(dir_path): # 目录不存在
result = self.ftp.mkd(dir_path)
else:
result = dir_path
code = 0
except Exception as e:
result = repr(e)
code = 1
return code, result
def chdir(self, path):
"""
切换目录
:param path: 目录路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
try:
result = self.ftp.cwd(path)
code = 0
except Exception as e:
result = repr(e)
code = 1
return code, result
def rmdir(self, path):
"""
删除目录(空目录)
:param path: 目录路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
try:
print(self.dir())
result = self.ftp.rmd(path)
code = 0
except Exception as e:
result = repr(e)
code = 1
return code, result
def rename(self, path, new_path):
"""
重命名文件或目录
:param path: 旧路径
:param new_path: 新路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
try:
result = self.ftp.rename(path, new_path)
code = 0
except Exception as e:
result = repr(e)
code = 1
return code, result
@property
def pwd(self):
"""
当前路径
:return: 当前路径
"""
return self.ftp.pwd()
def dir(self, path=None):
"""
列出路径下的文件,默认列出当前路径
:param path: 目录路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
try:
result = self.ftp.dir(path)
code = 0
except Exception as e:
result = repr(e)
code = 1
return code, result
def nlst(self, path=None):
"""
列出路径下的文件,默认列出当前路径
:param path: 目录路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
try:
result = self.ftp.nlst(path)
code = 0
except Exception as e:
result = repr(e)
code = 1
return code, result
def is_exist(self, path):
"""
判断文件是否存在
:param path: 路径
:return:
"""
path_dir = os.path.dirname(path)
code, result = self.nlst(path_dir)
return True if path in result else False
def delete(self, path):
"""
删除文件
:param path: 文件路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
try:
result = self.ftp.delete(path)
code = 0
print(f"{path} 删除成功!")
except Exception as e:
result = repr(e)
code = 1
print(f"{path} 删除失败!")
return code, result
def file_size(self, path):
"""
获取文件大小
:param path: 文件路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
try:
result = self.ftp.size(path)
code = 0
except Exception as e:
result = repr(e)
code = 1
return code, result
def upload(self, local_path, remote_path):
"""
上传文件
:param local_path: 本地文件路径
:param remote_path: 远程文件路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
try:
if not os.path.isfile(local_path):
raise FileNotFoundError(local_path)
local_size = os.path.getsize(local_path) # 本地文件大小
code, remote_size = self.file_size(remote_path) # 远端文件大小
if code == 0 and local_size == remote_size:
result = f"{remote_path}文件已存在!"
code = 0
else:
remote_path_dir = os.path.dirname(remote_path)
self.mkdir(remote_path_dir)
print('开始上传文件' + local_path)
fb = open(local_path, 'rb')
result = self.ftp.storbinary(f"stor {remote_path}", fb)
code = 0
# print(f"{local_path} 上传成功!")
except Exception as e:
result = repr(e)
code = 1
print(f"{local_path} 上传失败!")
return code, result
def download(self, local_path, remote_path):
"""
下载文件
:param local_path: 本地文件路径
:param remote_path: 远程文件路径
:return: code: 状态码(0:成功 1:失败) result: 执行结果
"""
try:
code, remote_size = self.file_size(remote_path) # 远端文件大小
if code == 0: # 远端文件存在
if os.path.isfile(local_path): # 本地已经存在文件
local_size = os.path.getsize(local_path) # 本地文件大小
print(f"local_size:{local_size}")
print(f"remote_size:{remote_size}")
if local_size == remote_size: # 文件已经存在,且文件大小匹配,不用重新下载
result = f"{remote_path}文件已存在!"
print(result)
else: # 文件已经存在,但文件大小不匹配,重新下载
result = self.ftp.retrbinary(f"retr {remote_path}",
open(local_path, 'wb').write)
code = 0
elif os.path.exists(local_path):
result = f"{local_path}已经存在但不是一个文件!"
code = 1
else: # 文件不存在
local_path_dir = os.path.dirname(local_path)
os.makedirs(local_path_dir, exist_ok=True) # 创建本地目录
result = self.ftp.retrbinary(f"retr {remote_path}",
open(local_path, 'wb').write)
code = 0
else:
result = f"{remote_path}文件不存在!"
code = 1
except Exception as e:
result = repr(e)
code = 1
if code == 0:
print(f"{local_path} 下载成功!")
else:
print(f"{local_path} 下载失败!")
return code, result
class FtpThreadOps(object):
""" 多线程FTP操作
ftp_info = {
"ip": 'xxxx', # 主机ip
"user": 'xxxx', # 用户名
"passwd": 'xxxx', # 密码
"port": xx, # 端口
"base_path": 'xxxx', # ftp根目录,可选
}
my_thread_ftp = FtpOps(**ftp_info) # 普通操作,带状态
my_thread_ftp.print_usage() # 查看用法
"""
usage = {
"rmdir": ("path", "删除空目录列表"),
"rename": ("path, new_path", "重命名文件或目录列表"),
"delete": ("path", "删除文件列表"),
"upload": ("local_path, remote_path", "上传文件列表"),
"download": ("local_path, remote_path", "下载文件列表"),
}
@classmethod
def print_usage(cls):
for k, v in cls.usage.items():
args = f'[{v[0]}]' if len(v[0].split(',')) > 1 else f'{v[0]}'
print(f'my_thread_ftp.thread_ops(ops_name="{k}", args_list=[{args},], max_workers=5) # {v[1]}')
def __init__(self, ip, user, passwd, port=21, base_path=None):
self.ip = ip
self.port = port
self.user = user
self.passwd = passwd
self.base_path = base_path
self.ftp_pool_num = 0 # 当前连接数量
self._ftp_pool = queue.Queue(maxsize=100)
self.lock = threading.Lock() # 线程锁
@property
def ftp_pool(self):
"""
ftp实例池
:return:
"""
if self._ftp_pool.qsize() == 0: # 动态增加实例池数量,由线程并发数来控制
try:
self._ftp_pool.put(FtpOps(
ip=self.ip, # 主机ip
user=self.user, # 用户名
passwd=self.passwd, # 密码
base_path=self.base_path, # ftp根目录
port=self.port # 端口
))
self.ftp_pool_num += 1
except Exception as e:
print(f"创建FTP实例失败! {e}")
return None
return self._ftp_pool
def ops(self, ops_name, *args):
"""
从实例池获取实例,根据attr执行FTP实例操作,执行结束返回实例池(不能确保目录状态,每次拿到的实例可能不同)
:param ops_name: ftp操作名称
:param args: 参数
:return:
"""
self.lock.acquire() # 加锁
ftp = self.ftp_pool.get() # 获取连接实例
self.lock.release() # 释放
ops_func = getattr(ftp, ops_name, None)
if not ops_func:
raise Exception(f'不支持的操作{ops_func}')
code, result = getattr(ftp, ops_name)(*args)
self.lock.acquire() # 加锁
self.ftp_pool.put(ftp) # 回收连接实例
self.lock.release() # 释放
return code, result
def thread_ops(self, ops_name, args_list, max_workers=5):
"""
FTP多线程操作(FTP协议只支持一个连接通道同时做一个操作,所以开启多线程会创建多个FTP实例复用操作)
:param ops_name: ftp操作名称
:param args_list: ftp操作参数
:param max_workers: 线程数(也会影响FTP实例个数)
:return:
"""
with ThreadPoolExecutor(max_workers=max_workers) as t:
thread_list = []
if ops_name not in self.usage.keys():
raise Exception(f"不支持的操作:{ops_name}, 只支持{self.usage.keys()}")
for args in args_list: # 去重
task = t.submit(self.ops, ops_name, *args)
thread_list.append({
"task": task, # 任务对象
"args": args, # 任务参数
})
t.shutdown(wait=True) # 等待所有线程结束
success_count = 0
fail_count = 0
fail_args = []
fail_args_result = []
for thread in thread_list:
code, result = thread["task"].result()
if code == 0:
success_count += 1
# sys.stdout.flush()
# print(f"{ops_name}成功! args: {thread['args']}")
else:
sys.stdout.flush()
print(f"{ops_name}失败!\nargs: {thread['args']}\nresult: {result}")
fail_count += 1
fail_args.append(thread["args"])
fail_args_result.append(f'args: {thread["args"]}\nresult: {thread["task"].result()}')
print(f"{ops_name}成功:{success_count} 失败:{fail_count}")
if fail_count:
print(f"{ops_name}失败参数:")
print(json.dumps(fail_args))
if fail_count > 0:
code = 1
else:
code = 0
return code, {
"success_count": success_count,
"fail_count": fail_count,
"fail_path": fail_args,
"fail_path_result": fail_args_result,
}
if __name__ == '__main__':
ftp_info = {
"ip": '', # 主机ip
"user": '', # 用户名
"passwd": '', # 密码
"base_path": '', # ftp根目录
"port": 21, # 端口
}
# 实例化
my_ftp = FtpOps(**ftp_info) # 普通操作,带状态
my_ftp.print_usage()
my_thread_ftp = FtpThreadOps(**ftp_info) # 多线程操作,不带状态
my_thread_ftp.print_usage()
赏
使用支付宝打赏
使用微信打赏
若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏