Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Deprecate send_catch_log_deferred().#7161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wRAR wants to merge 6 commits into scrapy:master
base:master
Choose a base branch
Loading
from wRAR:send_catch_log_async
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
PrevPrevious commit
NextNext commit
Refactor FeedExporter.close_spider().
  • Loading branch information
@wRAR
wRAR committed Nov 28, 2025
commit 367f8bfec1be30e72ca1d848f3aa3589546a02af
77 changes: 37 additions & 40 deletions scrapy/extensions/feedexport.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -6,37 +6,37 @@

from __future__ import annotations

import asyncio
import contextlib
import logging
import re
import sys
import warnings
from abc import ABC, abstractmethod
from collections.abc import Callable
from collections.abc import Callable, Coroutine
from datetime import datetime, timezone
from pathlib import Path, PureWindowsPath
from tempfile import NamedTemporaryFile
from typing import IO, TYPE_CHECKING, Any, Protocol, TypeAlias, cast
from urllib.parse import unquote, urlparse

from twisted.internet.defer import Deferred, DeferredList, maybeDeferred
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.threads import deferToThread
from w3lib.url import file_uri_to_path
from zope.interface import Interface, implementer

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured, ScrapyDeprecationWarning
from scrapy.extensions.postprocessing import PostProcessingManager
from scrapy.utils.asyncio import is_asyncio_available
from scrapy.utils.conf import feed_complete_default_values_from_settings
from scrapy.utils.defer import maybe_deferred_to_future
from scrapy.utils.defer import deferred_from_coro, ensure_awaitable
from scrapy.utils.ftp import ftp_store_file
from scrapy.utils.log import failure_to_exc_info
from scrapy.utils.misc import build_from_crawler, load_object
from scrapy.utils.python import without_none_values

if TYPE_CHECKING:
from _typeshed import OpenBinaryMode
from twisted.python.failure import Failure

# typing.Self requires Python 3.11
from typing_extensions import Self
Expand DownExpand Up@@ -431,7 +431,7 @@ def finish_exporting(self) -> None:


class FeedExporter:
_pending_deferreds: list[Deferred[None]] = []
_pending_close_coros: list[Coroutine[Any, Any, None]] = []

@classmethod
def from_crawler(cls, crawler: Crawler) -> Self:
Expand DownExpand Up@@ -506,17 +506,24 @@ def open_spider(self, spider: Spider) -> None:
)

async def close_spider(self, spider: Spider) -> None:
for slot in self.slots:
self._close_slot(slot, spider)
self._pending_close_coros.extend(
self._close_slot(slot, spider) for slot in self.slots
)

# Await all deferreds
if self._pending_deferreds:
await maybe_deferred_to_future(DeferredList(self._pending_deferreds))
if self._pending_close_coros:
if is_asyncio_available():
await asyncio.wait(
[asyncio.create_task(coro) for coro in self._pending_close_coros]
)
else:
await DeferredList(
deferred_from_coro(coro) for coro in self._pending_close_coros
)

# Send FEED_EXPORTER_CLOSED signal
await self.crawler.signals.send_catch_log_async(signals.feed_exporter_closed)

def _close_slot(self, slot: FeedSlot, spider: Spider) -> Deferred[None] | None:
async def _close_slot(self, slot: FeedSlot, spider: Spider) -> None:
def get_file(slot_: FeedSlot) -> IO[bytes]:
assert slot_.file
if isinstance(slot_.file, PostProcessingManager):
Expand All@@ -533,38 +540,28 @@ def get_file(slot_: FeedSlot) -> IO[bytes]:
slot.finish_exporting()
else:
# In this case, the file is not stored, so no processing is required.
return None
return

logmsg = f"{slot.format} feed ({slot.itemcount} items) in: {slot.uri}"
d: Deferred[None] = maybeDeferred(slot.storage.store, get_file(slot)) # type: ignore[call-overload]

d.addCallback(
self._handle_store_success, logmsg, spider, type(slot.storage).__name__
)
d.addErrback(
self._handle_store_error, logmsg, spider, type(slot.storage).__name__
)
self._pending_deferreds.append(d)
d.addCallback(
lambda _: self.crawler.signals.send_catch_log_deferred(
signals.feed_slot_closed, slot=slot
slot_type = type(slot.storage).__name__
assert self.crawler.stats
try:
await ensure_awaitable(slot.storage.store(get_file(slot)))
except Exception:
logger.error(
"Error storing %s",
logmsg,
exc_info=True,
extra={"spider": spider},
)
)
d.addBoth(lambda _: self._pending_deferreds.remove(d))

return d
self.crawler.stats.inc_value(f"feedexport/failed_count/{slot_type}")
else:
logger.info("Stored %s", logmsg, extra={"spider": spider})
self.crawler.stats.inc_value(f"feedexport/success_count/{slot_type}")

def _handle_store_error(
self, f: Failure, logmsg: str, spider: Spider, slot_type: str
) -> None:
logger.error(
"Error storing %s",
logmsg,
exc_info=failure_to_exc_info(f),
extra={"spider": spider},
await self.crawler.signals.send_catch_log_async(
signals.feed_slot_closed, slot=slot
)
assert self.crawler.stats
self.crawler.stats.inc_value(f"feedexport/failed_count/{slot_type}")

def _handle_store_success(
self, result: Any, logmsg: str, spider: Spider, slot_type: str
Expand DownExpand Up@@ -627,7 +624,7 @@ def item_scraped(self, item: Any, spider: Spider) -> None:
uri_params = self._get_uri_params(
spider, self.feeds[slot.uri_template]["uri_params"], slot
)
self._close_slot(slot, spider)
self._pending_close_coros.append(self._close_slot(slot, spider))
slots.append(
self._start_new_batch(
batch_id=slot.batch_id + 1,
Expand Down
6 changes: 6 additions & 0 deletionstests/test_feedexport.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2762,6 +2762,12 @@ def parse(self, response):
assert len(CustomS3FeedStorage.stubs) == len(items)
for stub in CustomS3FeedStorage.stubs[:-1]:
stub.assert_no_pending_responses()
assert (
"feedexport/success_count/CustomS3FeedStorage" in crawler.stats.get_stats()
)
assert (
crawler.stats.get_value("feedexport/success_count/CustomS3FeedStorage") == 3
)


# Test that the FeedExporter sends the feed_exporter_closed and feed_slot_closed signals
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp