Algorithm Open Class - Automated Web Traversal

A web crawler: it walks a site's links depth-first with Selenium, keeps the current visit path on an explicit stack, and periodically dumps the discovered link tree as a PlantUML mind map.

import logging
from dataclasses import dataclass
from typing import List, Optional

from selenium import webdriver
from selenium.webdriver.common.by import By

logging.basicConfig(level=logging.DEBUG)  # attach a handler, otherwise DEBUG/INFO records are never printed
log = logging.getLogger('demo')
log.setLevel(logging.DEBUG)


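# A tree of visited links: `data` holds the page's Item, `children` the links found on it.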
class Node:
    def __init__(self, data=None):
        self.data: Item = data
        self.children: List[Node] = []

    def travel(self, node=None, depth=1, root=True):
        # Depth-first, pre-order generator: yields (node, depth) pairs.
        if root:
            node = self
        if node is None:
            return

        yield node, depth
        for child in node.children:
            yield from self.travel(child, depth + 1, False)

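    # Serialize the tree as a PlantUML mind map; nesting depth becomes the number of '*' markers.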
    def mindmap(self):
        content = []
        content.append('@startmindmap')
        for node, depth in self.travel():
            content.append(f'{"*" * depth} {node.data}')

        content.append('@endmindmap')
        return '\n'.join(content)

    def __repr__(self):
        return str(self.data)


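# The DFS path from the root page down to the page currently shown in the browser.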
class Stack:
    def __init__(self):
        self._data: List[Node] = []

    def push(self, data):
        self._data.append(data)

    def pop(self):
        return self._data.pop()

    def top(self):
        return self._data[-1]

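    # An item counts as "in" the stack if it is a child of any node on the current path.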
    def __contains__(self, item: Node):
        for _item in self._data:
            for child in _item.children:
                if child.data.uuid() == item.data.uuid():
                    return True
        return False

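    # If `item` also hangs off an earlier node on the path (excluding the top two
    # entries), pop back to that ancestor and return its matching child; else None.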
    def pop_to_item(self, item: Node):
        if len(self._data) <= 2:
            return None
        for _item in self._data[0:-2]:
            for child in _item.children:
                if child.data.uuid() == item.data.uuid():
                    log.debug(f'pop to {_item}')
                    while self.top().data.uuid() != _item.data.uuid():
                        self.pop()
                    return child
        return None


@dataclass
class Item:
    name: Optional[str] = None
    href: Optional[str] = None
    clicked: bool = False
    click_index: int = 0

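    # Not a real UUID, just an identity key: a link's visible text plus its target URL.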
    def uuid(self):
        return f'{self.name} {self.href}'


def test_crawler():
    url = 'https://ceshiren.com'
    current_status = {'index': 0}  # running click counter, shared with the nested functions

    driver = webdriver.Chrome()
    driver.maximize_window()
    driver.get(url)

    root = Node(Item(href=url, name='', clicked=True))
    stack = Stack()

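    # Collect the visible <a> elements on the current page, register any new
    # links as children of `node`, and map each uuid to its live WebElement.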
    def update_node(node):
        elements = driver.find_elements(by=By.CSS_SELECTOR, value='a')

        element_map = {}
        for element in elements[0:20]:  # demo limit: only inspect the first 20 anchors on the page
            try:
                if not element.is_displayed():
                    continue

                href = element.get_property('href')
                name = element.text
                item = Item(name=name, href=href)
                match_elements = [e for e in node.children if e.data.uuid() == item.uuid()]
                element_map[item.uuid()] = element
                if not match_elements:
                    node.children.append(Node(item))
            except Exception as e:
                log.warning(e)
        return element_map

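    # Recursive DFS step: push the page, pick an unclicked link present on it,
    # click (or backtrack), then recurse. Note that the recursion deepens by one
    # per click, so long crawls can hit Python's default recursion limit.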
    def crawler(node, element_map=None):
        log.info(node.data)
        stack.push(node)
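        # Every 10 clicks, snapshot the tree to a .puml file for inspection.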
        if current_status['index'] % 10 == 0:
            with open(f'{__name__}.puml', 'w') as f:
                f.write(root.mindmap())

        if element_map is None:
            log.info(f'refresh {node}')
            element_map = update_node(node)
            log.info(f'{element_map}')

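        # Pick the first child that has not been clicked yet and is still present on this page.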
        target_node = None
        for child in node.children:
            if not child.data.clicked and child.data.uuid() in element_map:
                target_node = child
                log.info(f'found {target_node}')
                break
        if target_node is not None:
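            # If the link already appears under an earlier page on the path, pop the
            # stack back to that ancestor and continue from its copy of the link
            # instead of clicking it here.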
            parent_node = stack.pop_to_item(target_node)
            if parent_node:
                target_node = parent_node
                log.info(f'back to {target_node}')
            else:
                log.info(element_map)
                target_element = element_map[target_node.data.uuid()]
                log.info(f'click {target_node}')
                log.info(f'click {target_element.id}')
                try:
                    target_node.data.clicked = True
                    if target_element.is_displayed():
                        target_element.click()
                        current_status['index'] += 1
                        target_node.data.click_index = current_status['index']
                    else:
                        log.warning('not displayed, skipped')
                except Exception as e:
                    log.warning(e)
            crawler(target_node, None)
        else:
            log.info('all clicked')

    try:
        crawler(root)
    finally:
        driver.quit()  # close the browser even if the crawl raises (e.g. RecursionError on a deep site)

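The tree and mind-map machinery can be exercised without a browser. A minimal
sketch (hand-built nodes; the example.com URLs are made up for illustration):

# Build a tiny link tree by hand and print its PlantUML source.
root = Node(Item(name='home', href='https://example.com', clicked=True))
about = Node(Item(name='about', href='https://example.com/about'))
about.children.append(Node(Item(name='team', href='https://example.com/about/team')))
root.children.append(about)
print(root.mindmap())
# @startmindmap
# * Item(name='home', href='https://example.com', clicked=True, click_index=0)
# ** Item(name='about', href='https://example.com/about', clicked=False, click_index=0)
# *** Item(name='team', href='https://example.com/about/team', clicked=False, click_index=0)
# @endmindmap

To run the real crawl, execute test_crawler with pytest (a local Chrome plus a
matching ChromeDriver are assumed); the .puml snapshots it writes every ten
clicks can be rendered with PlantUML.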
