290 lines
11 KiB
Python
290 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
import base64
import datetime
import hashlib
import io
import json
import logging
import os
import random

import gvcode
import pytz
import requests
from aliyunsdkcore.client import AcsClient
from aliyunsdkdysmsapi.request.v20170525 import SendSmsRequest
from odoo import http, tools
from passlib.context import CryptContext

from .tool import MakeResponse
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
def send_sms(template_code, phone_number):
    """Send a freshly generated 6-digit verification code by Aliyun SMS.

    Args:
        template_code: Aliyun SMS template code to render.
        phone_number: Recipient number, forwarded to ``set_PhoneNumbers``.

    Returns:
        dict: ``{"code": <6-digit string>, "sendTime": "YYYY-MM-DD HH:MM:SS"}``
        where ``sendTime`` is the current time in the Asia/Shanghai zone.

    Raises:
        Any exception raised by ``AcsClient.do_action_with_exception`` is
        propagated to the caller.
    """
    # NOTE(review): credentials committed to source control should be rotated
    # and the literal fallbacks removed. The environment variables let a
    # deployment override them without a code change.
    access_key_id = os.environ.get(
        'ALIYUN_ACCESS_KEY_ID', 'LTAI5tRWntKUTfyrjgtitNwR')
    access_key_secret = os.environ.get(
        'ALIYUN_ACCESS_KEY_SECRET', 'GTvSx3dGfOlk4UCKpUdksciKzVJQ3Z')
    sign_name = '有数了平台'

    # A verification code is a security token: draw it from the OS CSPRNG
    # rather than the default Mersenne Twister generator.
    rng = random.SystemRandom()
    code = "".join(str(rng.randint(0, 9)) for _ in range(6))

    acs_client = AcsClient(access_key_id, access_key_secret, 'cn-hangzhou')

    # Create the request instance.
    request = SendSmsRequest.SendSmsRequest()
    request.set_TemplateCode(template_code)
    # Serialise with json.dumps so the code is a quoted string; concatenating
    # it raw produced invalid JSON whenever the code started with "0".
    request.set_TemplateParam(json.dumps({"code": code}))
    request.set_SignName(sign_name)
    request.set_PhoneNumbers(phone_number)

    # Send the SMS.
    acs_client.do_action_with_exception(request)

    send_time = datetime.datetime.now(pytz.timezone("Asia/Shanghai"))
    return {"code": code, "sendTime": send_time.strftime("%Y-%m-%d %H:%M:%S")}
|
|
|
|
|
|
class BaseController(http.Controller):
    """HTTP endpoints for login, registration, labels, dictionaries,
    captcha, SMS codes, home banners, file upload and the ZXL login proxy.

    Every handler wraps its result with ``MakeResponse.success`` or
    ``MakeResponse.error``.
    """

    # Upper bound (seconds) for outbound HTTP calls so that a stalled peer
    # cannot block an Odoo worker indefinitely.
    _HTTP_TIMEOUT = 30

    @staticmethod
    def _user_payload(user, empty_label=""):
        """Build the user dict shared by the login and lookup endpoints.

        Args:
            user: ``res.users`` recordset (expected to hold one record).
            empty_label: Value reported under ``"label"`` when the user has
                no labels.

        Returns:
            dict: The serialisable user description.
        """
        partner = user.partner_id
        return {
            "loginName": user.login,
            "phonenumber": partner.phone or "15888888888",
            "label": user.label_ids.ids or empty_label,
            "userName": partner.name,
            "userId": user.id,
            "pwdChanged": True,
            "email": partner.email,
        }

    @staticmethod
    def _parse_label_ids(label):
        """Turn a comma-separated string (or a list) of ids into ``[int]``.

        Blank entries are skipped, so an empty string yields an empty list.
        """
        if isinstance(label, str):
            label = label.split(",")
        elif not isinstance(label, (list, tuple)):
            label = [label]
        return [int(item) for item in label if str(item).strip()]

    def _login_user_response(self):
        """Look up a user by the ``loginName`` request parameter.

        Shared implementation of the two user-lookup routes.

        NOTE(review): the search matches the ``name`` field, not ``login``;
        confirm this is intended.
        """
        try:
            login_name = http.request.params['loginName']
            user = http.request.env["res.users"].sudo().search(
                [("name", "=", login_name)], limit=1)
            if not user:
                return MakeResponse.error("用户不存在")
            return MakeResponse.success(self._user_payload(user))
        except Exception:
            _logger.exception("user lookup failed")
            return MakeResponse.error("系统错误")

    @http.route('/tx_base/login', methods=['post'], auth='public', csrf=False, website=True, cors="*")
    def login(self, **kwargs):
        """Authenticate ``username``/``password`` and return user info.

        The credentials are posted to this server's
        ``/web/session/authenticate`` endpoint; the resulting ``session_id``
        cookie is returned under ``"session"`` together with the user data.
        """
        try:
            login_name = http.request.params['username']
            password = http.request.params['password']

            user = http.request.env["res.users"].sudo().search(
                [("login", "=", login_name)], limit=1)
            if not user:
                return MakeResponse.error("用户不存在")

            if user.opentker_user_type == "opentker":
                # NOTE(review): CryptContext.hash() uses a fresh random salt on
                # every call, so the value sent below differs per request.
                # Presumably the credential check for these users accounts
                # for that -- confirm.
                ctx = CryptContext(
                    ['pbkdf2_sha512', 'plaintext'],
                    deprecated=['auto'],
                    pbkdf2_sha512__rounds=350000,
                )
                password = ctx.hash(login_name + password + user.salt)

            payload = json.dumps({
                'params': {
                    'db': http.request.session.db,
                    'login': login_name,
                    'password': password,
                },
            })
            response = requests.post(
                url=f"{http.request.env.user.get_base_url()}/web/session/authenticate",
                data=payload,
                headers={"Content-Type": "application/json"},
                timeout=self._HTTP_TIMEOUT,
            )
            session = response.cookies["session_id"]
            uid = response.json()['result']['uid']

            user = http.request.env["res.users"].sudo().search([('id', '=', uid)])
            if not user:
                return MakeResponse.error("账号/密码错误")

            result = self._user_payload(user, empty_label=[])
            result["session"] = session
            return MakeResponse.success(result)

        except Exception:
            _logger.exception("login failed")
            return MakeResponse.error("账号/密码错误")

    @http.route('/tx_base/pub/getLoginUser', methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def get_login_user(self, **kwargs):
        """Return the user whose name equals the ``loginName`` parameter."""
        # NOTE(review): public route exposing phone/e-mail through sudo().
        return self._login_user_response()

    @http.route('/tx_base/api/login/getLoginByLoginName', methods=['GET'], auth='public', csrf=False, website=True, cors="*")
    def get_login_by_login_name(self, **kwargs):
        """Return the user whose name equals the ``loginName`` parameter."""
        # NOTE(review): public route exposing phone/e-mail through sudo().
        return self._login_user_response()

    @http.route('/tx_base/api/label/saveLabel', methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def save_label(self, **kwargs):
        """Replace the labels of user ``userId`` with the ids in ``label``.

        Parameters come from the keyword arguments or, when there are none,
        from the JSON request body.
        """
        # NOTE(review): public route that lets any caller rewrite any user's
        # labels through sudo(); consider restricting it to the session user.
        try:
            params = kwargs or json.loads(http.request.httprequest.data.decode("utf-8"))
            user_id = params['userId']
            label_ids = self._parse_label_ids(params['label'])

            user = http.request.env["res.users"].sudo().search(
                [("id", "=", user_id)], limit=1)
            if not user:
                return MakeResponse.error("用户不存在")

            # (6, 0, ids) replaces the whole relation with the given ids.
            user.write({"label_ids": [(6, 0, label_ids)]})
            return MakeResponse.success("添加成功")

        except Exception:
            _logger.exception("saving labels failed")
            return MakeResponse.error("系统错误")

    @http.route('/tx_base/register2', methods=['post'], auth='public', csrf=False, website=True, cors="*")
    def signup(self, **kwargs):
        """Create a portal user from ``loginName``/``password``.

        ``againPassword`` must equal ``password``. On failure the exception
        text is returned as the error message.
        """
        try:
            params = http.request.params
            login_name = params['loginName']
            password = params['password']
            confirm_password = params['againPassword']
            # "type" is not used yet, but the lookup keeps it a required
            # parameter exactly as before.
            params['type']

            if password != confirm_password:
                return MakeResponse.error("两次密码不相同")

            group_portal = http.request.env['ir.model.data'].sudo()._xmlid_to_res_id('base.group_portal')
            user = http.request.env['res.users'].sudo().create({
                "login": login_name,
                "name": login_name,
                "password": password,
                "groups_id": [(6, 0, [group_portal])],
            })
            return MakeResponse.success({"userId": user.id, "UserName": user.name})

        except Exception as e:
            _logger.exception("signup failed")
            # Pass text, not the exception object, like every other handler.
            return MakeResponse.error(str(e))

    @http.route('/tx/user/logout', methods=['post'], auth='user', csrf=False, sitemap=False, cors="*")
    def logout(self):
        """Log the current session out while keeping the selected database."""
        http.request.session.logout(keep_db=True)
        return MakeResponse.success('退出登录')

    @http.route('/tx_base/api/dict/selectDictDataByType', type='http', auth="public", website=True, cors="*")
    def select_dict_data_by_type(self, dict_type, **kwargs):
        """List the children of the ``tx.data.dict`` entry coded ``dict_type``.

        Returns an empty list when no entry has that code.
        """
        try:
            dict_obj = http.request.env["tx.data.dict"].sudo()
            parent = dict_obj.search([("code", "=", dict_type)], limit=1)
            result = []
            if parent:
                # NOTE(review): "dictLabel" carries the id and "dictValue" the
                # name -- looks swapped, confirm against the client.
                result = [
                    {"dictLabel": child.id, "dictValue": child.name}
                    for child in dict_obj.search([("parent_id", "=", parent.id)])
                ]
            return MakeResponse.success(result)
        except Exception:
            _logger.warning("dictionary lookup failed", exc_info=True)
            return MakeResponse.error("服务器错误")

    @http.route("/tx_base/captcha/captchaImage", type='http', auth="public", website=True, cors="*")
    def captcha_image(self, **kwargs):
        """Generate a captcha and return it as a base64 PNG data URI.

        The image is rendered in memory: no file is written, so concurrent
        requests cannot collide and nothing is left behind on error.
        """
        image, code = gvcode.generate()
        buffer = io.BytesIO()
        # Encode as PNG so the bytes match the "image/png" data URI below.
        image.save(buffer, format="PNG")
        encoded = base64.b64encode(buffer.getvalue()).decode("utf-8")
        # NOTE(review): the answer is returned next to the image, which lets
        # a client solve the captcha without reading it -- confirm intended.
        return MakeResponse.success({"image": f"data:image/png;base64,{encoded}", "code": code})

    @http.route("/tx_base/pub/sendRegCode", methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def send_reg_code(self, **kw):
        """Send a registration SMS code to the number given in ``loginName``."""
        try:
            template_code = 'SMS_242695354'
            phone_number = kw.get("loginName")
            if not phone_number:
                return MakeResponse.error("手机号不能为空")

            # NOTE(review): send_sms() returns the code itself, so it is sent
            # back to the caller here -- confirm this is intended.
            return MakeResponse.success(send_sms(template_code, phone_number))
        except Exception:
            _logger.exception("sending registration code failed")
            return MakeResponse.error("系统错误")

    # Youshule -- home page banner list
    @http.route("/tx_base/api/describe/list", methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def describe_list(self, **kw):
        """Return the ``tx.home.banner`` records whose ``home_id`` is 1."""
        banners = http.request.env["tx.home.banner"].sudo().search([("home_id", "=", 1)])
        records = [
            {
                "pcFile": f"""{banner.get_base_url()}{http.request.website.image_url(banner, 'img')}""",
                "oneTitle": banner.banner_one_title,
                "twoTitle": banner.banner_two_title,
                "threeTitle": banner.banner_three_title,
                "buttonName": banner.banner_btn_name,
                "url": banner.banner_btn_url,
            }
            for banner in banners
        ]
        return MakeResponse.success(records)

    @http.route("/tx_base/api/upload/uploadFile", methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def upload_file(self, **kwargs):
        """Store the uploaded ``file`` as a binary ``ir.attachment``.

        Returns the original file name, the attachment id and the size in
        bytes.
        """
        # NOTE(review): public route creating attachments through sudo().
        try:
            upload = kwargs.get("file")
            if upload is None:
                return MakeResponse.error("上传失败")

            file_name = upload.filename
            # Read the bytes straight from the upload. The client-supplied
            # name is never used as a filesystem path, which removes the
            # path-traversal risk and the temp file that could be leaked.
            file_data = upload.read()

            attachment = http.request.env["ir.attachment"].sudo().create({
                "datas": base64.encodebytes(file_data),
                "type": "binary",
                "name": file_name,
            })
            return MakeResponse.success({
                "realName": file_name,
                "url": "",
                "fileId": attachment.id,
                "size": len(file_data),
            })

        except Exception:
            _logger.exception("file upload failed")
            return MakeResponse.error("上传失败")

    # Youshule -- ZXL login
    @http.route("/tx_base/api/login/zxlLogin", methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def zxl_Login(self, **kw):
        """Forward the JSON request body to the remote ZXL login API.

        The body is sent as form data and the decoded JSON reply is returned.
        """
        try:
            raw_body = http.request.httprequest.data
            params = (json.loads(raw_body.decode("utf-8")) if raw_body else None) or {}
            response = requests.post(
                "https://www.youshule.cn/base/api/login/zxlLogin",
                params,
                timeout=self._HTTP_TIMEOUT,
            )
            # Hand over the decoded body, not the Response object itself.
            return MakeResponse.success(response.json())
        except Exception:
            _logger.exception("ZXL login proxy failed")
            return MakeResponse.error("系统错误")
|