|
各位帆软、简道云的同行们,大家好!
做企业级数据管理的朋友应该都有同感:很多公司会用 甲骨文的 Oracle NetSuite ERP 做核心业务管理(销售订单、采购、库存、生产等),同时用简道云做轻量化的应用(如CRM、PLM、报工、报销等管理系统),或是数据展示、流程审批、报表可视化,但两者之间的数据互通一直是个痛点 ——
- 要么手动导出 NetSuite 数据再导入简道云,繁琐且容易出错;要么找专业集成商,成本高且灵活性差;要么自己开发对接,又要踩 OAuth 认证、分页查询、速率限制、增量同步的各种坑。
- 琢磨了很久,我终于搞定了 NetSuite ERP 与简道云的自动化数据对接方案,并编写了完整的 Python 代码,支持增量同步、自动重试、多业务表查询,今天分享给大家,也想寻找有同类需求的同行一起交流优化! 我的微信(drchina)QQ( 58556211) ,欢迎加我讨论。
- 核心解决的需求(看看你是不是也有同样困扰?)
- 免手动同步:无需导出 / 导入 Excel,代码自动从 NetSuite 拉取数据,直接对接简道云(后续可搭配简道云 API 完成数据写入,本次先分享 NetSuite 数据拉取核心代码);
- 增量同步高效:基于
lastModifiedDate筛选,配合 Redis 记录上次同步时间,只拉取更新 / 新增数据,避免全量查询浪费资源;
- 抗异常能力强:支持接口自动重试,处理 NetSuite 429 并发限制、500 服务器错误,无需人工值守;
- 多业务表支持:内置支持销售订单(SalesOrd)、采购订单(PurchOrd)、物料(item)、库存(InventoryBalance)等多种核心表,可直接扩展;
- OAuth1.0 安全认证:遵循 NetSuite 官方认证规范,采用 HMAC-SHA256 签名,保障数据传输安全;
- 分页查询兼容:自动处理大数据量分页,支持
rel=next链接翻页,避免数据遗漏。
代码核心亮点说明
本次分享的代码是整个对接方案的核心(NetSuite 数据拉取),关键模块已做详细注释,新手也能看懂:
NetSuiteOAuth1类:封装认证、请求、重试逻辑,解耦代码,便于复用;
make_request_with_retry方法:核心异常处理,自动识别并发限制并等待,最大重试 15 次,保障请求成功率;
get_orders_by_lastmodified_suiteql方法:增量同步核心,用 SuiteQL 查询过滤数据,Redis 记录同步断点;
- 多日期格式兼容:自动解析 NetSuite 多种返回时间格式,避免格式转换报错。
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 27 09:14:35 2023
@author: jw_chen
"""
from requests_oauthlib import OAuth1
import requests,json,html
from time import sleep
from datetime import datetime, timedelta,timezone
import pytz
import redis,threading
class NetSuiteOAuth1:
def __init__(self, account_id, consumer_key, consumer_secret, token_id, token_secret, realm=None):
"""
初始化NetSuite OAuth 1.0认证
参数:
account_id: NetSuite账户ID (如: 1234567)
consumer_key: 消费者密钥
consumer_secret: 消费者密钥
token_id: 访问令牌ID
token_secret: 访问令牌密钥
realm: 可选,通常是账户ID
"""
self.account_id = account_id
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.token_id = token_id
self.token_secret = token_secret
self.realm = realm or account_id
self.base_url = f"https://{account_id}.suitetalk.api.netsuite.com"
def get_auth(self):
"""返回OAuth1认证对象"""
return OAuth1(
self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.token_id,
resource_owner_secret=self.token_secret,
realm=self.realm,
signature_method='HMAC-SHA256', # NetSuite推荐使用SHA256
signature_type='auth_header'
)
def headers(self):
return {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Accept-Language': 'en'
}
def get_headers(self, content_type="application/json"):
nlauth_account = str(self.account_id).split('-')[0]
return {
"Authorization": (
f"NLAuth nlauth_account={nlauth_account}, "
f"nlauth_consumer_key={self.consumer_key}, "
f"nlauth_consumer_secret={self.consumer_secret}, "
f"nlauth_token_id={self.token_id}, "
f"nlauth_token_secret={self.token_secret}"
),
"Content-Type": content_type,
"Prefer": "return=representation"
}
def make_request(self, endpoint, method='GET', params=None, json=None, timeout=60):
"""
修复:添加 timeout 参数
"""
# 1. 构造 URL
if not endpoint.startswith("http"):
# 确保包含 /services/rest
if not endpoint.startswith("/services/rest"):
endpoint = f"/services/rest{endpoint}"
url = f"{self.base_url.rstrip('/')}{endpoint}"
else:
url = endpoint
# 2. 设置 headers
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
# 'Prefer': 'transient',
"Accept-Language": "en"
}
# 3. 获取 OAuth1 签名
auth = self.get_auth()
try:
response = requests.request(
method=method,
url=url,
headers=headers,
auth=auth,
params=params,
json=json, # 使用 json= 而不是 data=
timeout=timeout # 关键:把 timeout 传给 requests
)
return response
except requests.exceptions.RequestException as e:
# 捕获所有网络异常,便于重试
raise e
def make_request_with_retry(self, endpoint, method='GET', params=None, json=None, timeout=60, max_retries=15):
retry_count = 0
while True:
try:
response = self.make_request(
endpoint=endpoint,
method=method,
params=params,
json=json,
timeout=timeout
)
if response.status_code == 429:
try:
error_json = response.json()
details = error_json.get("o:errorDetails", [])
for d in details:
if d.get("o:errorCode") == "CONCURRENCY_LIMIT_EXCEEDED":
print("🚦 并发限制,强制等待 30 秒后重试...")
time.sleep(30)
break
else:
retry_after = int(response.headers.get('Retry-After', 10))
print(f"⏳ 速率限制,等待 {retry_after} 秒...")
time.sleep(retry_after)
except Exception as ex:
print(f"⚠️ 无法解析429响应,默认等待10秒: {ex}")
time.sleep(10)
continue
if response.status_code >= 500:
print(f"⚠️ 服务器错误 {response.status_code},重试中...")
time.sleep(2 ** retry_count)
retry_count += 1
continue
return response
except requests.exceptions.RequestException as e:
print(f"❌ 请求异常: {e}")
time.sleep(5)
retry_count += 1
if retry_count > max_retries:
raise Exception("❌ 最大重试次数已用尽")
def get_all_list(self, endpoint, limit=100):
"""
使用 rel=next 链接方式获取所有分页数据
"""
all_items = []
url = endpoint
params = {
'limit': limit,
'offset': 0
}
while True:
response = self.make_request_with_retry(url, method='GET', params=params)
if response.status_code != 200:
print(f"❌ 请求失败: {response.status_code} {response.text}")
break
data = response.json()
items = data.get('items', [])
all_items.extend(items)
print(f"📥 当前总获取数量: {len(all_items)}")
# 查找下一页链接
links = data.get('links', [])
next_link = None
for link in links:
if link.get('rel') == 'next':
next_link = link['href']
break
if next_link:
# 下一页已包含完整 URL,不能再传 params
url = next_link
params = {}
else:
break
return all_items
def get_orders_by_lastmodified_suiteql(self, key, limit=1000):
ALLOWED_TABLES = {'bomrevision', 'SalesOrd', 'PurchOrd', 'WorkOrd', 'VendBill', 'customer', 'bin', 'item', 'productionplan',"WOCompl","InventoryBalance","manufacturingrouting","InventoryBalance_group"}
if key not in ALLOWED_TABLES:
raise ValueError(f"不支持的表名: {key}")
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'prefer': 'transient',
'Accept-Language': 'en'
}
endpoint = "/services/rest/query/v1/suiteql"
offset = 0
all_items = []
max_last_modified = None
redis_client = redis.Redis(host='localhost', port=6379, db=0)
redis_key = f"{key}:last_sync_time"
# === 获取上次同步时间 ===
last_sync_time_str = redis_client.get(redis_key)
if last_sync_time_str is None:
default_time = "2025-12-09 00:00:00"
redis_client.set(redis_key, default_time)
last_sync_time_str = default_time
else:
last_sync_time_str = last_sync_time_str.decode('utf-8')
print(f"上次同步时间: {last_sync_time_str}")
# === 自动解析函数 ===
def parse_date(date_str):
"""尝试多种日期格式解析"""
for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
raise ValueError(f"无法识别的时间格式: {date_str}")
# 将 Redis 的时间字符串转为 datetime
#last_sync_time = parse_date(last_sync_time_str)
print(f"查询的表: {key}")
while True:
if key == 'SalesOrd':
paginated_query = f"""
SELECT id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS') as lastmodifieddate
FROM (
SELECT *,
ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num
FROM transaction
WHERE lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')
AND type = 'SalesOrd'
) AS subquery
WHERE row_num > {offset} AND row_num <= {offset + limit}
"""
elif key == 'productionplan':
paginated_query = f"""
select id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS') as lastmodifieddate,row_num from (select id, tranid , lastModifiedDate,ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num from (SELECT id, tranid,lastModifiedDate, row_number() over (PARTITION BY id,tranid ORDER BY lastModifiedDate DESC) as rn FROM transaction WHERE type = 'WorkOrd' AND lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')) tt where rn =1) tt2
WHERE row_num > {offset} AND row_num <= {offset + limit}
"""
elif key =="test":
pp = f"""
select id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS') as lastmodifieddate,row_num from (select id, tranid , lastModifiedDate,ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num from (SELECT id, tranid,lastModifiedDate, row_number() over (PARTITION BY id,tranid ORDER BY lastModifiedDate DESC) as rn FROM transaction WHERE type = 'WOCompl' AND lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')) tt where rn =1) tt2
WHERE row_num > {offset} AND row_num <= {offset + limit}
"""
elif key =="WOCompl":
paginated_query = f"""
select id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS') as lastmodifieddate,row_num from (select id, tranid , lastModifiedDate,ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num from (SELECT id, tranid,lastModifiedDate, row_number() over (PARTITION BY id,tranid ORDER BY lastModifiedDate DESC) as rn FROM transaction WHERE type = 'WOCompl' AND lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')) tt where rn =1) tt2
WHERE row_num > {offset} AND row_num <= {offset + limit}
"""
elif key == 'item':
paginated_query = f"""
SELECT id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS') as lastmodifieddate,row_num FROM ( SELECT id,lastModifiedDate, ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num FROM item WHERE 1=1 and lastModifiedDate> TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')) AS subquery
WHERE row_num > {offset} AND row_num <= {offset + limit}
"""
elif key == 'manufacturingrouting':
paginated_query = f"""
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (ORDER BY id DESC) AS row_num FROM manufacturingrouting WHERE 1=1 ) AS subquery
WHERE row_num > {offset} AND row_num <= {offset + limit}
"""
elif key == 'InventoryBalance':
paginated_query = f"""
SELECT * FROM ( SELECT i.custitem_wh_category, ROW_NUMBER() OVER (ORDER BY t1.item, t2.binnumber) AS row_number, t1.item AS item_id, BUILTIN.DF(t1.item) AS item_name, i.itemid AS item_code, i.displayname AS display_name, i.description AS item_description, i.purchasedescription AS purchase_description, i.custitem_item_sales_desc_eng AS sales_description_eng, i.manufacturer, i.mpn, i.cost, i.lastpurchaseprice, i.createddate, TO_CHAR(t1.lastmodifieddate, 'YYYY-MM-DD HH24:MI:SS') as lastmodifieddate
, i.itemtype, BUILTIN.DF(i.unitstype) AS unitstype_name, BUILTIN.DF(i.stockunit) AS stockunit_name, BUILTIN.DF(i.purchaseunit) AS purchaseunit_name, BUILTIN.DF(i.saleunit) AS saleunit_name, i.minimumquantity, i.custitem_item_moq, i.custitem_item_lead_time, t1.location, BUILTIN.DF(t1.location) AS location_name, t2.id AS bin_id, t2.binnumber AS bin_number, t1.quantityonhand, t1.quantityavailable, TO_CHAR(t1.lastmodifieddate,'YYYY-MM-DD HH24:MI:SS') AS last_change FROM InventoryBalance t1 LEFT JOIN Bin t2 ON t1.binnumber = t2.id LEFT JOIN Item i ON t1.item = i.id WHERE t1.lastmodifieddate > TO_TIMESTAMP('1979-12-01 00:00:00','YYYY-MM-DD HH24:MI:SS') AND t1.location = 18 ) sub WHERE sub.row_number > {offset} AND sub.row_number <= {offset + limit}
"""
elif key == 'InventoryBalance_group':
paginated_query = f"""
SELECT * FROM ( select * , ROW_NUMBER() OVER (ORDER BY item_id, bin_number) AS row_number from ( SELECT t1.item AS item_id,BUILTIN.DF(t1.item) AS item_name,CASE WHEN t2.binnumber LIKE 'Assembly%' OR t2.binnumber LIKE 'NS%' OR t2.binnumber LIKE 'Rework%' OR t2.binnumber LIKE 'Supplier%' OR t2.binnumber LIKE 'BF%' THEN '是' else '否' end AS bin_number,sum(t1.quantityonhand) as quantityonhand,sum(t1.quantityavailable) as quantityavailable FROM InventoryBalance t1 LEFT JOIN Bin t2 ON t1.binnumber = t2.id LEFT JOIN Item i ON t1.item = i.id WHERE t1.location = 18 group by t1.item, BUILTIN.DF(t1.item),CASE WHEN t2.binnumber LIKE 'Assembly%' OR t2.binnumber LIKE 'NS%' OR t2.binnumber LIKE 'Rework%' OR t2.binnumber LIKE 'Supplier%' OR t2.binnumber LIKE 'BF%' THEN '是' else '否' end) tt order by item_id,bin_number ) sub WHERE sub.row_number > {offset} AND sub.row_number <= {offset + limit}
"""
elif key in ['bomrevision', 'customer', 'bin']:
paginated_query = f"""
SELECT id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS') as lastmodifieddate
FROM (
SELECT *,
ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num
FROM {key}
WHERE lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')
) AS subquery
WHERE row_num > {offset} AND row_num <= {offset + limit}
"""
else:
paginated_query = f"""
SELECT id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS') as lastmodifieddate
FROM (
SELECT id, lastmodifieddate,
ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num
FROM transaction
WHERE lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')
AND type = '{key}'
) AS subquery
WHERE row_num > {offset} AND row_num <= {offset + limit}
"""
print(paginated_query)
response = requests.post(
f"{self.base_url}{endpoint}",
json={"q": paginated_query},
auth=self.get_auth(),
headers=headers,
)
if response.status_code != 200:
raise Exception(f"SuiteQL查询失败: {response.status_code} - {response.text}")
data = response.json()
items = data.get("items", [])
all_items.extend(items)
if len(items) < limit:
break
offset += limit
filtered_ids = [i.get("id") for i in all_items]
# === 更新 Redis ===
try:
max_last_modified = max( [i.get("lastmodifieddate") for i in all_items] )
except:
max_last_modified=[]
res_dic={}
print("最后时间:",max_last_modified)
if len(max_last_modified)>0:
redis_client.set(redis_key, str(max_last_modified))
#是以下的表单信息,则返回所有的字段信息
if key in ( 'InventoryBalance','manufacturingrouting','InventoryBalance_group'):
return all_items,max_last_modified,res_dic
return filtered_ids,max_last_modified,res_dic
|