- Make a `Session` instead of issuing individual `requests.get` calls; this promotes explicit connection pooling, cookie sharing, etc.
- There's no need for your current `print`s. If any of them are genuinely valuable, convert them into real `logging` calls.
- Pre-define your script-tag loading via a `SoupStrainer`.
- Use `urljoin` and centralize your root URL definition.
- Do not keep `results` as a member; it's the result of a method call.
- Do not represent results as a list; use an iterator so that results can be depaginated and streamed to disk while keeping memory usage relatively low.
- Parametrize your fetch function to represent the actual parameters of the web call.
- Consider using PEP 484 type hints.
- Your `open` is missing `newline=''`, which the `csv` module needs so that it can control line endings itself (see the short illustration after this list).
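On the `newline=''` point: `csv.writer` terminates rows with `\r\n` itself, and without `newline=''` the text layer translates those again on Windows, producing `\r\r\n` and blank-looking rows. A minimal illustration (file names are arbitrary):

```python
import csv

# Without newline='', the '\r\n' written by csv.writer is translated again by
# the text layer on Windows, so each row ends in '\r\r\n' (a blank row in Excel).
with open('broken.csv', 'w') as f:
    csv.writer(f).writerow(['a', 'b'])

# With newline='', the csv module keeps full control of the line endings.
with open('ok.csv', 'w', newline='') as f:
    csv.writer(f).writerow(['a', 'b'])
```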
Suggested
```python
from functools import partial
from typing import Any, Dict, Iterable, List
import json
import csv
from urllib.parse import urljoin

from bs4 import BeautifulSoup, SoupStrainer
from requests import Session

JSON = Dict[str, Any]


class ZooplaScraper:
    ROOT = 'https://zoopla.co.uk'
    from_root = partial(urljoin, ROOT)

    def __init__(self):
        self.session = Session()
        # Only the __NEXT_DATA__ script tag is of interest, so tell
        # BeautifulSoup to parse nothing else
        strainer = SoupStrainer('script', id='__NEXT_DATA__')
        self.load_script = partial(
            BeautifulSoup, features='html.parser', parse_only=strainer,
        )

    def fetch(
        self, query: str = 'London', radius: int = 0,
        sort: str = 'newest_listings', page: int = 1,
    ) -> str:
        with self.session.get(
            self.from_root(f'for-sale/property/{query.lower()}/'),
            params={
                'page_size': 25,
                'q': query,
                'radius': radius,
                'results_sort': sort,
                'pn': page,
            },
        ) as resp:
            resp.raise_for_status()
            return resp.text

    def load(self, html: str) -> List[JSON]:
        # The listings live in the page's embedded JSON state
        script = self.load_script(html)
        data = json.loads(script.string)
        return data['props']['initialProps']['pageProps']['regularListingsFormatted']

    @classmethod
    def serialise(cls, listings: Iterable[JSON]) -> Iterable[JSON]:
        for listing in listings:
            yield {
                'listing_id': listing['listingId'],
                'name_title': listing['title'],
                'names': listing['branch']['name'],
                'addresses': listing['address'],
                'agent': cls.from_root(listing['branch']['branchDetailsUri']),
                'phone_no': listing['branch']['phone'],
                'picture': listing['image']['src'],
                'prices': listing['price'],
                'listed_on': listing['publishedOn'],
                'listing_detail_link': cls.from_root(listing['listingUris']['detail']),
            }

    def run(
        self,
        query: str = 'London', radius: int = 0, sort: str = 'newest_listings',
    ) -> Iterable[JSON]:
        for page in range(1, 5):
            yield from self.serialise(
                self.load(
                    self.fetch(query, radius, sort, page)
                )
            )

    @staticmethod
    def to_csv(results: Iterable[JSON], filename: str = 'zoopla.csv') -> None:
        with open(filename, 'w', newline='') as csv_file:
            # Use the first row to establish the CSV field names
            first = next(results)
            writer = csv.DictWriter(csv_file, fieldnames=first.keys())
            writer.writeheader()
            writer.writerow(first)
            writer.writerows(results)


if __name__ == '__main__':
    scraper = ZooplaScraper()
    scraper.to_csv(scraper.run())
```
Experimental
This is an experimental, alternate implementation that:
- Streams the HTTP response and does not need the entire response content to complete
- Streams the parsed HTML elements and does not need the entire document tree to complete
- Streams the JSON body and does not need the entire dictionary tree to complete
It is somewhat iterator-heavy, and built more as a proof of concept to demonstrate that this is possible. The advantages are that worst-case memory usage should be reduced, and that BeautifulSoup is no longer needed. The disadvantages are that a new dependency, JsonSlicer, is required, and that resetting connections before the full response has been transmitted may introduce subtle HTTP inefficiencies.
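For readers who have not used JsonSlicer before, here is a minimal sketch of the interface relied on below (the sample JSON and names are illustrative only): it takes a file-like object plus a path prefix, and yields each matching value as it is parsed, rather than loading the whole document first.

```python
from io import StringIO
from jsonslicer import JsonSlicer

doc = StringIO('{"people": [{"name": "Ada"}, {"name": "Grace"}]}')

# Yield each element under the "people" array as soon as it has been parsed;
# None in the path prefix matches any array index or object key.
for person in JsonSlicer(doc, ('people', None)):
    print(person['name'])  # Ada, Grace
```

In the implementation below, the `StreamParser` object itself plays the role of that file object, feeding JsonSlicer the text of the `__NEXT_DATA__` script tag as it arrives.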
```python
import csv
import logging
from functools import partial
from html.parser import HTMLParser
from typing import Any, Dict, Iterable, Tuple, Optional
from urllib.parse import urljoin

from jsonslicer import JsonSlicer
from requests import Session, Response

JSON = Dict[str, Any]


class StreamParser(HTMLParser):
    def __init__(self, resp: Response):
        resp.raise_for_status()  # If the response failed, it can't be parsed
        self.resp = resp         # Keep the response so we can stream from it
        self.in_tag = False      # Parser state: whether we're in the script tag
        self.done = False        # Whether we're done with the script tag
        self.queue = []          # Queue of text element chunks in the script
        super().__init__()       # Initialize the base parser

    def __enter__(self):
        # Start the data chunk iterator
        self.chunks = self.data_chunks()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # When we're done, tell the HTTP response stream to close
        self.resp.close()

    def data_chunks(self) -> Iterable[str]:
        # Stream in arbitrary-sized chunks from the response
        for chunk in self.resp.iter_content(
            chunk_size=None,      # Get whatever chunks are sent our way
            decode_unicode=True,  # Needed for HTMLParser compatibility
        ):
            logging.debug(
                f'{len(chunk)}-character chunk: '
                f'{chunk[:10]}...{chunk[-10:]}'
            )
            # Feed this chunk to the parser, which will in turn call our handle
            # methods and populate the queue
            self.feed(chunk)
            yield from self.queue
            self.queue.clear()
            # We only care about one tag. Once that's parsed, we're done
            # iterating
            if self.done:
                break

    def read(self, n: Optional[int] = -1) -> str:
        # Will be called by JsonSlicer. We only support partial reads for
        # efficiency's sake; we do not build up our own buffer string.
        if n is None or n < 0:
            raise NotImplementedError('Read-to-end not supported')
        try:
            return next(self.chunks)
        except StopIteration:
            return ''  # end of stream

    def handle_starttag(self, tag: str, attrs: Iterable[Tuple[str, str]]):
        self.in_tag = tag == 'script' and any(
            k == 'id' and v == '__NEXT_DATA__' for k, v in attrs
        )

    def handle_data(self, data: str) -> None:
        if self.in_tag:
            self.queue.append(data)

    def handle_endtag(self, tag: str) -> None:
        if self.in_tag:
            self.in_tag = False
            self.done = True

    def __iter__(self) -> Iterable[JSON]:
        # Iterating over this object will magically produce individual listing
        # dictionaries. We're an iterator; we delegate to the JsonSlicer
        # iterator; and it in turn invokes read() which uses our data_chunks
        # iterator.
        return JsonSlicer(file=self, path_prefix=(
            'props', 'initialProps', 'pageProps', 'regularListingsFormatted', None,
        ))
class ZooplaScraper:
    ROOT = 'https://zoopla.co.uk'
    from_root = partial(urljoin, ROOT)

    def __init__(self):
        self.session = Session()

    def fetch(
        self, query: str = 'London', radius: int = 0,
        sort: str = 'newest_listings', page: int = 1,
    ) -> StreamParser:
        resp = self.session.get(
            self.from_root(f'for-sale/property/{query.lower()}/'),
            params={
                'page_size': 25,
                'q': query,
                'radius': radius,
                'results_sort': sort,
                'pn': page,
            },
            stream=True,
        )
        return StreamParser(resp)

    @classmethod
    def serialise(cls, listing: JSON) -> JSON:
        # Convert from the site's representation of a listing dict to our own
        return {
            'listing_id': listing['listingId'],
            'name_title': listing['title'],
            'names': listing['branch']['name'],
            'addresses': listing['address'],
            'agent': cls.from_root(listing['branch']['branchDetailsUri']),
            'phone_no': listing['branch']['phone'],
            'picture': listing['image']['src'],
            'prices': listing['price'],
            'listed_on': listing['publishedOn'],
            'listing_detail_link': cls.from_root(listing['listingUris']['detail']),
        }

    def run(
        self,
        query: str = 'London', radius: int = 0, sort: str = 'newest_listings',
        max_pages: int = 4,
    ) -> Iterable[JSON]:
        for page in range(1, max_pages + 1):
            with self.fetch(query, radius, sort, page) as stream:
                # Count from 1 so the log reports the true number of listings,
                # and default to 0 in case a page comes back empty
                n_listings = 0
                for n_listings, data in enumerate(stream, 1):
                    yield self.serialise(data)
                logging.info(f'Page {page}: {n_listings} listings')

    @staticmethod
    def to_csv(results: Iterable[JSON], filename: str = 'zoopla.csv') -> None:
        with open(filename, 'w', newline='') as csv_file:
            first = next(results)
            writer = csv.DictWriter(csv_file, fieldnames=first.keys())
            writer.writeheader()
            writer.writerow(first)
            writer.writerows(results)
        logging.info(f'Write to {filename} complete')


if __name__ == '__main__':
    # Will include debugging statements from urllib3
    logging.basicConfig(level=logging.INFO)  # Switch to DEBUG for more verbosity
    scraper = ZooplaScraper()
    scraper.to_csv(scraper.run())
```