wechatpy.pay 源代码

# -*- coding: utf-8 -*-

import inspect
import logging

import requests
import xmltodict
from xml.parsers.expat import ExpatError
from optionaldict import optionaldict

from wechatpy.crypto import WeChatRefundCrypto
from wechatpy.utils import random_string
from wechatpy.exceptions import WeChatPayException, InvalidSignatureException
from wechatpy.pay.utils import (
    calculate_signature,
    calculate_signature_hmac,
    _check_signature,
    dict_to_xml,
)
from wechatpy.pay.api.base import BaseWeChatPayAPI
from wechatpy.pay import api

logger = logging.getLogger(__name__)


def _is_api_endpoint(obj):
    return isinstance(obj, BaseWeChatPayAPI)


[文档]class WeChatPay: """ 微信支付接口 :param appid: 微信公众号 appid :param sub_appid: 当前调起支付的小程序APPID :param api_key: 商户 key,不要在这里使用小程序的密钥 :param mch_id: 商户号 :param sub_mch_id: 可选,子商户号,受理模式下必填 :param mch_cert: 必填,商户证书路径 :param mch_key: 必填,商户证书私钥路径 :param timeout: 可选,请求超时时间,单位秒,默认无超时设置 :param sandbox: 可选,是否使用测试环境,默认为 False """ redpack = api.WeChatRedpack() """红包接口""" transfer = api.WeChatTransfer() """企业付款接口""" coupon = api.WeChatCoupon() """代金券接口""" order = api.WeChatOrder() """订单接口""" refund = api.WeChatRefund() """退款接口""" micropay = api.WeChatMicroPay() """刷卡支付接口""" tools = api.WeChatTools() """工具类接口""" jsapi = api.WeChatJSAPI() """公众号网页 JS 支付接口""" withhold = api.WeChatWithhold() """代扣接口""" app_auth = api.WeChatAppAuth() """实名认证接口""" profit_sharing = api.WechatProfitSharing() """分账接口""" API_BASE_URL = "https://api.mch.weixin.qq.com/" def __new__(cls, *args, **kwargs): self = super().__new__(cls) api_endpoints = inspect.getmembers(self, _is_api_endpoint) for name, _api in api_endpoints: api_cls = type(_api) _api = api_cls(self) setattr(self, name, _api) return self def __init__( self, appid, api_key, mch_id, sub_mch_id=None, mch_cert=None, mch_key=None, timeout=None, sandbox=False, sub_appid=None, ): self.appid = appid self.sub_appid = sub_appid self.api_key = api_key self.mch_id = mch_id self.sub_mch_id = sub_mch_id self.mch_cert = mch_cert self.mch_key = mch_key self._using_pkcs12_cert = False self.timeout = timeout self.sandbox = sandbox self._sandbox_api_key = None self._http = requests.Session() if mch_cert and mch_cert.endswith(".p12"): from requests_pkcs12 import Pkcs12Adapter # 商户 .p12 格式证书,证书密码默认为商户 ID self._http.mount( self.API_BASE_URL, Pkcs12Adapter(pkcs12_filename=self.mch_cert, pkcs12_password=self.mch_id) ) self._using_pkcs12_cert = True def _fetch_sandbox_api_key(self): nonce_str = random_string(32) sign = calculate_signature({"mch_id": self.mch_id, "nonce_str": nonce_str}, self.api_key) payload = dict_to_xml( { "mch_id": self.mch_id, "nonce_str": nonce_str, }, sign=sign, ) headers = {"Content-Type": "text/xml"} api_url = f"{self.API_BASE_URL}sandboxnew/pay/getsignkey" response = self._http.post(api_url, data=payload, headers=headers) return xmltodict.parse(response.text)["xml"].get("sandbox_signkey") def _request(self, method, url_or_endpoint, **kwargs): if not url_or_endpoint.startswith(("http://", "https://")): api_base_url = kwargs.pop("api_base_url", self.API_BASE_URL) if self.sandbox: api_base_url = f"{api_base_url}sandboxnew/" url = f"{api_base_url}{url_or_endpoint}" else: url = url_or_endpoint if isinstance(kwargs.get("data", ""), dict): data = kwargs["data"] if "mchid" not in data: # Fuck Tencent data.setdefault("mch_id", self.mch_id) data.setdefault("sub_mch_id", self.sub_mch_id) data.setdefault("nonce_str", random_string(32)) data = optionaldict(data) if data.get("sign_type", "MD5") == "HMAC-SHA256": sign = calculate_signature_hmac(data, self.sandbox_api_key if self.sandbox else self.api_key) else: sign = calculate_signature(data, self.sandbox_api_key if self.sandbox else self.api_key) body = dict_to_xml(data, sign) body = body.encode("utf-8") kwargs["data"] = body # 商户 PEM 证书 if not self._using_pkcs12_cert and self.mch_cert and self.mch_key: kwargs["cert"] = (self.mch_cert, self.mch_key) kwargs["timeout"] = kwargs.get("timeout", self.timeout) logger.debug("Request to WeChat API: %s %s\n%s", method, url, kwargs) res = self._http.request(method=method, url=url, **kwargs) try: res.raise_for_status() except requests.RequestException as reqe: raise WeChatPayException( return_code=None, client=self, request=reqe.request, response=reqe.response, ) return self._handle_result(res) def _handle_result(self, res): res.encoding = "utf-8-sig" xml = res.text logger.debug("Response from WeChat API \n %s", xml) try: data = xmltodict.parse(xml)["xml"] except (xmltodict.ParsingInterrupted, ExpatError): # 解析 XML 失败 logger.debug("WeChat payment result xml parsing error", exc_info=True) return xml return_code = data["return_code"] return_msg = data.get("return_msg", data.get("retmsg")) result_code = data.get("result_code", data.get("retcode")) errcode = data.get("err_code") errmsg = data.get("err_code_des") if return_code != "SUCCESS" or result_code != "SUCCESS": # 返回状态码不为成功 raise WeChatPayException( return_code, result_code, return_msg, errcode, errmsg, client=self, request=res.request, response=res, ) return data def get(self, url, **kwargs): return self._request(method="get", url_or_endpoint=url, **kwargs) def post(self, url, **kwargs): return self._request(method="post", url_or_endpoint=url, **kwargs) def check_signature(self, params): return _check_signature(params, self.api_key if not self.sandbox else self.sandbox_api_key)
[文档] @classmethod def get_payment_data(cls, xml): """ 解析微信支付结果通知,获得appid, mch_id, out_trade_no, transaction_id 如果你需要进一步判断,请先用appid, mch_id来生成WeChatPay, 然后用`wechatpay.parse_payment_result(xml)`来校验支付结果 使用示例:: from wechatpy.pay import WeChatPay # 假设你已经获取了微信服务器推送的请求中的xml数据并存入xml变量 data = WeChatPay.get_payment_appid(xml) { "appid": "公众号或者小程序的id", "mch_id": "商户id", } """ try: data = xmltodict.parse(xml) except (xmltodict.ParsingInterrupted, ExpatError): raise ValueError("invalid xml") if not data or "xml" not in data: raise ValueError("invalid xml") data = data["xml"] return { "appid": data["appid"], "mch_id": data["mch_id"], "out_trade_no": data["out_trade_no"], "transaction_id": data["transaction_id"], }
[文档] def parse_payment_result(self, xml): """解析微信支付结果通知""" try: data = xmltodict.parse(xml) except (xmltodict.ParsingInterrupted, ExpatError): raise InvalidSignatureException() if not data or "xml" not in data: raise InvalidSignatureException() data = data["xml"] sign = data.pop("sign", None) real_sign = calculate_signature(data, self.api_key if not self.sandbox else self.sandbox_api_key) if sign != real_sign: raise InvalidSignatureException() for key in ( "total_fee", "settlement_total_fee", "cash_fee", "coupon_fee", "coupon_count", ): if key in data: data[key] = int(data[key]) data["sign"] = sign return data
[文档] def parse_refund_notify_result(self, xml): """解析微信退款结果通知""" refund_crypto = WeChatRefundCrypto(self.api_key if not self.sandbox else self.sandbox_api_key) data = refund_crypto.decrypt_message(xml, self.appid, self.mch_id) for key in ( "total_fee", "settlement_total_fee", "refund_fee", "settlement_refund_fee", ): if key in data: data[key] = int(data[key]) return data
@property def sandbox_api_key(self): if self.sandbox and self._sandbox_api_key is None: self._sandbox_api_key = self._fetch_sandbox_api_key() return self._sandbox_api_key