Как я могу улучшить свой код, чтобы избежать выброса floodcontrol при работе с vk_api?

Рейтинг: 0Ответов: 0Опубликовано: 01.08.2025

Всем привет, я разрабатываю сервис для работы с vk. Ключевая задача этого сервиса - выкладывание историй в подключенные сообщества. Столкнулся с такой проблемой: Как не пытаюсь, все время вылетает FloodControl от vk_api. Разработку веду на ЯП Python. Причем все меры, которые я мог принять - уже приняты. Что на данный момент реализовано для попытки обхода:

  1. time.sleep после каждой отправки запроса от 0.2 до 0.8.
  2. Обработка ошибки FloodControl, time.sleep(х), где х - время, которое берётся из отправленной мне ошибки от vk_api.
  3. Пытался кешировать запрос для vk_api, но в этом случае ломалась нужная логика в выкладывании историй. Вот текущий код:
    Функции выкладывания видео
def upload_to_vk_and_save(video_io, group, access_token, source_type="content", ad_text=None, ad_url=None):
    for attemp in range(1, MAX_RETRIES + 1):
        try:
            logger.info(f"[AUTOPOST] Старт публикации {source_type} видео в группу {group.vk_group_id}")

            params = {
                "group_id": group.vk_group_id,
                "add_to_news": 1,
                "v": VK_API_VERSION,
                "access_token": access_token,
            }

            if source_type == "ad" and ad_text and ad_url:
                params["link_text"] = ad_text
                params["link_url"] = ad_url

            upload_response = requests.get("https://api.vk.com/method/stories.getVideoUploadServer", params=params).json()

            if "error" in upload_response:
                code = upload_response["error"].get("error_code")
                if code == 9:
                    retry_after = upload_response["error"].get("captcha_time", upload_response["error"].get("retry_after", BASE_DELAY))
                    logger.warning(f"[AUTOPOST] FloodControl on getVideoUploadServer, ждём {retry_after}s")
                    time.sleep(retry_after)
                    continue
                logger.error(f"[AUTOPOST] Error getVideoUploadServer: {upload_response}")
                return False

            upload_url = upload_response["response"]["upload_url"]
            files = {
                "video_file": ("story.mp4", video_io, "video/mp4")
            }

            upload_result = requests.post(upload_url, files=files).json()
            if "error" in upload_result:
                code = upload_response["error"].get("error_code")
                if code == 9:
                    retry_after = upload_result["error"].get("captcha_time", upload_result["error"].get("retry_after", BASE_DELAY))
                    logger.warning(f"[AUTOPOST] FloodControl on getVideoUploadServer, ждём {retry_after}s")
                    time.sleep(retry_after)
                    continue 
                logger.error(f"[AUTOPOST] Ошибка загрузки видео на upload_url: {upload_result}")
                return False

            upload_results_str = upload_result.get("response", {}).get("upload_result") or \
                                upload_result.get("response", {}).get("video")
            if not upload_results_str:
                logger.error(f"[AUTOPOST] Нет upload_result или video в ответе VK: {upload_result}")
                return False

            data = {
                "upload_results": upload_results_str,
                "v": VK_API_VERSION,
                "access_token": access_token,
            }
            if source_type == "ad" and ad_text and ad_url:
                data["link_text"] = ad_text
                data["link_url"]  = ad_url
                logger.info(f"[AUTOPOST] stories.save data for ad: {data}")

            save_response = requests.post("https://api.vk.com/method/stories.save", data=data).json()

            if "error" in save_response:
                logger.error(f"[AUTOPOST] Ошибка save от VK: {save_response}")
                return False

            if save_response.get("response", {}).get("count", 0) == 0:
                logger.warning(f"[AUTOPOST] VK stories.save не сохранил сторис: {save_response}")
                return False

            logger.info(f"✅ Успешно опубликовано {source_type} видео в группу {group.vk_group_id}")

            if source_type == "ad":
                response = save_response.get("response", {})
                items = response.get("items", [])
                if items:
                    story = items[0]
                    return {
                        "story_id": story.get("id"),
                        "story_owner_id": story.get("owner_id")
                    }
                else:
                    return {
                        "story_id": None,
                        "story_owner_id": None
                    }

            return True
        except Exception as e:
            logger.error(f"[AUTOPOST] Ошибка публикации видео в VK: {str(e)}")
            return False

def allow_request():
    now = time.time()
    with r.pipeline() as pipe:
        pipe.zadd(BUCKET, {now: now})
        pipe.zremrangebyscore(BUCKET, 0, now-1)
        pipe.zcard(BUCKET)
        pipe.expire(BUCKET, 1)
        count = pipe.execute()[-2]
    return count <= RATE
 

@shared_task
def auto_publish_videos():
    statuses = CommunityStatus.objects.filter(is_enabled=True)
    logger.info(f"Запуск автопубликации: включено {statuses.count()} сообществ")

    ad_cfg = get_dynamics_ad_config()
    base_delay = 5
    if not ad_cfg:
        logger.warning(f"[AUTOPOST] Нет рекламной конфигурации для ключа {ad_time_key}, реклама не будет опубликована.")
        ad_cfg = None  # Явно передадим None
    logger.info(f"[AUTOPOST] Реклманая конфигурация: {ad_cfg}")
    for i, status in enumerate(statuses):
        jitter = random.uniform(0.5, 1.5)
        delay = i * base_delay + jitter
        publish_to_group.apply_async(
            kwargs={"status_id": status.id, "ad_cfg": ad_cfg},
            countdown = delay
       )  # запускаем подзадачу


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_backoff_max=600, retry_jitter=True, max_retries=10)
def publish_to_group(self, status_id, ad_cfg):
    session = boto3.session.Session()
    s3 = session.client(
        service_name='s3',
        aws_access_key_id=config('S3_ACCESS_KEY'),
        aws_secret_access_key=config('S3_SECRET_KEY'),
        endpoint_url=config('S3_ENDPOINT'),
        region_name='ru-7'
    )

    status = CommunityStatus.objects.select_related("group").get(id=status_id)
    group = status.group

    if not is_allowed(group.vk_group_id):
        logger.info(f"[AUTOPOST] Публикация в группу {group.vk_group_id} заблокирована (cooldown).")
        return
    
    group_token_obj = VKGroupToken.objects.filter(group_id=group.vk_group_id).first()
    if not group_token_obj:
        logger.warning(f"[AUTOPOST] Нет VKGroupToken для группы {group.vk_group_id}, пропускаем.")
        return

    access_token = group_token_obj.access_token
    published_count = 0

    # СНАЧАЛА РЕКЛАМА

    if ad_cfg:
        video_io = download_video_from_s3(s3, ad_cfg["filename"])
        if video_io:
            time.sleep(random.uniform(0.2, 0.8))
            upload_response = upload_to_vk_and_save(
                video_io,
                group,
                access_token,
                source_type="ad",
                ad_text=ad_cfg["link_text"],
                ad_url=ad_cfg["link_url"]
            )
            if isinstance(upload_response, dict):
                sid = upload_response.get("story_id")
                oid = upload_response.get("story_owner_id")
                if sid is not None and oid is not None:
                    PublishedAd.objects.create(
                        group=group,
                        ad_key=ad_cfg["filename"],
                        published_at=timezone.now(),
                        story_id=sid,
                        story_owner_id=oid,
                    )
                    published_count += 1

    for i, delay_minutes in enumerate([5, 10, 15]):
        publish_delayed_content.apply_async(
            kwargs={"status_id": status.id},
            countdown=delay_minutes * 60  # через 5, 10, 15 минут
        )

    if published_count:
        set_cooldown(group.vk_group_id)
        logger.info(f"[AUTOPOST] Всего опубликовано {published_count} видео в группу {group.vk_group_id}")


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 3})
def publish_delayed_content(self, status_id):
    logger.info(f"[AUTOPOST] Старт отложенной публикации контентного видео. Task ID: {self.request.id}, Status ID: {status_id}")

    session = boto3.session.Session()
    s3 = session.client(
        service_name='s3',
        aws_access_key_id=config('S3_ACCESS_KEY'),
        aws_secret_access_key=config('S3_SECRET_KEY'),
        endpoint_url=config('S3_ENDPOINT'),
        region_name='ru-7'
    )

    status = CommunityStatus.objects.select_related("group").get(id=status_id)
    group = status.group

    group_token_obj = VKGroupToken.objects.filter(group_id=group.vk_group_id).first()
    if not group_token_obj:
        logger.warning(f"[AUTOPOST] Нет VKGroupToken для группы {group.vk_group_id}, пропускаем отложенную публикацию.")
        return

    access_token = group_token_obj.access_token

    used_video_ids = PublishedVideo.objects.filter(group=group).values_list("video_id", flat=True)
    available_videos = TelegramVideo.objects.filter(channel=status.current_source).exclude(id__in=used_video_ids)

    logger.info(f"[AUTOPOST] Найдено {available_videos.count()} доступных видео для группы {group.vk_group_id}")

    if not available_videos.exists():
        logger.warning(f"[AUTOPOST] Нет доступных видео для отложенной публикации в группу {group.vk_group_id}")
        return

    video = available_videos.order_by("?").first()
    key = video.file_path.replace(f"https://{config('S3_ENDPOINT').replace('https://', '')}/{config('S3_BUCKET_NAME')}/", "")
    
    logger.info(f"[AUTOPOST] Попытка загрузить видео {video.id} из S3 по ключу {key}")

    video_io = download_video_from_s3(s3, key)
    if not video_io:
        logger.error(f"[AUTOPOST] Не удалось загрузить видео {key} для отложенной публикации в группу {group.vk_group_id}")
        return
    time.sleep(random.uniform(0.2, 0.8))
    success = upload_to_vk_and_save(video_io, group, access_token, source_type="content")
    if success:
        PublishedVideo.objects.create(group=group, video=video)
        logger.info(f"[AUTOPOST] Отложенно опубликовано 1 контентное видео {video.id} в группу {group.vk_group_id}")


Планировщик задач для celery

app.conf.beat_schedule = {
    'test_schedule': {
        'task': 'commission.tasks.run_patch_task',
        'schedule': crontab(hour=10, minute=56),
        'options': {'expires': 3600 },
    },
    'auto_publish_morning': {
        'task': 'communities.tasks.auto_publish_videos',
        'schedule': crontab(hour=8, minute=0),
        'options': {'expires': 3600 * 4},
    },
    'auto_publish_afternoon': {
        'task': 'communities.tasks.auto_publish_videos',
        'schedule': crontab(hour=16, minute=0),
        'options': {'expires': 3600 * 4},
    },
    'auto_publish_evening': {
        'task': 'communities.tasks.auto_publish_videos',
        'schedule': crontab(hour=23, minute=59),
        'options': {'expires': 3600 * 4},
    },

    'daily_update_story_ratings': {
        'task': 'communities.tasks.daily_update_story_ratings',
        'schedule': crontab(hour=3, minute=30),
    },
}

Также используются access и refresh токены от vk, которые обновляются каждые 5 сек. Эти токены также храняться в БД.

Если кто-то сталкивался с подобной проблемой, или знает как решить проблему с floodcontrol, прошу рассказать о своем опыте.

Добавляю, что в поддержку vk я обращался, документацию читал. Поддержка отвечает по скрипту и не дает внятных ответов, в документации все прописано невнятно.

Ответы

Ответов пока нет.