# -*- coding: utf-8 -*- """ :authors: python273 :license: Apache License, Version 2.0, see LICENSE file :copyright: (c) 2019 python273 """ import json import logging import random import re import threading import time import urllib.parse from hashlib import md5 import requests import jconfig from .enums import VkUserPermissions from .exceptions import * from .utils import ( code_from_number, search_re, clear_string, cookies_to_list, set_cookies_from_list ) RE_LOGIN_TO = re.compile(r'"to":"(.*?)"') RE_LOGIN_IP_H = re.compile(r'name="ip_h" value="([a-z0-9]+)"') RE_LOGIN_LG_H = re.compile(r'name="lg_h" value="([a-z0-9]+)"') RE_LOGIN_LG_DOMAIN_H = re.compile(r'name="lg_domain_h" value="([a-z0-9]+)"') RE_CAPTCHAID = re.compile(r"onLoginCaptcha\('(\d+)'") RE_NUMBER_HASH = re.compile(r"al_page: '3', hash: '([a-z0-9]+)'") RE_AUTH_HASH = re.compile(r"Authcheck\.init\('([a-z_0-9]+)'") RE_TOKEN_URL = re.compile(r'location\.href = "(.*?)"\+addr;') RE_PHONE_PREFIX = re.compile(r'label ta_r">\+(.*?)<') RE_PHONE_POSTFIX = re.compile(r'phone_postfix">.*?(\d+).*?<') DEFAULT_USERAGENT = 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0' DEFAULT_USER_SCOPE = sum(VkUserPermissions) def get_unknown_exc_str(s): return ( f'Unknown error ({s}). Please send a bugreport to GitHub: ' 'https://github.com/python273/vk_api/issues' ) class VkApi(object): """ :param login: Логин ВКонтакте (лучше использовать номер телефона для автоматического обхода проверки безопасности) :type login: str :param password: Пароль ВКонтакте (если пароль не передан, то будет попытка использовать сохраненные данные) :type password: str :param token: access_token :type token: str :param auth_handler: Функция для обработки двухфакторной аутентификации, должна возвращать строку с кодом и булево значение, означающее, стоит ли запомнить это устройство, для прохождения аутентификации. :param captcha_handler: Функция для обработки капчи, см. :func:`captcha_handler` :param config: Класс для сохранения настроек :type config: :class:`jconfig.base.BaseConfig` :param config_filename: Расположение config файла для :class:`jconfig.config.Config` :param api_version: Версия API :type api_version: str :param app_id: app_id Standalone-приложения :type app_id: int :param scope: Запрашиваемые права, можно передать строкой или числом. См. :class:`VkUserPermissions` :type scope: int or str :param client_secret: Защищенный ключ приложения для Client Credentials Flow авторизации приложения (https://vk.com/dev/client_cred_flow). Внимание: Этот способ авторизации устарел, рекомендуется использовать сервисный ключ из настроек приложения. `login` и `password` необходимы для автоматического получения токена при помощи Implicit Flow авторизации пользователя и возможности работы с веб-версией сайта (включая :class:`vk_api.audio.VkAudio`) :param session: Кастомная сессия со своими параметрами(из библиотеки requests) :type session: :class:`requests.Session` """ RPS_DELAY = 0.34 # ~3 requests per second def __init__(self, login=None, password=None, token=None, auth_handler=None, captcha_handler=None, config=jconfig.Config, config_filename='vk_config.v2.json', api_version='5.92', app_id=6222115, scope=DEFAULT_USER_SCOPE, client_secret=None, session=None): self.login = login self.password = password self.token = {'access_token': token} self.api_version = api_version self.app_id = app_id self.scope = scope self.client_secret = client_secret self.storage = config(self.login, filename=config_filename) self.http = session or requests.Session() if not session: self.http.headers['User-agent'] = DEFAULT_USERAGENT self.last_request = 0.0 self.error_handlers = { NEED_VALIDATION_CODE: self.need_validation_handler, CAPTCHA_ERROR_CODE: captcha_handler or self.captcha_handler, TOO_MANY_RPS_CODE: self.too_many_rps_handler, TWOFACTOR_CODE: auth_handler or self.auth_handler } self.lock = threading.Lock() self.logger = logging.getLogger('vk_api') @property def _sid(self): return ( self.http.cookies.get('remixsid', domain='.vk.com') or self.http.cookies.get('remixsid6', domain='.vk.com') or self.http.cookies.get('remixsid', domain='.vk.ru') or self.http.cookies.get('remixsid6', domain='.vk.ru') ) def auth(self, reauth=False, token_only=False): """ Аутентификация :param reauth: Позволяет переавторизоваться, игнорируя сохраненные куки и токен :param token_only: Включает оптимальную стратегию аутентификации, если необходим только access_token Например если сохраненные куки не валидны, но токен валиден, то аутентификация пройдет успешно При token_only=False, сначала проверяется валидность куки. Если кука не будет валидна, то будет произведена попытка аутетификации с паролем. Тогда если пароль не верен или пароль не передан, то аутентификация закончится с ошибкой. Если вы не делаете запросы к веб версии сайта используя куки, то лучше использовать token_only=True """ if not self.login: raise LoginRequired('Login is required to auth') self.logger.info('Auth with login: {}'.format(self.login)) set_cookies_from_list( self.http.cookies, self.storage.setdefault('cookies', []) ) self.token = self.storage.setdefault( 'token', {} ).setdefault( 'app' + str(self.app_id), {} ).get('scope_' + str(self.scope)) if token_only: self._auth_token(reauth=reauth) else: self._auth_cookies(reauth=reauth) def _auth_cookies(self, reauth=False): if reauth: self.logger.info('Auth forced') self.storage.clear_section() self._vk_login() self._api_login() return if not self.check_sid(): self.logger.info( 'remixsid from config is not valid: {}'.format( self._sid ) ) self._vk_login() else: self._pass_security_check() if not self._check_token(): self.logger.info( 'access_token from config is not valid: {}'.format( self.token ) ) self._api_login() else: self.logger.info('access_token from config is valid') def _auth_token(self, reauth=False): if not reauth and self._check_token(): self.logger.info('access_token from config is valid') return if reauth: self.logger.info('Auth (API) forced') if self.check_sid(): self._pass_security_check() self._api_login() elif self.password: self._vk_login() self._api_login() def _vk_login(self, captcha_sid=None, captcha_key=None): """ Авторизация ВКонтакте с получением cookies remixsid :param captcha_sid: id капчи :type captcha_key: int or str :param captcha_key: ответ капчи :type captcha_key: str """ self.logger.info('Logging in...') if not self.password: raise PasswordRequired('Password is required to login') self.http.cookies.clear() # Get cookies response = self.http.get('https://vk.com/login') if response.url.startswith('https://vk.com/429.html?'): hash429_md5 = md5(self.http.cookies['hash429'].encode('ascii')).hexdigest() self.http.cookies.pop('hash429') response = self.http.get(f'{response.url}&key={hash429_md5}') headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Referer': 'https://vk.com/', 'Content-Type': 'application/x-www-form-urlencoded', 'Origin': 'https://vk.com', } values = { 'act': 'login', 'role': 'al_frame', 'expire': '', 'to': search_re(RE_LOGIN_TO, response.text), 'recaptcha': '', 'captcha_sid': '', 'captcha_key': '', '_origin': 'https://vk.com', 'utf8': '1', 'ip_h': search_re(RE_LOGIN_IP_H, response.text), 'lg_h': search_re(RE_LOGIN_LG_H, response.text), 'lg_domain_h': search_re(RE_LOGIN_LG_DOMAIN_H, response.text), 'ul': '', 'email': self.login, 'pass': self.password } if captcha_sid and captcha_key: self.logger.info( 'Using captcha code: {}: {}'.format( captcha_sid, captcha_key ) ) values['captcha_sid'] = captcha_sid values['captcha_key'] = captcha_key response = self.http.post( 'https://login.vk.com/?act=login', data=values, headers=headers ) if 'onLoginCaptcha(' in response.text: self.logger.info('Captcha code is required') captcha_sid = search_re(RE_CAPTCHAID, response.text) captcha = Captcha(self, captcha_sid, self._vk_login) return self.error_handlers[CAPTCHA_ERROR_CODE](captcha) if 'onLoginReCaptcha(' in response.text: self.logger.info('Captcha code is required (recaptcha)') captcha_sid = str(random.random())[2:16] captcha = Captcha(self, captcha_sid, self._vk_login) return self.error_handlers[CAPTCHA_ERROR_CODE](captcha) if 'onLoginFailed(4' in response.text: raise BadPassword('Bad password') if 'act=authcheck' in response.text: self.logger.info('2FA is required') response = self.http.get('https://vk.com/login?act=authcheck') self._pass_twofactor(response) if self._sid: self.logger.info('Got remixsid') self.storage.cookies = cookies_to_list(self.http.cookies) self.storage.save() else: raise AuthError(get_unknown_exc_str('AUTH; no sid')) response = self._pass_security_check(response) if 'act=blocked' in response.url: raise AccountBlocked('Account is blocked') def _pass_twofactor(self, auth_response): """ Двухфакторная аутентификация :param auth_response: страница с приглашением к аутентификации """ auth_hash = search_re(RE_AUTH_HASH, auth_response.text) if not auth_hash: raise TwoFactorError(get_unknown_exc_str('2FA; no hash')) code, remember_device = self.error_handlers[TWOFACTOR_CODE]() values = { 'al': '1', 'code': code, 'hash': auth_hash, 'remember': int(remember_device), } response = self.http.post( 'https://vk.com/al_login.php?act=a_authcheck_code', values ) data = json.loads(response.text.lstrip('