По вашему вопросу
....И все же асинхронная функция Vkbottle работает намного быстрее, чем тот же Vk api.... Данное утверждение ошибочно во многих смыслах. Все таки подробнее изучите и не путайте Vk api и асинхронная функция Vkbottle.
Данный пример просто демонстрирует использование
aiohttp
asyncio
на данных библиотеках базируется Ваш Vkbottle. Прошу Вас изучить их подробно.
Далее прошу Вас создать 2 py файла
import asyncio
from asyncvkapi import AsyncVKApi
token_id = '**************************'
group_id = 100000001
bot = AsyncVKApi(token_id)
# И так на каждое сообщение или событие. для каждого варианта свои вункции и вариант в Декораторе
# Все это будет работать при минимальных описаниях. для полноценного бота надо использовать БД
@bot.command(text="ошибка")
async def test(ctx):
await ctx.bot.send(ctx.message.user_id, 'рассматриваем вашу ошибку"')
user_id = ctx.message.user_id
# если есть администратор и у вас есть его ИД user_id = то можете ему пересылать сообщения если все настройки VK
# и доступ позволяют это сделать
def check(ctx):
return user_id == ctx.message.user_id and ctx.message.text == '123'
try:
ctx = await ctx.bot.wait_for(check, timeout=10)
await ctx.bot.send(ctx.message.user_id, 'Ответ')
except asyncio.TimeoutError:
await ctx.bot.send(ctx.message.user_id, 'истекло время')
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
bot.run_bot(group_id)
а вот второй файл
from random import randint
import aiohttp
import asyncio
class ApiVKException(Exception):
def __init__(self, method, error):
super(Exception, self).__init__()
self.method = method
self.code = error['error_code']
self.error = error
def __str__(self):
return '[{}] {}'.format(self.error['error_code'],
self.error['error_message'])
class Msg:
def __init__(self, event):
self.user_id = event['from_id']
self.text = event['text']
class Ctx:
def __init__(self, msg, bot):
[self.bot](http://self.bot) = bot
self.message = msg
def get_random_id():
return randint(1, 10000000)
class AsyncVKApi:
def __init__(self, token, version=5.131):
self.token = token
self.base_url = '[https://api.vk.com/method/](https://api.vk.com/method/)'
self.version = version
self.wait = 30
self.key = None
self.server = None
self.ts = None
[self.events](http://self.events) = []
self.listeners = []
self.commands = []
# self.__extensions = {}
def command(self, text):
def decorator(fn):
new_command = {
'execute': fn,
'text': text
}
def wrapper(ctx):
fn(ctx)
self.commands.append(new_command)
return wrapper
return decorator
async def wait_for(self, check, timeout=60):
future = self.loop.create_future()
self.listeners.append((future, check))
return await asyncio.wait_for(future, timeout)
# Старт
def run_bot(self, group_id):
[asyncio.run](http://asyncio.run)(self.update_bot_longpool(group_id))
[asyncio.run](http://asyncio.run)(self.start_loop())
async def send(self, peer_id, msg, keyboard=None, attachment=None):
base_url = self.base_url + 'messages.send'
params = {'access_token': self.token,
'v': self.version,
'user_id': peer_id,
'message': msg,
'random_id': get_random_id(), }
if keyboard:
params['keyboard'] = keyboard
if attachment:
params['attachment'] = attachment
async with aiohttp.ClientSession() as session:
async with [session.post](http://session.post)(base_url, data=params) as resp:
print(await resp.json())
async def update_bot_longpool(self, group_id, update_ts=True):
resoinse_option = await self.method('groups.getLongPollServer', {'group_id': group_id})
self.key = resoinse_option['key']
self.server = resoinse_option['server']
if update_ts:
self.ts = resoinse_option['ts']
self.group_id = group_id
async def start_loop(self):
self.loop = asyncio.get_event_loop()
print('Ready')
async for ctx in self.listen():
self.dispatch(ctx)
async def method(self, method_name, args=None):
base_url = self.base_url + method_name
params = {'access_token': self.token, 'v': self.version}
if args:
params = {**params, **args}
async with aiohttp.ClientSession() as session:
async with [session.post](http://session.post)(base_url, data=params) as resp:
response = await resp.json()
# Ошибка
print(response)
if 'error' in response.keys():
error = ApiVKException(method_name, {'error_message': response, 'error_code': response["error"]["error_code"]})
raise error
return response['response']
def dispatch(self, ctx):
# dispatch new event for listeners if has
for listener in self.listeners:
if not listener[0].cancelled():
if listener[1](ctx):
listener[0].set_result(ctx)
self.listeners.remove(listener)
return
else:
self.listeners.remove(listener)
# else if no listeners
asyncio.create_task(self.on_message(ctx))
# Сравнение с атрибутом декоратора text
async def on_message(self, ctx):
for command in self.commands:
text_command = ctx.message.text.lower()
if text_command.startswith(command['text']):
await command['execute'](ctx)
async def listen(self):
""" Слушать сервер """
while True:
async for ctx in self.check():
yield ctx
async def check(self):
""" Настройки """
values = {
'act': 'a_check',
'key': self.key,
'ts': self.ts,
'wait': self.wait,
}
async with aiohttp.ClientSession() as session:
async with [session.post](http://session.post)(self.server, params=values, timeout=self.wait + 20) as resp:
response = await resp.json()
if 'failed' not in response:
self.ts = response['ts']
new_events = [raw_event for raw_event in response['updates']]
new_events = filter(lambda event: event['type'] == 'message_new', new_events)
for event in new_events:
new_msg = Msg(event['object']['message'])
yield Ctx(new_msg, self)
elif response['failed'] == 1:
self.ts = response['ts']
elif response['failed'] == 2:
await self.update_bot_longpool(self.group_id, update_ts=False)
elif response['failed'] == 3:
await self.update_bot_longpool(self.group_id, update_ts=False)
Вот будет реагировать и отсылать сообщения для сообщества или пользователя, смотря куда что вы подключите.
Меня нет в Контакте, поэтому пришлось создавать профиль, что занимает время.
Класс можете править как хотите главное понять смысл его работы