# Path where the Playwright storage state (cookies + localStorage) is cached
# between runs so the user does not have to log in every time.
AUTH_STATE = Path("auth_state.json")


def is_logged_in(page) -> bool:
    """Return True when the Instagram home page shows no login form.

    Navigates to instagram.com and checks for the ``username`` input;
    its absence is taken to mean the session is authenticated.
    """
    page.goto("https://www.instagram.com/", wait_until="networkidle", timeout=30000)
    return page.locator("input[name='username']").count() == 0


def ensure_authenticated(browser) -> tuple:
    """Return an authenticated ``(context, page)`` pair for *browser*.

    Tries a previously saved storage state first; if that session has
    expired, falls back to an interactive login in a visible window and
    saves the fresh state to AUTH_STATE. Exits the process (status 1)
    if the interactive login also fails.

    NOTE: the original annotated the return as ``object`` although a
    2-tuple is returned on every path; corrected to ``tuple``.
    """
    # Fast path: reuse the saved session if it is still valid.
    if AUTH_STATE.exists():
        context = browser.new_context(storage_state=str(AUTH_STATE))
        page = context.new_page()
        if is_logged_in(page):
            print("[auth] Loaded saved session.")
            return context, page
        print("[auth] Saved session expired, need to re-login.")
        context.close()

    # Interactive login in a fresh context; the user drives the browser.
    context = browser.new_context(viewport={"width": 1280, "height": 900})
    page = context.new_page()
    page.goto("https://www.instagram.com/", wait_until="networkidle", timeout=30000)
    print("[auth] Please log in to Instagram in the browser window.")
    print("[auth] Press Enter here when you can see your feed...")
    input()
    if not is_logged_in(page):
        # FIX: this message was split across physical lines in the
        # corrupted source; rejoined into a single string literal.
        print("[auth] Still not logged in. Restart the script and try again.")
        sys.exit(1)
    context.storage_state(path=str(AUTH_STATE))
    print(f"[auth] Session saved to {AUTH_STATE}")
    return context, page


def get_post_urls(page, profile_url: str, count: int = POSTS_PER_PROFILE) -> list[str]:
    """Return up to *count* absolute post URLs from a profile page.

    Returns an empty list when the profile fails to load, has no posts,
    or is private (no ``/p/`` links render within the timeout).
    """
    slug = profile_slug_from_url(profile_url)
    print(f"[{slug}] Navigating to profile...")
    try:
        page.goto(profile_url, wait_until="networkidle", timeout=30000)
    except Exception as e:
        print(f"[{slug}] Failed to load profile: {e}")
        return []

    try:
        page.wait_for_selector("a[href*='/p/']", timeout=15000)
    except Exception:
        print(f"[{slug}] No posts found or profile is private.")
        return []

    links = page.locator("a[href*='/p/']").all()
    seen = set()
    urls = []
    for link in links:
        href = link.get_attribute("href")
        if href and href not in seen:
            seen.add(href)
            # Profile grids use relative hrefs; normalize to absolute.
            full = "https://www.instagram.com" + href if href.startswith("/") else href
            urls.append(full)
            if len(urls) >= count:
                break

    print(f"[{slug}] Found {len(urls)} post URLs.")
    return urls


def scrape_post(page, post_url: str, profile_slug: str) -> dict:
    """Scrape one post page into a flat dict matching FIELDS.

    Every extraction step is individually best-effort: a failed selector
    leaves that field as "" rather than aborting the post.
    """
    print(f"  Scraping {post_url}")
    result = {
        "profile": profile_slug,
        "post_url": post_url,
        "date": "",
        "caption": "",
        "likes": "",
        "image_urls": "",
        "hashtags": "",
        "mentions": "",
        "location": "",
        "media_type": "",
    }

    try:
        page.goto(post_url, wait_until="networkidle", timeout=30000)
        page.wait_for_timeout(1500)
    except Exception as e:
        print(f"  Failed to load post: {e}")
        return result

    # Date — first time[datetime] is always the post date
    try:
        time_el = page.locator("time[datetime]").first
        result["date"] = time_el.get_attribute("datetime") or ""
    except Exception:
        pass

    # Caption — walk text nodes in the section containing the profile link.
    # FIX: the JS selector string was split across physical lines in the
    # corrupted source; rejoined here.
    try:
        caption_parts = page.evaluate("""(slug) => {
            const sections = Array.from(document.querySelectorAll('section'));
            for (const sec of sections) {
                if (!sec.querySelector('a[href*="/' + slug + '/"]')) continue;
                const walker = document.createTreeWalker(sec, NodeFilter.SHOW_TEXT);
                const texts = [];
                let n;
                while ((n = walker.nextNode())) {
                    const t = n.textContent.trim();
                    if (t.length > 20 && !/^\\d+ [wdhm]$/.test(t) && !/^\\d+$/.test(t)) {
                        texts.push(t);
                    }
                }
                if (texts.length > 0) return texts;
            }
            return [];
        }""", profile_slug)
        result["caption"] = " ".join(caption_parts)
    except Exception:
        pass

    # Likes — first span with purely numeric text
    try:
        like_span = page.locator("span").filter(has_text=re.compile(r"^\d+$")).first
        if like_span.count():
            result["likes"] = like_span.inner_text().strip()
    except Exception:
        pass

    # Media type — carousel takes precedence (a carousel may also contain video)
    try:
        if page.locator("button[aria-label='Next']").count():
            result["media_type"] = "carousel"
        elif page.locator("video").count():
            result["media_type"] = "video"
        else:
            result["media_type"] = "photo"
    except Exception:
        result["media_type"] = "photo"

    # Image URLs — use JS to find CDN images, filter out profile pictures
    # (the /s150x150/ thumbnails and tiny avatars).
    try:
        img_urls = page.evaluate("""() => {
            const imgs = Array.from(document.querySelectorAll('img'));
            return imgs
                .filter(img => img.src.includes('cdninstagram') && !img.src.includes('/s150x150/') && img.width > 100)
                .map(img => img.src);
        }""")
        result["image_urls"] = ", ".join(img_urls)
    except Exception:
        pass

    # Location — scoped to avoid footer "Locations" link
    try:
        loc_links = page.locator("a[href*='/explore/locations/']").all()
        for loc in loc_links:
            text = loc.inner_text().strip()
            if text and text.lower() != "locations":
                result["location"] = text
                break
    except Exception:
        pass

    # Hashtags and mentions are derived from the caption text.
    result["hashtags"] = ", ".join(extract_hashtags(result["caption"]))
    result["mentions"] = ", ".join(extract_mentions(result["caption"]))

    return result


# Column order for the CSV output and field order for the Markdown report.
FIELDS = ["profile", "post_url", "date", "caption", "likes", "image_urls",
          "hashtags", "mentions", "location", "media_type"]
def write_csv(posts: list[dict]) -> None:
    """Write *posts* to OUTPUT_CSV, one row per post, columns = FIELDS."""
    with OUTPUT_CSV.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        writer.writeheader()
        writer.writerows(posts)
    print(f"[output] CSV saved to {OUTPUT_CSV} ({len(posts)} posts)")


def write_markdown(posts: list[dict]) -> None:
    """Write *posts* to OUTPUT_MD as a report grouped by profile.

    Relies on *posts* being ordered so each profile's posts are
    contiguous (true for the order main() collects them), because
    itertools.groupby only groups adjacent items.
    """
    from itertools import groupby

    with OUTPUT_MD.open("w", encoding="utf-8") as f:
        f.write("# Instagram Scrape Results\n\n")
        for profile, group in groupby(posts, key=lambda p: p["profile"]):
            f.write(f"## {profile}\n\n")
            for post in group:
                f.write(f"### [{post['post_url']}]({post['post_url']})\n\n")
                for field in FIELDS:
                    if field in ("profile", "post_url"):
                        continue
                    value = post.get(field, "")
                    if value:  # skip empty fields to keep the report compact
                        f.write(f"- **{field}:** {value}\n")
                f.write("\n")
    print(f"[output] Markdown saved to {OUTPUT_MD}")


def main():
    """Scrape every profile listed in profiles.txt and write CSV + Markdown."""
    profiles = read_profiles()
    if not profiles:
        print("No profiles found in profiles.txt")
        return

    print(f"[main] Loaded {len(profiles)} profiles.")
    all_posts = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context, page = ensure_authenticated(browser)

        try:
            for profile_url in profiles:
                slug = profile_slug_from_url(profile_url)
                for post_url in get_post_urls(page, profile_url):
                    all_posts.append(scrape_post(page, post_url, slug))
        except KeyboardInterrupt:
            print("\n[main] Interrupted. Saving collected data...")
        finally:
            # FIX: cleanup was unconditional code *after* the try block, so
            # any exception other than KeyboardInterrupt skipped it and the
            # browser leaked. finally guarantees shutdown on every path.
            context.close()
            browser.close()

    # Write whatever was collected, even after an interrupt.
    if all_posts:
        write_csv(all_posts)
        write_markdown(all_posts)
    else:
        print("[main] No posts collected.")


if __name__ == "__main__":
    main()