I wrote a simple Zoopla real estate scraper just to practise what I have learned so far about Python, Requests, BeautifulSoup and web-scraping fundamentals in general. Looking at my code, I feel there must be a better and more elegant way to write it, but as a beginner I don't yet know what that is. So I would like the experienced people here at Stack Exchange to review my code and suggest improvements.

import requests
import json
import csv
import time
from bs4 import BeautifulSoup as bs


class ZooplaScraper:

    results = []

    def fetch(self, url):
        print(f'HTTP GET request to URL: {url}', end='')
        res = requests.get(url)
        print(f' | Status code: {res.status_code}')

        return res

    def parse(self, html):
        content = bs(html, 'html.parser')
        content_array = content.select('script[id="__NEXT_DATA__"]')
        content_dict = json.loads(content_array[0].string)
        content_details = content_dict['props']['initialProps']['pageProps']['regularListingsFormatted']

        for listing in content_details:
            self.results.append({
                'listing_id': listing['listingId'],
                'name_title': listing['title'],
                'names': listing['branch']['name'],
                'addresses': listing['address'],
                'agent': 'https://zoopla.co.uk' + listing['branch']['branchDetailsUri'],
                'phone_no': listing['branch']['phone'],
                'picture': listing['image']['src'],
                'prices': listing['price'],
                'Listed_on': listing['publishedOn'],
                'listing_detail_link': 'https://zoopla.co.uk' + listing['listingUris']['detail'],
            })

    def to_csv(self):
        with open('zoopla.csv', 'w') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=self.results[0].keys())
            writer.writeheader()

            for row in self.results:
                writer.writerow(row)

            print('Stored results to "zoopla.csv"')

    def run(self):
        for page in range(1, 5):
            url = 'https://www.zoopla.co.uk/for-sale/property/london/?page_size=25&q=London&radius=0&results_sort=newest_listings&pn='
            url += str(page)
            res = self.fetch(url)
            self.parse(res.text)
            time.sleep(2)
        self.to_csv()


if __name__ == '__main__':
    scraper = ZooplaScraper()
    scraper.run()

Basically, most of what this scraper does is JSON parsing. The problem was that all of the data on the website is rendered from JavaScript and embedded in a script tag, so I had to select that tag, pass its contents to json.loads(), and then walk the resulting dict to find the right key-value pairs.
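
To make that concrete, the pattern boils down to something like this (a stripped-down sketch with a made-up payload; the real key path is the one used in parse above):

import json
from bs4 import BeautifulSoup

html = '<script id="__NEXT_DATA__">{"props": {"pageProps": {"listings": ["a", "b"]}}}</script>'

soup = BeautifulSoup(html, 'html.parser')
# select_one returns the first matching tag directly, so there is no need
# to index into a list of results
script = soup.select_one('script[id="__NEXT_DATA__"]')
data = json.loads(script.string)
print(data['props']['pageProps']['listings'])  # ['a', 'b']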

  • Your code is incorrectly indented, particularly the class methods. – Reinderien, May 20, 2021 at 15:32

1 Answer

  • Make a Session instead of issuing individual requests.get calls; this promotes explicit connection pooling, cookie sharing, etc.
  • There's no need for your current prints. If you find them to be of very high value, convert them into real logging calls
  • Pre-define your script tag loading via a SoupStrainer
  • Use urljoin and centralize your root URL definition
  • Do not keep results as a member; it's the result of a method call
  • Do not represent results as a list; it can be an iterator so that results can be depaginated and streamed to disk while keeping memory occupation relatively low
  • Parametrize your fetch function to represent the actual parameters on the web call
  • Consider using PEP484 type hints
  • Your open is missing newline='' (see the short example after this list)
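
To illustrate that last point, the csv module asks for files opened with newline='' so that it controls the line endings itself; a minimal sketch:

import csv

# Without newline='', the '\r\n' row terminator that csv writes is translated
# again by the text layer on Windows, producing blank rows between records.
with open('example.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['listing_id', 'prices'])
    writer.writerow(['12345', '£500,000'])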

Suggested

from functools import partial
from typing import Any, Dict, Iterable, List

import json
import csv
from urllib.parse import urljoin

from bs4 import BeautifulSoup, SoupStrainer
from requests import Session


JSON = Dict[str, Any]


class ZooplaScraper:
    ROOT = 'https://zoopla.co.uk'
    from_root = partial(urljoin, ROOT)

    def __init__(self):
        self.session = Session()

        strainer = SoupStrainer('script', id='__NEXT_DATA__')
        self.load_script = partial(
            BeautifulSoup, features='html.parser', parse_only=strainer,
        )

    def fetch(
        self, query: str = 'London', radius: int = 0,
        sort: str = 'newest_listings', page: int = 1,
    ) -> str:
        with self.session.get(
            self.from_root(f'for-sale/property/{query.lower()}/'),
            params={
                'page_size': 25,
                'q': query,
                'radius': radius,
                'results_sort': sort,
                'pn': page,
            }
        ) as resp:
            resp.raise_for_status()
            return resp.text

    def load(self, html: str) -> List[JSON]:
        script = self.load_script(html)
        data = json.loads(script.string)
        return data['props']['initialProps']['pageProps']['regularListingsFormatted']

    @classmethod
    def serialise(cls, listings: Iterable[JSON]) -> Iterable[JSON]:
        for listing in listings:
            yield {
                'listing_id': listing['listingId'],
                'name_title': listing['title'],
                'names': listing['branch']['name'],
                'addresses': listing['address'],
                'agent': cls.from_root(listing['branch']['branchDetailsUri']),
                'phone_no': listing['branch']['phone'],
                'picture': listing['image']['src'],
                'prices': listing['price'],
                'listed_on': listing['publishedOn'],
                'listing_detail_link': cls.from_root(listing['listingUris']['detail']),
            }

    def run(
        self,
        query: str = 'London', radius: int = 0, sort: str = 'newest_listings',
    ) -> Iterable[JSON]:
        for page in range(1, 5):
            yield from self.serialise(
                self.load(
                    self.fetch(query, radius, sort, page)
                )
            )

    @staticmethod
    def to_csv(results: Iterable[JSON], filename: str = 'zoopla.csv') -> None:
        with open(filename, 'w', newline='') as csv_file:
            first = next(results)
            writer = csv.DictWriter(csv_file, fieldnames=first.keys())
            writer.writeheader()
            writer.writerow(first)
            writer.writerows(results)


if __name__ == '__main__':
    scraper = ZooplaScraper()
    scraper.to_csv(scraper.run())

Experimental

This is an experimental, alternate implementation that:

  • Streams the HTTP response and does not need the entire response content to complete
  • Streams the parsed HTML elements and does not need the entire document tree to complete
  • Streams the JSON body and does not need the entire dictionary tree to complete

It is somewhat iterator-heavy, and built more as a proof of concept to demonstrate that this is possible. Advantages include that worst-case memory usage should be reduced, and that BeautifulSoup is no longer needed. Disadvantages include that a new dependency, JsonSlicer, is needed; and this might introduce subtle HTTP inefficiencies from connections that are reset before complete response transmission.
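
If JsonSlicer is unfamiliar: it lazily yields the objects matched by a path as the JSON stream is parsed. A minimal sketch of its basic usage (assuming the jsonslicer package is installed; None in the path acts as a wildcard over array indices):

from io import StringIO
from jsonslicer import JsonSlicer

doc = StringIO('{"a": {"b": [{"id": 1}, {"id": 2}]}}')
# Each element under a -> b is yielded as soon as it has been parsed,
# without holding the whole document in memory
for item in JsonSlicer(doc, ('a', 'b', None)):
    print(item)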

import csv
import logging
from functools import partial
from html.parser import HTMLParser
from typing import Any, Dict, Iterable, Tuple, Optional
from urllib.parse import urljoin

from jsonslicer import JsonSlicer
from requests import Session, Response

JSON = Dict[str, Any]


class StreamParser(HTMLParser):
    def __init__(self, resp: Response):
        resp.raise_for_status()  # If the response failed, it can't be parsed
        self.resp = resp         # Keep the response so we can stream from it
        self.in_tag = False      # Parser state: if we're in the script tag
        self.done = False        # Whether we're done the script tag
        self.queue = []          # Queue of text element chunks in the script
        super().__init__()       # Initialize the base parser

    def __enter__(self):
        # Start the data chunk iterator
        self.chunks = self.data_chunks()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # When we're done, tell the HTTP response stream to close
        self.resp.close()

    def data_chunks(self) -> Iterable[str]:
        # Stream in arbitrary-sized chunks from the response
        for chunk in self.resp.iter_content(
            chunk_size=None,  # Get whatever chunks are sent our way
            decode_unicode=True,  # Needed for HTMLParser compatibility
        ):
            logging.debug(
                f'{len(chunk)}-character chunk: '
                f'{chunk[:10]}...{chunk[-10:]}'
            )
            # Feed this chunk to the parser, which will in turn call our handle
            # methods and populate the queue
            self.feed(chunk)
            yield from self.queue
            self.queue.clear()

            # We only care about one tag. Once that's parsed, we're done
            # iterating
            if self.done:
                break

    def read(self, n: Optional[int] = -1) -> str:
        # Will be called by JsonSlicer. We only support partial reads for
        # efficiency's sake; we do not build up our own buffer string.
        if n is None or n < 0:
            raise NotImplementedError('Read-to-end not supported')
        try:
            return next(self.chunks)
        except StopIteration:
            return ''  # end of stream

    def handle_starttag(self, tag: str, attrs: Iterable[Tuple[str, str]]):
        self.in_tag = tag == 'script' and any(
            k == 'id' and v == '__NEXT_DATA__' for k, v in attrs
        )

    def handle_data(self, data: str) -> None:
        if self.in_tag:
            self.queue.append(data)

    def handle_endtag(self, tag: str) -> None:
        if self.in_tag:
            self.in_tag = False
            self.done = True

    def __iter__(self) -> Iterable[JSON]:
        # Iterating over this object will magically produce individual listing
        # dictionaries. We're an iterator; we delegate to the JsonSlicer
        # iterator; and it in turn invokes read() which uses our data_chunks
        # iterator.
        return JsonSlicer(file=self, path_prefix=(
            'props', 'initialProps', 'pageProps', 'regularListingsFormatted', None,
        ))


class ZooplaScraper:
    ROOT = 'https://zoopla.co.uk'
    from_root = partial(urljoin, ROOT)

    def __init__(self):
        self.session = Session()

    def fetch(
        self, query: str = 'London', radius: int = 0,
        sort: str = 'newest_listings', page: int = 1,
    ) -> StreamParser:

        resp = self.session.get(
            self.from_root(f'for-sale/property/{query.lower()}/'),
            params={
                'page_size': 25,
                'q': query,
                'radius': radius,
                'results_sort': sort,
                'pn': page,
            },
            stream=True,
        )

        return StreamParser(resp)

    @classmethod
    def serialise(cls, listing: JSON) -> JSON:
        # Convert from the site's representation of a listing dict to our own
        return {
            'listing_id': listing['listingId'],
            'name_title': listing['title'],
            'names': listing['branch']['name'],
            'addresses': listing['address'],
            'agent': cls.from_root(listing['branch']['branchDetailsUri']),
            'phone_no': listing['branch']['phone'],
            'picture': listing['image']['src'],
            'prices': listing['price'],
            'listed_on': listing['publishedOn'],
            'listing_detail_link': cls.from_root(listing['listingUris']['detail']),
        }

    def run(
        self,
        query: str = 'London', radius: int = 0, sort: str = 'newest_listings',
        max_pages: int = 4,
    ) -> Iterable[JSON]:
        for page in range(1, max_pages + 1):
            with self.fetch(query, radius, sort, page) as stream:
                for n_listings, data in enumerate(stream, 1):
                    yield self.serialise(data)
            logging.info(f'Page {page}: {n_listings} listings')

    @staticmethod
    def to_csv(results: Iterable[JSON], filename: str = 'zoopla.csv') -> None:
        with open(filename, 'w', newline='') as csv_file:
            first = next(results)
            writer = csv.DictWriter(csv_file, fieldnames=first.keys())
            writer.writeheader()
            writer.writerow(first)
            writer.writerows(results)
        logging.info(f'Write to {filename} complete')


if __name__ == '__main__':
    # Will include debugging statements from urllib3
    logging.basicConfig(level=logging.INFO)  # Switch to DEBUG for more verbosity

    scraper = ZooplaScraper()
    scraper.to_csv(scraper.run())
