分享:云函数&前端事件 代码参考(20210729直播)

楼主
简道云应用场景探索者

说明:

因部分代码涉及到敏感信息,做了些微调,使用时请自行补充相关内容;另本篇内容建议大家仅做参考。

特别说明:部分代码基于网上内容修订。

补充:

导航:云函数&前端事件 内容集 

 

试卷类综合函数(参考)

 

# -*- coding: utf8 -*-
import json
import requests
import random
class Factory:    
    def __init__(self,tkl,ctl,xx,da):     
        self.release={}
        self.release["处理状态"]="反馈正常" 
        if not((len(xx)>0 and len(da)>0) or (len(tkl)>0 and len(ctl)>0)): 
            self.release["处理状态"]="参数无效" 
        if len(ctl)>0:  
            if len(tkl)==0 or int(tkl)<int(ctl):
                tkl=ctl      
            self.tkl=int(tkl)
            self.ctl=int(ctl)
            self.ctxl()
        if len(xx)>0 and len(da)>0: 
            self.release["cs1"]="xxda" 
            self.xx=[i for i in xx.split("###") if i !='']
            self.da=[i for i in da.split("###") if i !='']
            self.yy=self.xx        
            self.sjxx()
    def ctxl(self):
        self.release["试卷题号"]=list(range(1,int(self.ctl)+1))
        self.release["按序编号"]=list(range(1,int(self.ctl)+1))
        self.release["随机编号"]=random.sample(range(1, int(self.tkl)+1), int(self.ctl))
    def csxx(self):
        self.release["cs4"]=self.xx
    def sjxx(self):
        self.yy=self.xxmx(self.xx) 
        da=[i for i in self.damx(self.yy).split("###") if i!=""]
        self.release["当前答案"]=da
        self.yy=self.cznr(self.yy)
        self.release["当前选项"]=self.yy        
    def xxmx(self,xx):
        x=0
        s=""       
        self.release["cs2-1"]=xx
        while x<len(xx):
            self.release["cs2-2"]=x
            yy=[i for i in xx[x].split("^^^") if i !='']
            self.release["cs2-3"]=yy
            y=0
            random.shuffle(yy)
            while y<len(yy):
                self.release["cs2-4"]=y
                s=s+yy[y]+"^^^"
                y=y+1
            s=s+"###"
            x=x+1
        self.release["cs2"]="xxmx" 
        return s
    def damx(self,yy):
        x=0
        s=""
        yy=[i for i in yy.split("###") if i !='']
        while x<len(yy):
            yyy=[i for i in yy[x].split("^^^") if i !='']
            da=[i for i in self.da[x].split("^^^") if i !='']
            y=0
            ss=""
            while y<len(da):
                self.release["cs3-1"]="damx" 
                ss=ss+chr(yyy.index(da[y])+65)
                y=y+1
            ss=list(ss)
            ss.sort()
            ss="".join(ss)
            s=s+ss+"###"
            x=x+1
        self.release["cs3"]="damx" 
        return s
    def cznr(self,x):        
        q={}
        q["a"]=[i for i in x.split("###") if i !=""]
        print("b2:",q,len(q))
        l=len(q["a"])
        s=""
        b=0
        while b<l:
            c=[i for i in q["a"].split("^^^") if i != ""]
            d=0
            while d<len(c):
                s=s+chr(d+65)+". "+c[d]+"\n"
                d=d+1
            s=s+"###"
            print("b3:",s,b)
            b=b+1
        return [i for i in s.split("###") if i != ""]
    def Release(self):     
        return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(self.release)
        } 
def main_handler(event, context):
    # 处理获取到的参数  
    H=event['headers']
    S=event["queryString"]
    if "tkl" in H:
        tkl= H["tkl"]
    else:
        tkl = ""
    if "ctl" in H:
        ctl= H["ctl"]
    else:
        ctl  = ""
    if "xx" in S:
        xx = S["xx"]
    else:
        xx = ""
    if "da" in S:
        da = S["da"]
    else:
        da = ""
    chuli = Factory(tkl,ctl,xx,da)
    # 返回处理结果
    return chuli.Release()

 

抖音解析函数(参考)

 基于网络源码修订

 

# -*- coding: utf8 -*-
import json
import re
import requests
class Douyin:
    def __init__(self, url):
        self.__url = url
        self.__aweme = "https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids="
        self.__title = ""
        self.__author = ""
        self.__mp3_title = ""
        self.__mp3_url = ""
        self.__mp4_url = ""
        self.__void_url = ""
    def __request(self):        
        share = requests.get(self.__url).url       
        void_id = re.search(r'video/.*?p', share)    
        str_id = void_id.group()
        str_id = str_id.replace("video/", "").replace("?p", "")
        self.__str_id = str_id  
    def request_vide(self):
        headers = {'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36'}
        self.__request()
        dow_url = self.__aweme + self.__str_id
        print("dow_url------",dow_url) 
        date = requests.get(dow_url,headers=headers).json()
        print("datel------",date)
        self.__title = date['item_list'][0]['desc']
        self.__author = date['item_list'][0]['author']['nickname']
        self.__mp3_title = date['item_list'][0]['music']['title']
        self.__mp3_url = date['item_list'][0]['music']['play_url']['uri']
        video_url = str(date['item_list'][0]['video']['play_addr']['url_list'][0])
        video_url = video_url.replace("playwm", "play")
        self.__void_url = video_url
    def response(self):
        self.__mp4_url = requests.get(self.__void_url).url
        print("SELFMP4URL------",self.__mp4_url) 
        return {
            "isBase64Encoded": False,
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({'mp3_title': self.__mp3_title, 'mp3_url': self.__mp3_url, 'video_url': self.__mp4_url,  'title': self.__title, 'author': self.__author})
        }
def main_handler(event, context):
    url = event['queryString']['cs']
    douyin = Douyin(url)
    douyin.request_vide()
    return douyin.response()

 

群通知函数(参考)

注意事项:

1、在钉钉群或企业微信群中创建群机器人;

2、获取 webhook 等信息,这个要自己保存好,泄露有安全风险;

 

# 企业微信群通知
import requests
import json
def main_handler(event, context):         
     headers = {"Content-Type": "text/plain"}
     url = event['queryString']['webhook']
     ss = event['queryString']['text']
     s=""
     l=0
     while l<len(ss.split("###")):
          s=s+"\n"+ss.split("###")[l]+"\n"
          l=l+1
     data = {
          "msgtype": "text",
          "text": {
          "content": s,
          }
     }
     requests.post(url, headers=headers, json=data)
     r={}
     r["release"]="ok"
     return {
     "isBase64Encoded": False,
     "statusCode": 200,
     "headers": {"Content-Type": "application/json"},
     "body": json.dumps(r)
     }
# 钉钉群通知
import requests
import json
def main_handler(event, context):
     headers = {'Content-Type': 'application/json'} 
     url = event['queryString']['webhook']
     ss= event['queryString']['text']  
     s=""
     l=0     
     while l<len(ss.split("###")):
          s=s+"\n"+ss.split("###")[l]+"\n"
          l=l+1
     s=s + "来自:微分享"   # 关键词
     data = {
          "msgtype": "text",
          "text": {
          "content": s,
          }
     }
     requests.post(url, headers=headers, json=data)     
     r={}
     r["release"]="ok"
     return {
    "isBase64Encoded": False,
    "statusCode": 200,
    "headers": {"Content-Type": "application/json"},
    "body": json.dumps(r)    
    } 

 

邮件通知函数(参考)

参考资料:

https://www.52pojie.cn/thread-1286071-1-1.html

 

# -*- coding: utf8 -*-
import json
import requests
import urllib.parse
from smtplib import SMTP_SSL
from email.mime.text import MIMEText
def main_handler(event, context):
    #功能配置
    mail_from = "xxxxxxx@qq.com"  #发送者邮箱
    pwd ="password"   #授权码
    ss = SMTP_SSL("smtp.qq.com")    #邮箱服务器
    #发送内容   
    mail_to = urllib.parse.unquote(event['queryString']["m"])  
    subject = urllib.parse.unquote(event['queryString']["s"])
    content = urllib.parse.unquote(event['queryString']["c"].replace("###","\n"))
    print("mail_to:",mail_to)
    print("subject:",subject)
    print("content:",content)
    #邮件发送
    msg = MIMEText(content)
    msg['Subject'] = subject
    msg['From'] = mail_from
    msg['To'] = mail_to    
    ss.login(mail_from,pwd)
    ss.sendmail(mail_from,mail_to,msg.as_string())
    ss.close()
    #发送状态
    try:
        r="success send"        
    except:
        r="error"  
    return{
            "isBase64Encoded": False,
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(r)
        }

 

外连数据库函数(参考)

参考资料:

https://www.cnblogs.com/jiahuasir/p/10728045.html

https://blog.csdn.net/qq_39905917/article/details/88981867

 

import json
import requests
import pymysql
def tianjia(Mn,Mt): # 添加数据
    r = {}
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    cursor=db.cursor() #获取数据库的操作游标
    sql="insert into 数据表名(myname,mytext) values(%s,%s)"
    try:
        cursor.execute(sql,(Mn,Mt))
        db.commit()  #这个方法才是真正将语句提交到数据库执行的方法
        r["release"] = "添加成功"
        r["myname"] = Mn
    except:
        print("failed")
        db.rollback() #回滚
        r["release"] = "添加失败"
    db.close()
    return r
def gengxin(Mn,Mt): # 更新数据
    r = {}
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    cursor=db.cursor() #获取数据库的操作游标
    sql="update 数据表名 set mytext = %s where myname = %s"
    try:
        if cursor.execute(sql,(Mt,Mn)):
            db.commit()
            r["release"] = "更新成功"
            r["mytext"] = Mt
    except:
        db.rollback()
        r["release"] = "更新失败"
    db.close()
    return r
def tiqu(): # 提取数据
    r = {}
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    cursor=db.cursor() #获取数据库的操作游标
    sql="select * from 数据表名"
    try:
        Mn=[]
        Mt=[]
        if cursor.execute(sql):
            print("Count:",cursor.rowcount) #查询出的总条数
            one=cursor.fetchone()  #获取结果的第一条数据
            print("One:",one)
            result=cursor.fetchall()  #获取所有结果
            for row in result:
                print(row)
                Mn.append(row["myname"])
                Mt.append(row["mytext"])
        r["myname"]=list(reversed(Mn))
        r["mytext"]=list(reversed(Mt))
        r["release"]= "提取成功"
    except:
        r["release"]= "提取失败" 
    db.close()
    return r
def chaxun(Mn): # 查询数据
    r = {}
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    cursor=db.cursor() #获取数据库的操作游标
    sql="select * from 数据表名"
    try:
        t={}
        if cursor.execute(sql):
            result=cursor.fetchall()  #获取所有结果
            for row in result:
                #print(row)
                t[row["myname"]]=row["mytext"]
        r["mytext"]=t[Mn]
        r["release"]= "查询成功"        
    except:
        r["release"]= "查询失败" 
        r["mytext"]="未查询到此相关内容"
    db.close()
    return r
def shanchu(Mn): # 删除数据
    r = {}
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    cursor=db.cursor() #获取数据库的操作游标
    table="数据表名"
    condition="myname="+chr(34)+Mn+chr(34)
    sql="delete from {table} where {condition}".format(table=table,condition=condition)
    print("sql:",sql)
    try:
        if cursor.execute(sql):
            db.commit()
            r["release"]= "删除成功"
            r["myname"] = Mn
    except:
        db.rollback()
        r["release"]= "删除失败"
    return r
def main_handler(event, context):
    r = {}
    cz = event['queryString']["cz"]
    if "添加" in cz:        
        r = tianjia(event['queryString']['mn'],event['queryString']['mt'])    
    elif "更新" in cz:        
        r = gengxin(event['queryString']['mn'],event['queryString']['mt'])    
    elif "提取" in cz:
        r = tiqu()    
    elif "查询" in cz:    
        r = chaxun(event['queryString']['mn'])    
    elif "删除" in cz:
        r = shanchu(event['queryString']['mn'])       
    return {
            "isBase64Encoded": False,
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(r)
    }  

 

多功能函数(参考)

 

# -*- coding: utf8 -*-
import json
import random
import numpy
import urllib.parse
import requests
def main_handler(event, context):
    string=urllib.parse.unquote(event['queryString']['string'])
    if( "requests" in string):
        eval(string)
        s={}
        s["string"]="已发送"
        return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(s)
        }
    else:
        r={}
        r["release"]=eval(string)
        r["string"]=str(r["release"])
        return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(r)
        }

 

 

 

都看到这里了
如果您觉得有用
赞一个呗
赏一个呗
偶会更有动力哈
 
 
每天参与论坛“摇一摇”活动可免费获得F豆
更多沟通交流可添加微信(zmlnow)
添加时请备注:简道云
编辑于 2021-7-30 12:06
分享扩散:

沙发
发表于 2021-8-5 19:07:28
太牛了,原来你试卷带了这么多参数啊
板凳
发表于 2021-10-29 23:18:31
很棒很棒!
地板
发表于 2022-3-22 14:06:34
很厉害
5楼
发表于 2022-3-23 16:13:17 发布于APP客户端
最后一个能解释下代码具体的功能吗,谢谢。
6楼
发表于 2022-3-26 12:38:46 发布于APP客户端
感谢?
7楼
发表于 2022-6-4 15:51:52
腾讯云收费了,百度云函数也不会免费多久,张总研究下自建云函数服务吧,不然到时候还是折腾;
8楼
发表于 2024-4-5 13:15:11
不知道派什么用,用在哪里,怎么才能使用,感受到巨大的差距
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

10回帖数 11关注人数 9507浏览人数
最后回复于:2024-4-6 20:47

返回顶部 返回列表