Hello, fellow forum members and colleagues!
I have been using Jiandaoyun-related tools for data processing and workflow management, and I also rely on Jiandaoyun for form collection and file storage. For a long time, though, one pain point kept bothering me: local files (Excel, PDF, documents, etc.) have to be uploaded to Jiandaoyun manually, and after every batch job or file update they have to be re-uploaded by hand. It is tedious, easy to miss files or upload duplicates, and it wastes a lot of time. After working on this for a while, I not only figured out how to upload local files to Jiandaoyun directly, but also wrote a ready-to-run script that syncs local files in real time: whenever a file is added or modified in the watched folder, it is automatically uploaded to Jiandaoyun, and the same file version is never uploaded twice. No manual steps, no complicated setup. I recorded a full hands-on walkthrough video and am attaching the complete code below, hoping to find others with the same need so we can exchange ideas, improve it, and avoid pitfalls together.
Core needs it addresses (do any of these sound familiar?)
- Real-time sync upload: the script watches a specified local folder and automatically triggers an upload whenever a file is added or modified (overwritten), so no manual steps are needed;
- Duplicate prevention: a file fingerprint (size + modification time) is used to decide whether a given version has already been uploaded, so the same version is never sent twice, saving bandwidth and time;
- File stability check: a file is only uploaded after it has finished being written (its size stops changing), which avoids upload failures caused by half-saved files;
- Beginner friendly: the code runs as-is after a simple configuration (Jiandaoyun appId, API key, etc.; see the sample config.json right after this list), and the video walks through deployment step by step;
- Fits many scenarios: office documents, spreadsheets, images and other formats are supported, covering everyday cases where local files need to be synced to Jiandaoyun frequently (for example, supplementing form attachments or providing companion files for batch data imports);
- Fully reusable: the code is open and easy to modify, so you can adjust the watched folder, the allowed file extensions, and anything else to fit your own needs.
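Before the code: the script expects a config.json file in the same directory. The keys below (watch_folder, allowed_extensions, and the jiandaoyun block with appId, entryId and key) are exactly what the code reads; the values shown are placeholders, so replace them with your own folder path, file formats and Jiandaoyun credentials. A minimal sketch:

{
    "watch_folder": "D:/jdy_sync",
    "allowed_extensions": [".xlsx", ".xls", ".pdf", ".docx", ".jpg", ".png"],
    "jiandaoyun": {
        "appId": "your_app_id",
        "entryId": "your_entry_id",
        "key": "your_api_key"
    }
}

Note that the extensions are compared against Path(filepath).suffix, so keep the leading dot in each entry.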
Here is the complete, ready-to-run Python code:
# -*- coding: utf-8 -*-
"""
Local folder -> Jiandaoyun real-time sync.

Supports:
- uploading newly created files
- re-uploading files that are modified / overwritten
- preventing duplicate uploads triggered by the same write event
"""
import os
import json
import time
import logging
from pathlib import Path

import urllib3
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from jdy import APIUtils

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

CONFIG_FILE = "config.json"


def load_config():
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"Config file {CONFIG_FILE} not found, please reinstall the program.")
    with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
        return json.load(f)


class FileHandler(FileSystemEventHandler):
    def __init__(self, config):
        self.config = config
        # Cache of already-processed file fingerprints: {filepath: (size, mtime)}
        self.processed_fingerprints = {}

    def _get_file_fingerprint(self, filepath):
        """Get the file fingerprint: (size, mtime)."""
        try:
            stat = os.stat(filepath)
            return (stat.st_size, stat.st_mtime)
        except (OSError, FileNotFoundError):
            return None

    def _is_already_processed(self, filepath):
        """Check whether this version of the file has already been processed."""
        fingerprint = self._get_file_fingerprint(filepath)
        if fingerprint is None:
            return True  # file no longer exists, skip it
        return self.processed_fingerprints.get(filepath) == fingerprint

    def _mark_as_processed(self, filepath):
        """Record that the current version of the file has been processed."""
        fingerprint = self._get_file_fingerprint(filepath)
        if fingerprint:
            self.processed_fingerprints[filepath] = fingerprint

    def _wait_for_file_stable(self, filepath, timeout=5):
        """Wait until the file has finished being written (its size is stable)."""
        for _ in range(timeout * 2):
            try:
                size1 = os.path.getsize(filepath)
                time.sleep(0.5)
                size2 = os.path.getsize(filepath)
                if size1 == size2 and size1 > 0:
                    return True
            except (OSError, FileNotFoundError):
                return False
        return False

    def on_created(self, event):
        self._process_file(event)

    def on_modified(self, event):
        self._process_file(event)

    def _process_file(self, event):
        if event.is_directory:
            return
        filepath = event.src_path
        ext = Path(filepath).suffix.lower()
        allowed_exts = [e.lower() for e in self.config["allowed_extensions"]]
        if ext not in allowed_exts:
            return
        # Wait until the file has finished being written
        if not self._wait_for_file_stable(filepath):
            logging.warning(f"File is unstable or empty, skipping: {filepath}")
            return
        # Skip if this version has already been processed
        if self._is_already_processed(filepath):
            logging.debug(f"Skipping already-processed version: {filepath}")
            return
        logging.info(f"File change detected: {filepath}")
        self.upload_file(filepath)
        self._mark_as_processed(filepath)  # record the new version

    def upload_file(self, filepath):
        cfg = self.config["jiandaoyun"]
        appId = cfg['appId']
        entryId = cfg['entryId']
        api_key = cfg['key']
        jdy = APIUtils(api_key)
        try:
            filename = os.path.basename(filepath)
            with open(filepath, 'rb') as f:
                files = {'file': (filename, f, 'application/octet-stream')}
                logging.info(f"Uploading file: {filepath}")
                key, transaction_id = jdy.upload_file(appId, entryId, files)
            if key:
                filename_base = os.path.splitext(os.path.basename(filepath))[0]
                json_data = {"file_file": {"value": [key]}, "code": {"value": filename_base}}
                logging.info(f"Upload succeeded: {filepath} | key={key}")
                # Check whether a record for this file already exists in Jiandaoyun
                filter_data = {
                    "rel": "and",
                    "cond": [
                        {"field": "code", "method": "in", "value": filename_base}
                    ]
                }
                existing_data_ids = jdy.get_all_dataId(appId, entryId, filter_data)
                if len(existing_data_ids) > 0:
                    jdy.update_data(appId, entryId, existing_data_ids[0], json_data, transaction_id=transaction_id)
                    return
                jdy.create_data(appId, entryId, json_data, transaction_id=transaction_id)
            else:
                logging.error(f"Upload failed: {filepath} - returned key is empty")
        except Exception as e:
            logging.exception(f"Upload error: {e}")


def main():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    config = load_config()
    watch_folder = config["watch_folder"]
    os.makedirs(watch_folder, exist_ok=True)

    event_handler = FileHandler(config)
    observer = Observer()
    observer.schedule(event_handler, watch_folder, recursive=True)
    observer.start()
    logging.info(f"Watcher started, monitoring folder: {watch_folder}")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("Stopping watcher...")
        observer.stop()
    observer.join()


if __name__ == "__main__":
    main()
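A few deployment notes. The folder watching uses the watchdog package from PyPI (pip install watchdog). APIUtils is imported from a jdy helper module that wraps the Jiandaoyun open API; that module is not included in this post, so point the import at your own wrapper. Judging only from how the script calls it (this is an inferred sketch, not an official SDK), the wrapper is assumed to expose an interface shaped roughly like this:

class APIUtils:
    def __init__(self, api_key):
        # holds the Jiandaoyun API key used for authentication
        self.api_key = api_key

    def upload_file(self, app_id, entry_id, files):
        # assumed to return (key, transaction_id) for the uploaded file
        raise NotImplementedError

    def get_all_dataId(self, app_id, entry_id, filter_data):
        # assumed to return a list of ids of records matching filter_data
        raise NotImplementedError

    def create_data(self, app_id, entry_id, data, transaction_id=None):
        # assumed to create a new record in the target form
        raise NotImplementedError

    def update_data(self, app_id, entry_id, data_id, data, transaction_id=None):
        # assumed to update the record identified by data_id
        raise NotImplementedError

The target form is also expected to have an attachment widget whose API name is file_file and a text widget whose API name is code (the script writes the file name without its extension there and uses it to check for an existing record). Once config.json is filled in, save the code as a .py file and run it with Python; the watcher keeps running until you stop it with Ctrl+C.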