# -*- coding: utf-8 -*-
"""HTTP endpoints of the tx_base module: login, signup, SMS, captcha, uploads."""
import base64
import datetime
import hashlib
import json
import logging
import os
import random
import secrets
import tempfile

import gvcode
import pytz
import requests
from aliyunsdkcore.client import AcsClient
from aliyunsdkdysmsapi.request.v20170525 import SendSmsRequest
from odoo import http, tools
from passlib.context import CryptContext

from .tool import MakeResponse

_logger = logging.getLogger(__name__)

# Upper bound (seconds) for outbound HTTP calls so that a stalled peer cannot
# block an Odoo worker indefinitely.
_HTTP_TIMEOUT = 30


def send_sms(template_code, phone_number):
    """Send a 6-digit verification code by SMS through Aliyun.

    :param template_code: Aliyun SMS template code.
    :param phone_number: destination phone number(s), passed to Aliyun as-is.
    :return: dict with the generated ``code`` and the ``sendTime`` formatted as
        ``%Y-%m-%d %H:%M:%S`` in the Asia/Shanghai time zone.
    :raises Exception: whatever ``do_action_with_exception`` raises on failure.
    """
    # NOTE(review): credentials were hard-coded in the source. They can now be
    # overridden through the environment; the literals remain only as a
    # backward-compatible fallback and should be rotated and removed.
    access_key_id = os.environ.get(
        'ALIYUN_ACCESS_KEY_ID', 'LTAI5tRWntKUTfyrjgtitNwR')
    access_key_secret = os.environ.get(
        'ALIYUN_ACCESS_KEY_SECRET', 'GTvSx3dGfOlk4UCKpUdksciKzVJQ3Z')
    sign_name = '有数了平台'

    # Verification codes are security-sensitive: use the CSPRNG, not `random`.
    code = "".join(str(secrets.randbelow(10)) for _ in range(6))

    acs_client = AcsClient(access_key_id, access_key_secret, 'cn-hangzhou')

    # Build the request.
    request = SendSmsRequest.SendSmsRequest()
    request.set_TemplateCode(template_code)
    # Serialise with json.dumps: concatenating the raw digits produced invalid
    # JSON whenever the code started with "0" (e.g. {"code":012345}).
    request.set_TemplateParam(json.dumps({"code": code}))
    request.set_SignName(sign_name)
    request.set_PhoneNumbers(phone_number)

    # Send the SMS.
    acs_client.do_action_with_exception(request)

    send_time = datetime.datetime.now(pytz.timezone("Asia/Shanghai"))
    return {"code": code, "sendTime": send_time.strftime("%Y-%m-%d %H:%M:%S")}


def _user_profile(users):
    """Return the public profile dict shared by the user-lookup endpoints."""
    return {
        "loginName": users.login,
        "phonenumber": users.partner_id.phone or "15888888888",
        "label": users.label_ids.ids or "",
        "userName": users.partner_id.name,
        "userId": users.id,
        "pwdChanged": True,
        "email": users.partner_id.email,
    }


def _find_user_by_name(login_name):
    """Return the first ``res.users`` record whose ``name`` equals *login_name*."""
    # NOTE(review): the lookup is on "name", not "login" -- confirm intended.
    return http.request.env["res.users"].sudo().search(
        [("name", "=", login_name)], limit=1)


class BaseController(http.Controller):
    """Public JSON/HTTP API used by the front end."""

    @http.route('/tx_base/login', methods=['post'], auth='public', csrf=False, website=True, cors="*")
    def login(self, **kwargs):
        """Authenticate ``username``/``password`` and return profile + session id.

        The credentials are forwarded to ``/web/session/authenticate`` on this
        same server; the ``session_id`` cookie of that response is returned to
        the caller. Any failure is reported as a generic credential error.
        """
        try:
            login = http.request.params['username']
            password = http.request.params['password']
            users = http.request.env["res.users"].sudo().search(
                [("login", "=", login)], limit=1)
            if not users:
                return MakeResponse.error("用户不存在")

            if users.opentker_user_type == "opentker":
                # NOTE(review): the derived value is what gets sent as the
                # password for "opentker" users -- confirm how the server side
                # verifies it.
                ctx = CryptContext(
                    ['pbkdf2_sha512', 'plaintext'],
                    deprecated=['auto'],
                    pbkdf2_sha512__rounds=350000,
                )
                password = ctx.hash(login + password + users.salt)

            payload = json.dumps({
                'params': {
                    'db': http.request.session.db,
                    'login': login,
                    'password': password,
                },
            })
            response = requests.post(
                url=f"{http.request.env.user.get_base_url()}/web/session/authenticate",
                data=payload,
                headers={"Content-Type": "application/json"},
                timeout=_HTTP_TIMEOUT,
            )
            session = response.cookies["session_id"]
            # A failed authentication has no result/uid; the resulting
            # KeyError is turned into the generic error below.
            uid = response.json()['result']['uid']
            users = http.request.env["res.users"].sudo().search([('id', '=', uid)])
            return MakeResponse.success({
                "loginName": users.login,
                "phonenumber": users.partner_id.phone or "15888888888",
                "label": users.label_ids.ids,
                "userName": users.partner_id.name,
                "userId": users.id,
                "pwdChanged": True,
                "email": users.partner_id.email,
                "session": session,
            })
        except Exception:
            _logger.exception("login failed")
            return MakeResponse.error("账号/密码错误")

    @http.route('/tx_base/pub/getLoginUser', methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def get_login_user(self, **kwargs):
        """Return the profile of the user whose name equals ``loginName``."""
        try:
            users = _find_user_by_name(http.request.params['loginName'])
            # The original built the response but never returned it.
            return MakeResponse.success(_user_profile(users))
        except Exception:
            _logger.exception("getLoginUser failed")
            return MakeResponse.error("系统错误")

    @http.route('/tx_base/api/login/getLoginByLoginName', methods=['GET'], auth='public', csrf=False, website=True, cors="*")
    def get_login_by_login_name(self, **kwargs):
        """GET variant of :meth:`get_login_user`; same lookup and payload."""
        try:
            users = _find_user_by_name(http.request.params['loginName'])
            # The original built the response but never returned it.
            return MakeResponse.success(_user_profile(users))
        except Exception:
            _logger.exception("getLoginByLoginName failed")
            return MakeResponse.error("系统错误")

    @http.route('/tx_base/api/label/saveLabel', methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def save_label(self, **kwargs):
        """Replace a user's labels with the comma-separated ids in ``label``.

        Parameters come from the form/query arguments, or from a JSON request
        body when no arguments were parsed.
        """
        # NOTE(review): public route writing to an arbitrary user via sudo();
        # confirm this is meant to be reachable without authentication.
        try:
            params = kwargs or json.loads(http.request.httprequest.data.decode("utf-8"))
            user_id = params['userId']
            label_ids = [int(label_id) for label_id in params['label'].split(",")]
            users = http.request.env["res.users"].sudo().search(
                [("id", "=", user_id)], limit=1)
            # (6, 0, ids) replaces the whole many2many set.
            users.write({"label_ids": [(6, 0, label_ids)]})
            return MakeResponse.success("添加成功")
        except Exception:
            _logger.exception("saveLabel failed")
            return MakeResponse.error("系统错误")

    @http.route('/tx_base/register2', methods=['post'], auth='public', csrf=False, website=True, cors="*")
    def signup(self, **kwargs):
        """Create a portal user from ``loginName``/``password``/``againPassword``."""
        try:
            login = http.request.params['loginName']
            password = http.request.params['password']
            confirm_password = http.request.params['againPassword']
            # Read only so that a missing "type" parameter is still rejected,
            # as before; the value itself is not used.
            http.request.params['type']
            if password != confirm_password:
                return MakeResponse.error("两次密码不相同")
            group_portal = http.request.env['ir.model.data'].sudo()._xmlid_to_res_id(
                'base.group_portal')
            user = http.request.env['res.users'].sudo().create({
                "login": login,
                "name": login,
                "password": password,
                "groups_id": [(6, 0, [group_portal])],
            })
            return MakeResponse.success({"userId": user.id, "UserName": user.name})
        except Exception as e:
            _logger.exception("signup failed")
            # Pass text, not the exception object, to the response builder.
            return MakeResponse.error(str(e))

    @http.route('/tx/user/logout', methods=['post'], auth='user', csrf=False, sitemap=False, cors="*")
    def logout(self):
        """Log the current session out, keeping the selected database."""
        http.request.session.logout(keep_db=True)
        return MakeResponse.success('退出登录')

    @http.route('/tx_base/api/dict/selectDictDataByType', type='http', auth="public", website=True, cors="*")
    def select_dict_data_by_type(self, dict_type, **kwargs):
        """List the children of the ``tx.data.dict`` entry whose code is *dict_type*.

        Returns an empty list when no entry has that code.
        """
        try:
            dict_obj = http.request.env["tx.data.dict"].sudo()
            parent = dict_obj.search([("code", "=", dict_type)], limit=1)
            result = []
            if parent:
                result = [
                    {"dictLabel": child.id, "dictValue": child.name}
                    for child in dict_obj.search([("parent_id", "=", parent.id)])
                ]
            return MakeResponse.success(result)
        except Exception as e:
            _logger.warning("selectDictDataByType failed: %s", e, exc_info=True)
            return MakeResponse.error("服务器错误")

    @http.route("/tx_base/captcha/captchaImage", type='http', auth="public", website=True, cors="*")
    def captcha_image(self, **kwargs):
        """Generate a captcha and return it as a base64 data URI with its code."""
        image, code = gvcode.generate()
        # Render inside a private temporary directory. The original wrote
        # "<code>.jpg" into the module directory, which raced between
        # concurrent requests, leaked the file on error and required a
        # writable source tree.
        with tempfile.TemporaryDirectory() as tmp_dir:
            image_path = os.path.join(tmp_dir, "captcha.jpg")
            image.save(image_path)
            with open(image_path, 'rb') as fh:
                encoded = base64.b64encode(fh.read()).decode("utf-8")
        # NOTE(review): the file is saved with a .jpg name but advertised as
        # image/png, and the captcha answer is returned to the client -- both
        # kept for compatibility; confirm.
        return MakeResponse.success({
            "image": f"data:image/png;base64,{encoded}",
            "code": code,
        })

    @http.route("/tx_base/pub/sendRegCode", methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def send_reg_code(self, **kw):
        """Send a registration SMS code to the number given in ``loginName``."""
        # NOTE(review): the generated code is returned to the caller, so the
        # client can read it without receiving the SMS -- confirm intended.
        try:
            template_code = 'SMS_242695354'
            phone_number = kw.get("loginName")
            return MakeResponse.success(send_sms(template_code, phone_number))
        except Exception:
            _logger.exception("sendRegCode failed")
            return MakeResponse.error("系统错误")

    # Youshule -- home page banner list
    @http.route("/tx_base/api/describe/list", methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def describe_list(self, **kw):
        """Return the banners of ``tx.home.banner`` records with ``home_id`` 1."""
        banners = http.request.env["tx.home.banner"].sudo().search(
            [("home_id", "=", 1)])
        records = [
            {
                "pcFile": f"""{banner.get_base_url()}{http.request.website.image_url(banner, 'img')}""",
                "oneTitle": banner.banner_one_title,
                "twoTitle": banner.banner_two_title,
                "threeTitle": banner.banner_three_title,
                "buttonName": banner.banner_btn_name,
                "url": banner.banner_btn_url,
            }
            for banner in banners
        ]
        return MakeResponse.success(records)

    @http.route("/tx_base/api/upload/uploadFile", methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def upload_file(self, **kwargs):
        """Store the uploaded ``file`` as a binary ``ir.attachment``.

        Returns the original file name, the attachment id and the size in bytes.
        """
        try:
            upload = kwargs.get("file", False)
            # A missing upload raises AttributeError here and is reported as
            # a failed upload, as before.
            file_name = upload.filename
            # Read the stream directly. The original wrote it to
            # <module>/temp_file/<client file name> first, which let a crafted
            # file name escape that directory (path traversal).
            file_data = upload.read()
            attachment = http.request.env["ir.attachment"].sudo().create({
                "datas": base64.encodebytes(file_data),
                "type": "binary",
                "name": file_name,
            })
            return MakeResponse.success({
                "realName": file_name,
                "url": "",
                "fileId": attachment.id,
                "size": len(file_data),
            })
        except Exception:
            _logger.exception("uploadFile failed")
            return MakeResponse.error("上传失败")

    # Youshule -- ZXL login
    @http.route("/tx_base/api/login/zxlLogin", methods=['POST'], auth='public', csrf=False, website=True, cors="*")
    def zxl_Login(self, **kw):
        """Forward the JSON request body to the upstream ZXL login endpoint."""
        params = json.loads(http.request.httprequest.data.decode("utf-8")) or {}
        records = requests.post(
            "https://www.youshule.cn/base/api/login/zxlLogin",
            params,
            timeout=_HTTP_TIMEOUT,
        )
        # NOTE(review): a requests.Response object is handed to MakeResponse
        # unchanged; confirm MakeResponse can serialise it, otherwise pass
        # records.json() or records.text instead.
        return MakeResponse.success(records)