微信对接中的3个token

1.小程序access_token

  • APP_ID: 小程序
  • APP_SECRET: 小程序
def get_wx_access_token(ttl_timeout=60, timeout=30):
    # 1.从缓存中获取
    access_token_ttl = ttl_key(WX_ACCESS_TOKEN)
    if access_token_ttl and access_token_ttl > ttl_timeout:
        access_token = get_access_token_wx()
        logger.info("access_token:{} ttl:{} > {}s".format(
            access_token, access_token_ttl, ttl_timeout))
        return access_token, None

    # 2.请求微信服务器获取最新的access_token
    params = {
        "grant_type": "client_credential",
        "appid": settings.APP_ID,
        "secret": settings.APP_SECRET,
    }
    url = settings.WX_API_TOKEN
    logger.info("params:{}".format(params))
    try:
        resp = requests.get(params=params, url=url, timeout=timeout)
        data = resp.json()
    except Exception as e:
        msg = "{}".format(e)
        logger.error(msg, exc_info=True)
        return None, msg
    if "errcode" in data or "errmsg" in data:
        logger.error("data:{}".format(data))
        return None, data["errcode"]
    access_token = data["access_token"]
    expires_in = data["expires_in"]
    set_access_token_wx(access_token, expires_in)
    logger.info("set_access_token_wx ok:{}".format(access_token, expires_in))
    return data["access_token"], None

2.订阅号access_token

# 订阅号使用
def _post(url=None, json_data=None):
    access_token_subscribe = cm.get_access_token_wx_subscribe()
    if not access_token_subscribe:
        client = WeChatClient(settings.WECHAT_APPID, settings.WECHAT_SECRET)
        access_token_subscribe = client.access_token
        timestamp = time.time()
        expires_at = client.expires_at - timestamp
        logger.info(
            "new access_token_subscribe:{} expires_at:{}".format(
                access_token_subscribe, expires_at))
        cm.set_access_token_wx_subscribe(
            access_token_subscribe, expires_at)
    else:
        logger.info("cache access_token_subscribe:{}".format(
            access_token_subscribe))
    params = {
        "access_token": access_token_subscribe,
    }
    logger.debug("url:{} params:{} json:{}".format(
        url, params, json_data))
    try:
        resp = requests.post(url, params=params, json=json_data)
        resp = resp.json()
    except Exception as e:
        logger.error("e:{}".format(e))
        resp = None
        return Response(make_response(data=resp, err_msg="{}".format(e)))
    if "errcode" in resp or "errmsg" in resp:
        return Response(make_response(data=resp))
    logger.info("resp:{}".format(resp))
    return Response(make_response(data=resp))

3.订阅号js ticket

class SignatureView(generics.GenericAPIView):
    permission_classes = ()
    authentication_classes = ()
    serializer_class = serializers.SignatureSerializer

    def post(self, request, *args, **kwargs):
        """
        微信JSAPI签名

        ---
        serializer: serializers.SignatureSerializer

        """
        serializer = serializers.SignatureSerializer(data=request.data)
        if serializer.is_valid():
            url = serializer.validated_data["url"]
            noncestr = "{0}{1}".format(datetime.datetime.now().strftime(
                "%Y%m%d%H%M%S"), random.randint(1000, 10000))
            timestamp = int(time.time())
            # url = url.strip("#")
            url = url.split("#")[0]
            logger.debug(
                "APPID:{} SECRET:{} url:{} noncestr:{} timestamp:{}".format(
                    settings.WECHAT_APPID, settings.WECHAT_SECRET, url,
                    noncestr, timestamp)
            )
            client = WeChatClient(settings.WECHAT_APPID,
                                  settings.WECHAT_SECRET)
            ticket = cm.get_ticket_wx()
            if not ticket:
                ticket = client.jsapi.get_ticket()
                logger.info("new ticket:{}".format(ticket))
                cm.set_ticket_wx(ticket, ticket["expires_in"])
            else:
                logger.info("cache ticket:{}".format(ticket))

            signature = client.jsapi.get_jsapi_signature(
                noncestr,
                ticket["ticket"],
                timestamp,
                url
            )
            data = {
                "noncestr": noncestr,
                "timestamp": timestamp,
                "url": url,
                "signature": signature,
            }
            return Response(make_response(data), status=status.HTTP_200_OK)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

wechatpy 源码走读

wechatpy 订阅号access_token

1.获取access_token

from wechatpy.session.memorystorage import MemoryStorage

class BaseWeChatClient(object):
    # 抽象方法
    def fetch_access_token(self):
        raise NotImplementedError()

    @property
    def access_token(self):
        """ WeChat access token """
        access_token = self.session.get(self.access_token_key) # session 基本使用MemoryStorage存储
        if access_token:
            if not self.expires_at:
                # user provided access_token, just return it
                return access_token

            timestamp = time.time()
            if self.expires_at - timestamp > 60:
                return access_token				# 未过期直接返回

        self.fetch_access_token()
        return self.session.get(self.access_token_key)

2.刷新