Как я могу улучшить свой код, чтобы избежать выброса floodcontrol при работе с vk_api?
Всем привет, я разрабатываю сервис для работы с vk. Ключевая задача этого сервиса - выкладывание историй в подключенные сообщества. Столкнулся с такой проблемой: Как не пытаюсь, все время вылетает FloodControl от vk_api. Разработку веду на ЯП Python. Причем все меры, которые я мог принять - уже приняты. Что на данный момент реализовано для попытки обхода:
- time.sleep после каждой отправки запроса от 0.2 до 0.8.
- Обработка ошибки FloodControl, time.sleep(х), где х - время, которое берётся из отправленной мне ошибки от vk_api.
- Пытался кешировать запрос для vk_api, но в этом случае ломалась нужная логика в выкладывании историй.
Вот текущий код:
Функции выкладывания видео
def upload_to_vk_and_save(video_io, group, access_token, source_type="content", ad_text=None, ad_url=None):
for attemp in range(1, MAX_RETRIES + 1):
try:
logger.info(f"[AUTOPOST] Старт публикации {source_type} видео в группу {group.vk_group_id}")
params = {
"group_id": group.vk_group_id,
"add_to_news": 1,
"v": VK_API_VERSION,
"access_token": access_token,
}
if source_type == "ad" and ad_text and ad_url:
params["link_text"] = ad_text
params["link_url"] = ad_url
upload_response = requests.get("https://api.vk.com/method/stories.getVideoUploadServer", params=params).json()
if "error" in upload_response:
code = upload_response["error"].get("error_code")
if code == 9:
retry_after = upload_response["error"].get("captcha_time", upload_response["error"].get("retry_after", BASE_DELAY))
logger.warning(f"[AUTOPOST] FloodControl on getVideoUploadServer, ждём {retry_after}s")
time.sleep(retry_after)
continue
logger.error(f"[AUTOPOST] Error getVideoUploadServer: {upload_response}")
return False
upload_url = upload_response["response"]["upload_url"]
files = {
"video_file": ("story.mp4", video_io, "video/mp4")
}
upload_result = requests.post(upload_url, files=files).json()
if "error" in upload_result:
code = upload_response["error"].get("error_code")
if code == 9:
retry_after = upload_result["error"].get("captcha_time", upload_result["error"].get("retry_after", BASE_DELAY))
logger.warning(f"[AUTOPOST] FloodControl on getVideoUploadServer, ждём {retry_after}s")
time.sleep(retry_after)
continue
logger.error(f"[AUTOPOST] Ошибка загрузки видео на upload_url: {upload_result}")
return False
upload_results_str = upload_result.get("response", {}).get("upload_result") or \
upload_result.get("response", {}).get("video")
if not upload_results_str:
logger.error(f"[AUTOPOST] Нет upload_result или video в ответе VK: {upload_result}")
return False
data = {
"upload_results": upload_results_str,
"v": VK_API_VERSION,
"access_token": access_token,
}
if source_type == "ad" and ad_text and ad_url:
data["link_text"] = ad_text
data["link_url"] = ad_url
logger.info(f"[AUTOPOST] stories.save data for ad: {data}")
save_response = requests.post("https://api.vk.com/method/stories.save", data=data).json()
if "error" in save_response:
logger.error(f"[AUTOPOST] Ошибка save от VK: {save_response}")
return False
if save_response.get("response", {}).get("count", 0) == 0:
logger.warning(f"[AUTOPOST] VK stories.save не сохранил сторис: {save_response}")
return False
logger.info(f"✅ Успешно опубликовано {source_type} видео в группу {group.vk_group_id}")
if source_type == "ad":
response = save_response.get("response", {})
items = response.get("items", [])
if items:
story = items[0]
return {
"story_id": story.get("id"),
"story_owner_id": story.get("owner_id")
}
else:
return {
"story_id": None,
"story_owner_id": None
}
return True
except Exception as e:
logger.error(f"[AUTOPOST] Ошибка публикации видео в VK: {str(e)}")
return False
def allow_request():
now = time.time()
with r.pipeline() as pipe:
pipe.zadd(BUCKET, {now: now})
pipe.zremrangebyscore(BUCKET, 0, now-1)
pipe.zcard(BUCKET)
pipe.expire(BUCKET, 1)
count = pipe.execute()[-2]
return count <= RATE
@shared_task
def auto_publish_videos():
statuses = CommunityStatus.objects.filter(is_enabled=True)
logger.info(f"Запуск автопубликации: включено {statuses.count()} сообществ")
ad_cfg = get_dynamics_ad_config()
base_delay = 5
if not ad_cfg:
logger.warning(f"[AUTOPOST] Нет рекламной конфигурации для ключа {ad_time_key}, реклама не будет опубликована.")
ad_cfg = None # Явно передадим None
logger.info(f"[AUTOPOST] Реклманая конфигурация: {ad_cfg}")
for i, status in enumerate(statuses):
jitter = random.uniform(0.5, 1.5)
delay = i * base_delay + jitter
publish_to_group.apply_async(
kwargs={"status_id": status.id, "ad_cfg": ad_cfg},
countdown = delay
) # запускаем подзадачу
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_backoff_max=600, retry_jitter=True, max_retries=10)
def publish_to_group(self, status_id, ad_cfg):
session = boto3.session.Session()
s3 = session.client(
service_name='s3',
aws_access_key_id=config('S3_ACCESS_KEY'),
aws_secret_access_key=config('S3_SECRET_KEY'),
endpoint_url=config('S3_ENDPOINT'),
region_name='ru-7'
)
status = CommunityStatus.objects.select_related("group").get(id=status_id)
group = status.group
if not is_allowed(group.vk_group_id):
logger.info(f"[AUTOPOST] Публикация в группу {group.vk_group_id} заблокирована (cooldown).")
return
group_token_obj = VKGroupToken.objects.filter(group_id=group.vk_group_id).first()
if not group_token_obj:
logger.warning(f"[AUTOPOST] Нет VKGroupToken для группы {group.vk_group_id}, пропускаем.")
return
access_token = group_token_obj.access_token
published_count = 0
# СНАЧАЛА РЕКЛАМА
if ad_cfg:
video_io = download_video_from_s3(s3, ad_cfg["filename"])
if video_io:
time.sleep(random.uniform(0.2, 0.8))
upload_response = upload_to_vk_and_save(
video_io,
group,
access_token,
source_type="ad",
ad_text=ad_cfg["link_text"],
ad_url=ad_cfg["link_url"]
)
if isinstance(upload_response, dict):
sid = upload_response.get("story_id")
oid = upload_response.get("story_owner_id")
if sid is not None and oid is not None:
PublishedAd.objects.create(
group=group,
ad_key=ad_cfg["filename"],
published_at=timezone.now(),
story_id=sid,
story_owner_id=oid,
)
published_count += 1
for i, delay_minutes in enumerate([5, 10, 15]):
publish_delayed_content.apply_async(
kwargs={"status_id": status.id},
countdown=delay_minutes * 60 # через 5, 10, 15 минут
)
if published_count:
set_cooldown(group.vk_group_id)
logger.info(f"[AUTOPOST] Всего опубликовано {published_count} видео в группу {group.vk_group_id}")
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 3})
def publish_delayed_content(self, status_id):
logger.info(f"[AUTOPOST] Старт отложенной публикации контентного видео. Task ID: {self.request.id}, Status ID: {status_id}")
session = boto3.session.Session()
s3 = session.client(
service_name='s3',
aws_access_key_id=config('S3_ACCESS_KEY'),
aws_secret_access_key=config('S3_SECRET_KEY'),
endpoint_url=config('S3_ENDPOINT'),
region_name='ru-7'
)
status = CommunityStatus.objects.select_related("group").get(id=status_id)
group = status.group
group_token_obj = VKGroupToken.objects.filter(group_id=group.vk_group_id).first()
if not group_token_obj:
logger.warning(f"[AUTOPOST] Нет VKGroupToken для группы {group.vk_group_id}, пропускаем отложенную публикацию.")
return
access_token = group_token_obj.access_token
used_video_ids = PublishedVideo.objects.filter(group=group).values_list("video_id", flat=True)
available_videos = TelegramVideo.objects.filter(channel=status.current_source).exclude(id__in=used_video_ids)
logger.info(f"[AUTOPOST] Найдено {available_videos.count()} доступных видео для группы {group.vk_group_id}")
if not available_videos.exists():
logger.warning(f"[AUTOPOST] Нет доступных видео для отложенной публикации в группу {group.vk_group_id}")
return
video = available_videos.order_by("?").first()
key = video.file_path.replace(f"https://{config('S3_ENDPOINT').replace('https://', '')}/{config('S3_BUCKET_NAME')}/", "")
logger.info(f"[AUTOPOST] Попытка загрузить видео {video.id} из S3 по ключу {key}")
video_io = download_video_from_s3(s3, key)
if not video_io:
logger.error(f"[AUTOPOST] Не удалось загрузить видео {key} для отложенной публикации в группу {group.vk_group_id}")
return
time.sleep(random.uniform(0.2, 0.8))
success = upload_to_vk_and_save(video_io, group, access_token, source_type="content")
if success:
PublishedVideo.objects.create(group=group, video=video)
logger.info(f"[AUTOPOST] Отложенно опубликовано 1 контентное видео {video.id} в группу {group.vk_group_id}")
Планировщик задач для celery
app.conf.beat_schedule = {
'test_schedule': {
'task': 'commission.tasks.run_patch_task',
'schedule': crontab(hour=10, minute=56),
'options': {'expires': 3600 },
},
'auto_publish_morning': {
'task': 'communities.tasks.auto_publish_videos',
'schedule': crontab(hour=8, minute=0),
'options': {'expires': 3600 * 4},
},
'auto_publish_afternoon': {
'task': 'communities.tasks.auto_publish_videos',
'schedule': crontab(hour=16, minute=0),
'options': {'expires': 3600 * 4},
},
'auto_publish_evening': {
'task': 'communities.tasks.auto_publish_videos',
'schedule': crontab(hour=23, minute=59),
'options': {'expires': 3600 * 4},
},
'daily_update_story_ratings': {
'task': 'communities.tasks.daily_update_story_ratings',
'schedule': crontab(hour=3, minute=30),
},
}
Также используются access и refresh токены от vk, которые обновляются каждые 5 сек. Эти токены также храняться в БД.
Если кто-то сталкивался с подобной проблемой, или знает как решить проблему с floodcontrol, прошу рассказать о своем опыте.
Добавляю, что в поддержку vk я обращался, документацию читал. Поддержка отвечает по скрипту и не дает внятных ответов, в документации все прописано невнятно.