解放双手.甲骨文的 Oracle NetSuite ERP 数据自动同步到简道云,避坑,已封装代码。

楼主
我是社区第322918位番薯,欢迎点我头像关注我哦~
各位帆软、简道云的同行们,大家好!
 
     做企业级数据管理的朋友应该都有同感:很多公司会用 甲骨文的 Oracle NetSuite ERP 做核心业务管理(销售订单、采购、库存、生产等),同时用简道云做轻量化的应用(如CRM、PLM、报工、报销等管理系统),或是数据展示、流程审批、报表可视化,但两者之间的数据互通一直是个痛点 ——
 
  1. 要么手动导出 NetSuite 数据再导入简道云,繁琐且容易出错;要么找专业集成商,成本高且灵活性差;要么自己开发对接,又要踩 OAuth 认证、分页查询、速率限制、增量同步的各种坑。
  2. 琢磨了很久,我终于搞定了 NetSuite ERP 与简道云的自动化数据对接方案,并编写了完整的 Python 代码,支持增量同步、自动重试、多业务表查询,今天分享给大家,也想寻找有同类需求的同行一起交流优化!  我的微信(drchina)QQ( 58556211) ,欢迎加我讨论。
  3. 核心解决的需求(看看你是不是也有同样困扰?)
 
  • 免手动同步:无需导出 / 导入 Excel,代码自动从 NetSuite 拉取数据,直接对接简道云(后续可搭配简道云 API 完成数据写入,本次先分享 NetSuite 数据拉取核心代码);
  • 增量同步高效:基于lastModifiedDate筛选,配合 Redis 记录上次同步时间,只拉取更新 / 新增数据,避免全量查询浪费资源;
  • 抗异常能力强:支持接口自动重试,处理 NetSuite 429 并发限制、500 服务器错误,无需人工值守;
  • 多业务表支持:内置支持销售订单(SalesOrd)、采购订单(PurchOrd)、物料(item)、库存(InventoryBalance)等多种核心表,可直接扩展;
  • OAuth1.0 安全认证:遵循 NetSuite 官方认证规范,采用 HMAC-SHA256 签名,保障数据传输安全;
  • 分页查询兼容:自动处理大数据量分页,支持rel=next链接翻页,避免数据遗漏。
 

代码核心亮点说明

 
本次分享的代码是整个对接方案的核心(NetSuite 数据拉取),关键模块已做详细注释,新手也能看懂:
 

 

  1. NetSuiteOAuth1类:封装认证、请求、重试逻辑,解耦代码,便于复用;
  2. make_request_with_retry方法:核心异常处理,自动识别并发限制并等待,最大重试 15 次,保障请求成功率;
  3. get_orders_by_lastmodified_suiteql方法:增量同步核心,用 SuiteQL 查询过滤数据,Redis 记录同步断点;
  4. 多日期格式兼容:自动解析 NetSuite 多种返回时间格式,避免格式转换报错。

 

 

# -*- coding: utf-8 -*-

"""

Created on Wed Sep 27 09:14:35 2023

 

@author: jw_chen

"""

 

from requests_oauthlib import OAuth1

import requests,json,html

from time import sleep

from datetime import datetime, timedelta,timezone

import pytz

 

import redis,threading

 

class NetSuiteOAuth1:

    def __init__(self, account_id, consumer_key, consumer_secret, token_id, token_secret, realm=None):

        """

        初始化NetSuite OAuth 1.0认证

 

        参数:

            account_id: NetSuite账户ID (如: 1234567)

            consumer_key: 消费者密钥

            consumer_secret: 消费者密钥

            token_id: 访问令牌ID

            token_secret: 访问令牌密钥

            realm: 可选,通常是账户ID

        """

        self.account_id = account_id

        self.consumer_key = consumer_key

        self.consumer_secret = consumer_secret

        self.token_id = token_id

        self.token_secret = token_secret

        self.realm = realm or account_id

        self.base_url = f"https://{account_id}.suitetalk.api.netsuite.com"

 

    def get_auth(self):

        """返回OAuth1认证对象"""

        return OAuth1(

            self.consumer_key,

            client_secret=self.consumer_secret,

            resource_owner_key=self.token_id,

            resource_owner_secret=self.token_secret,

            realm=self.realm,

            signature_method='HMAC-SHA256',  # NetSuite推荐使用SHA256

            signature_type='auth_header'

        )

    def headers(self):

        return {

            'Content-Type': 'application/json',

            'Accept': 'application/json',

            'Accept-Language': 'en'

            }

   

    def get_headers(self, content_type="application/json"):

        nlauth_account = str(self.account_id).split('-')[0]

        return {

            "Authorization": (

                f"NLAuth nlauth_account={nlauth_account}, "

                f"nlauth_consumer_key={self.consumer_key}, "

                f"nlauth_consumer_secret={self.consumer_secret}, "

                f"nlauth_token_id={self.token_id}, "

                f"nlauth_token_secret={self.token_secret}"

            ),

            "Content-Type": content_type,

            "Prefer": "return=representation"

        }

 

    def make_request(self, endpoint, method='GET', params=None, json=None, timeout=60):

        """

         修复:添加 timeout 参数

        """

        # 1. 构造 URL

        if not endpoint.startswith("http"):

            # 确保包含 /services/rest

            if not endpoint.startswith("/services/rest"):

                endpoint = f"/services/rest{endpoint}"

            url = f"{self.base_url.rstrip('/')}{endpoint}"

        else:

            url = endpoint

 

        # 2. 设置 headers

        headers = {

            'Content-Type': 'application/json',

            'Accept': 'application/json',

            # 'Prefer': 'transient',

            "Accept-Language": "en"

        }

 

        # 3. 获取 OAuth1 签名

        auth = self.get_auth()

 

        try:

            response = requests.request(

                method=method,

                url=url,

                headers=headers,

                auth=auth,

                params=params,

                json=json,  #  使用 json= 而不是 data=

                timeout=timeout  #  关键:把 timeout 传给 requests

            )

            return response

        except requests.exceptions.RequestException as e:

            # 捕获所有网络异常,便于重试

            raise e

 

    def make_request_with_retry(self, endpoint, method='GET', params=None, json=None, timeout=60, max_retries=15):

        retry_count = 0

        while True:

            try:

                response = self.make_request(

                    endpoint=endpoint,

                    method=method,

                    params=params,

                    json=json,

                    timeout=timeout

                )

    

                if response.status_code == 429:

                    try:

                        error_json = response.json()

                        details = error_json.get("o:errorDetails", [])

                        for d in details:

                            if d.get("o:errorCode") == "CONCURRENCY_LIMIT_EXCEEDED":

                                print("🚦 并发限制,强制等待 30 秒后重试...")

                                time.sleep(30)

                                break

                        else:

                            retry_after = int(response.headers.get('Retry-After', 10))

                            print(f"⏳ 速率限制,等待 {retry_after} 秒...")

                            time.sleep(retry_after)

                    except Exception as ex:

                        print(f"⚠️ 无法解析429响应,默认等待10秒: {ex}")

                        time.sleep(10)

                    continue

    

                if response.status_code >= 500:

                    print(f"⚠️ 服务器错误 {response.status_code},重试中...")

                    time.sleep(2 ** retry_count)

                    retry_count += 1

                    continue

    

                return response

    

            except requests.exceptions.RequestException as e:

                print(f"❌ 请求异常: {e}")

                time.sleep(5)

                retry_count += 1

    

            if retry_count > max_retries:

                raise Exception("❌ 最大重试次数已用尽")

 

 

    def get_all_list(self, endpoint, limit=100):

        """

        使用 rel=next 链接方式获取所有分页数据

        """

        all_items = []

        url = endpoint

        params = {

            'limit': limit,

            'offset': 0

        }

    

        while True:

            response = self.make_request_with_retry(url, method='GET', params=params)

            if response.status_code != 200:

                print(f"❌ 请求失败: {response.status_code} {response.text}")

                break

    

            data = response.json()

            items = data.get('items', [])

            all_items.extend(items)

    

            print(f"📥 当前总获取数量: {len(all_items)}")

    

            # 查找下一页链接

            links = data.get('links', [])

            next_link = None

            for link in links:

                if link.get('rel') == 'next':

                    next_link = link['href']

                    break

    

            if next_link:

                # 下一页已包含完整 URL,不能再传 params

                url = next_link

                params = {}

            else:

                break

        return all_items

 

 

    def get_orders_by_lastmodified_suiteql(self, key, limit=1000):

        ALLOWED_TABLES = {'bomrevision', 'SalesOrd', 'PurchOrd', 'WorkOrd', 'VendBill', 'customer', 'bin', 'item', 'productionplan',"WOCompl","InventoryBalance","manufacturingrouting","InventoryBalance_group"}

        if key not in ALLOWED_TABLES:

            raise ValueError(f"不支持的表名: {key}")

 

        headers = {

            'Content-Type': 'application/json',

            'Accept': 'application/json',

            'prefer': 'transient',

            'Accept-Language': 'en'

        }

 

        endpoint = "/services/rest/query/v1/suiteql"

        offset = 0

        all_items = []

        max_last_modified = None

 

        redis_client = redis.Redis(host='localhost', port=6379, db=0)

        redis_key = f"{key}:last_sync_time"

 

        # === 获取上次同步时间 ===

        last_sync_time_str = redis_client.get(redis_key)

        if last_sync_time_str is None:

            default_time = "2025-12-09 00:00:00"

            redis_client.set(redis_key, default_time)

            last_sync_time_str = default_time

        else:

            last_sync_time_str = last_sync_time_str.decode('utf-8')

 

        print(f"上次同步时间: {last_sync_time_str}")

 

        # === 自动解析函数 ===

        def parse_date(date_str):

            """尝试多种日期格式解析"""

            for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):

                try:

                    return datetime.strptime(date_str, fmt)

                except ValueError:

                    continue

            raise ValueError(f"无法识别的时间格式: {date_str}")

 

        # 将 Redis 的时间字符串转为 datetime

        #last_sync_time = parse_date(last_sync_time_str)

 

        print(f"查询的表: {key}")

        

 

        while True:

            if key == 'SalesOrd':

                paginated_query = f"""

                SELECT id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS')  as lastmodifieddate 

                FROM (

                    SELECT *,

                          ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num

                    FROM transaction 

                    WHERE lastModifiedDate   > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')

                      AND type = 'SalesOrd'

                ) AS subquery

                WHERE row_num > {offset} AND row_num <= {offset + limit}

                """

            

            elif key == 'productionplan':

                paginated_query = f"""

                select id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS')  as lastmodifieddate,row_num  from (select id, tranid , lastModifiedDate,ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num  from (SELECT id, tranid,lastModifiedDate, row_number() over (PARTITION BY id,tranid ORDER BY lastModifiedDate DESC) as rn FROM transaction WHERE type = 'WorkOrd' AND lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')) tt where rn =1) tt2 

                WHERE row_num > {offset} AND row_num <= {offset + limit}

                """

            elif key =="test":

                pp = f"""

                select id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS')  as lastmodifieddate,row_num  from (select id, tranid , lastModifiedDate,ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num  from (SELECT id, tranid,lastModifiedDate, row_number() over (PARTITION BY id,tranid ORDER BY lastModifiedDate DESC) as rn FROM transaction WHERE type = 'WOCompl' AND lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')) tt where rn =1) tt2 

                WHERE row_num > {offset} AND row_num <= {offset + limit}

                """

            elif key =="WOCompl":

                paginated_query = f"""

                select id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS')  as lastmodifieddate,row_num  from (select id, tranid , lastModifiedDate,ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num  from (SELECT id, tranid,lastModifiedDate, row_number() over (PARTITION BY id,tranid ORDER BY lastModifiedDate DESC) as rn FROM transaction WHERE type = 'WOCompl' AND lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')) tt where rn =1) tt2 

                WHERE row_num > {offset} AND row_num <= {offset + limit}

                """

            elif key == 'item':

                paginated_query = f"""

                SELECT id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS')  as lastmodifieddate,row_num  FROM ( SELECT id,lastModifiedDate, ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num  FROM item WHERE 1=1 and lastModifiedDate> TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')) AS subquery

                WHERE row_num > {offset} AND row_num <= {offset + limit}

                """

            elif key == 'manufacturingrouting':

                paginated_query = f"""

                SELECT *  FROM ( SELECT *, ROW_NUMBER() OVER (ORDER BY id DESC) AS row_num  FROM manufacturingrouting WHERE 1=1 ) AS subquery

                WHERE row_num > {offset} AND row_num <= {offset + limit}

                """

            

            elif key == 'InventoryBalance':

                paginated_query = f"""

                SELECT * FROM ( SELECT i.custitem_wh_category, ROW_NUMBER() OVER (ORDER BY t1.item, t2.binnumber) AS row_number, t1.item AS item_id, BUILTIN.DF(t1.item) AS item_name, i.itemid AS item_code, i.displayname AS display_name, i.description AS item_description, i.purchasedescription AS purchase_description, i.custitem_item_sales_desc_eng AS sales_description_eng, i.manufacturer, i.mpn, i.cost, i.lastpurchaseprice, i.createddate, TO_CHAR(t1.lastmodifieddate, 'YYYY-MM-DD HH24:MI:SS')  as lastmodifieddate

                , i.itemtype, BUILTIN.DF(i.unitstype) AS unitstype_name, BUILTIN.DF(i.stockunit) AS stockunit_name, BUILTIN.DF(i.purchaseunit) AS purchaseunit_name, BUILTIN.DF(i.saleunit) AS saleunit_name, i.minimumquantity, i.custitem_item_moq, i.custitem_item_lead_time, t1.location, BUILTIN.DF(t1.location) AS location_name, t2.id AS bin_id, t2.binnumber AS bin_number, t1.quantityonhand, t1.quantityavailable, TO_CHAR(t1.lastmodifieddate,'YYYY-MM-DD HH24:MI:SS') AS last_change FROM InventoryBalance t1 LEFT JOIN Bin t2 ON t1.binnumber = t2.id LEFT JOIN Item i ON t1.item = i.id WHERE t1.lastmodifieddate > TO_TIMESTAMP('1979-12-01 00:00:00','YYYY-MM-DD HH24:MI:SS') AND t1.location = 18 ) sub WHERE sub.row_number > {offset} AND sub.row_number <= {offset + limit}

 

 

                """

            elif key == 'InventoryBalance_group':

                

                paginated_query = f"""

                SELECT * FROM ( select * , ROW_NUMBER() OVER (ORDER BY item_id,  bin_number) AS row_number  from  ( SELECT t1.item AS item_id,BUILTIN.DF(t1.item) AS item_name,CASE WHEN t2.binnumber LIKE 'Assembly%' OR t2.binnumber LIKE 'NS%' OR t2.binnumber LIKE 'Rework%' OR t2.binnumber LIKE 'Supplier%' OR t2.binnumber LIKE 'BF%' THEN '是' else '否' end AS bin_number,sum(t1.quantityonhand) as quantityonhand,sum(t1.quantityavailable) as quantityavailable    FROM InventoryBalance t1 LEFT JOIN Bin t2 ON t1.binnumber = t2.id LEFT JOIN Item i ON t1.item = i.id WHERE  t1.location = 18 group by t1.item, BUILTIN.DF(t1.item),CASE WHEN t2.binnumber LIKE 'Assembly%' OR t2.binnumber LIKE 'NS%' OR t2.binnumber LIKE 'Rework%' OR t2.binnumber LIKE 'Supplier%' OR t2.binnumber LIKE 'BF%' THEN '是' else '否' end) tt order by item_id,bin_number ) sub WHERE sub.row_number > {offset} AND sub.row_number <= {offset + limit}

                """

 

            

            elif key in ['bomrevision', 'customer', 'bin']:

                paginated_query = f"""

                SELECT id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS')  as lastmodifieddate  

                FROM (

                    SELECT *,

                          ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num

                    FROM {key}  

                    WHERE lastModifiedDate   > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')

                ) AS subquery

                WHERE row_num > {offset} AND row_num <= {offset + limit}

                """

            else:

                paginated_query = f"""

                SELECT id, TO_CHAR(lastModifiedDate, 'YYYY-MM-DD HH24:MI:SS')  as lastmodifieddate  

                FROM (

                    SELECT  id, lastmodifieddate,

                          ROW_NUMBER() OVER (ORDER BY lastModifiedDate DESC) AS row_num

                    FROM transaction 

                    WHERE lastModifiedDate > TO_TIMESTAMP('{last_sync_time_str}', 'YYYY-MM-DD HH24:MI:SS')   

                      AND type = '{key}'

                ) AS subquery

                WHERE row_num > {offset} AND row_num <= {offset + limit}

                """

            

            print(paginated_query)

 

            response = requests.post(

                f"{self.base_url}{endpoint}",

                json={"q": paginated_query},

                auth=self.get_auth(),

                headers=headers,

            )

 

            if response.status_code != 200:

                raise Exception(f"SuiteQL查询失败: {response.status_code} - {response.text}")

 

            data = response.json()

            items = data.get("items", [])

            all_items.extend(items)

 

 

            if len(items) < limit:

                break

 

            offset += limit

 

    

        filtered_ids = [i.get("id") for i in all_items]

        # === 更新 Redis ===

        try:

            max_last_modified = max( [i.get("lastmodifieddate") for i in all_items] )

        except:

            max_last_modified=[]

            

        res_dic={}

        print("最后时间:",max_last_modified)

        if len(max_last_modified)>0:

            redis_client.set(redis_key, str(max_last_modified))

 

        #是以下的表单信息,则返回所有的字段信息

        if key in ( 'InventoryBalance','manufacturingrouting','InventoryBalance_group'):

            return all_items,max_last_modified,res_dic

    

        return filtered_ids,max_last_modified,res_dic

 

分享扩散:

沙发
发表于 2026-1-23 18:32:16
自己占一楼,我的微信(drchina)QQ( 58556211) ,欢迎加我讨论。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

1回帖数 1关注人数 292浏览人数
最后回复于:2026-1-23 18:32

返回顶部 返回列表