"""Scrape recent posts from a list of Instagram profiles using Playwright.

Reads profile URLs from PROFILES_FILE, authenticates (reusing a saved
session when possible), collects the newest POSTS_PER_PROFILE posts per
profile, and writes the results to CSV and Markdown.
"""

import csv
import re
import sys
from pathlib import Path

from playwright.sync_api import Page, sync_playwright

# --- Configuration ---------------------------------------------------------
PROFILES_FILE = Path("profiles.txt")       # one profile URL per line (optionally "N<TAB>url")
OUTPUT_CSV = Path("output.csv")
OUTPUT_MD = Path("output.md")
BROWSER_PROFILE = Path("browser_profile")  # NOTE(review): currently unused — confirm intent
POSTS_PER_PROFILE = 5                      # how many recent posts to scrape per profile
AUTH_STATE = Path("auth_state.json")       # Playwright storage_state with login cookies


def _unique_matches(pattern: str, text: str) -> list[str]:
    """Return regex matches from *text*, deduplicated, first-seen order kept."""
    # dict.fromkeys preserves insertion order and drops duplicates in one pass.
    return list(dict.fromkeys(re.findall(pattern, text)))


def extract_hashtags(text: str) -> list[str]:
    """Return unique #hashtags from *text* in order of first appearance."""
    return _unique_matches(r"#\w+", text)


def extract_mentions(text: str) -> list[str]:
    """Return unique @mentions from *text* in order of first appearance."""
    return _unique_matches(r"@\w+", text)


def profile_slug_from_url(url: str) -> str:
    """Return the final path component of a profile URL (the username)."""
    return url.rstrip("/").split("/")[-1]


def read_profiles() -> list[str]:
    """Read profile URLs from PROFILES_FILE, skipping blanks and non-URLs.

    Lines may be prefixed with a number and a tab; only the last
    tab-separated field is kept, and it must start with "http".
    """
    urls = []
    for line in PROFILES_FILE.read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        # Lines may be prefixed with a number and tab
        url = line.split("\t")[-1].strip()
        if url.startswith("http"):
            urls.append(url)
    return urls


def is_logged_in(page: Page) -> bool:
    """Heuristic login check: the login form's username field is absent."""
    page.goto("https://www.instagram.com/", wait_until="networkidle", timeout=30000)
    return page.locator("input[name='username']").count() == 0


def ensure_authenticated(browser) -> tuple:
    """Return an authenticated ``(context, page)`` pair.

    Tries the saved storage state first; otherwise opens the login page,
    waits for the user to log in interactively, then saves the session.
    Exits the process (status 1) if login cannot be confirmed.
    """
    if AUTH_STATE.exists():
        context = browser.new_context(storage_state=str(AUTH_STATE))
        page = context.new_page()
        if is_logged_in(page):
            print("[auth] Loaded saved session.")
            return context, page
        print("[auth] Saved session expired, need to re-login.")
        context.close()

    context = browser.new_context(viewport={"width": 1280, "height": 900})
    page = context.new_page()
    page.goto("https://www.instagram.com/", wait_until="networkidle", timeout=30000)
    print("[auth] Please log in to Instagram in the browser window.")
    print("[auth] Press Enter here when you can see your feed...")
    input()
    if not is_logged_in(page):
        print("[auth] Still not logged in. Restart the script and try again.")
        sys.exit(1)
    context.storage_state(path=str(AUTH_STATE))
    print(f"[auth] Session saved to {AUTH_STATE}")
    return context, page


def get_post_urls(page, profile_url: str, count: int = POSTS_PER_PROFILE) -> list[str]:
    """Return up to *count* post URLs from a profile page.

    Returns an empty list when the profile fails to load, has no posts,
    or appears to be private.
    """
    slug = profile_slug_from_url(profile_url)
    print(f"[{slug}] Navigating to profile...")
    try:
        page.goto(profile_url, wait_until="networkidle", timeout=30000)
    except Exception as e:
        print(f"[{slug}] Failed to load profile: {e}")
        return []
    try:
        page.wait_for_selector("a[href*='/p/']", timeout=15000)
    except Exception:
        print(f"[{slug}] No posts found or profile is private.")
        return []
    seen = set()
    urls = []
    for link in page.locator("a[href*='/p/']").all():
        href = link.get_attribute("href")
        if href and href not in seen:
            seen.add(href)
            # Post links are usually relative; prefix the site origin.
            full = "https://www.instagram.com" + href if href.startswith("/") else href
            urls.append(full)
            if len(urls) >= count:
                break
    print(f"[{slug}] Found {len(urls)} post URLs.")
    return urls


def scrape_post(page, post_url: str, profile_slug: str) -> dict:
    """Scrape one post page into a flat dict keyed by FIELDS.

    Every extraction step is best-effort: a failure leaves the
    corresponding field empty instead of aborting the post.
    """
    print(f" Scraping {post_url}")
    result = {
        "profile": profile_slug,
        "post_url": post_url,
        "date": "",
        "caption": "",
        "likes": "",
        "image_urls": "",
        "hashtags": "",
        "mentions": "",
        "location": "",
        "media_type": "",
    }
    try:
        page.goto(post_url, wait_until="networkidle", timeout=30000)
        page.wait_for_timeout(1500)
    except Exception as e:
        print(f" Failed to load post: {e}")
        return result

    # Date — first time[datetime] is always the post date
    try:
        time_el = page.locator("time[datetime]").first
        result["date"] = time_el.get_attribute("datetime") or ""
    except Exception:
        pass

    # Caption — walk text nodes in section containing profile link
    try:
        caption_parts = page.evaluate("""(slug) => {
            const sections = Array.from(document.querySelectorAll('section'));
            for (const sec of sections) {
                if (!sec.querySelector('a[href*="/' + slug + '/"]')) continue;
                const walker = document.createTreeWalker(sec, NodeFilter.SHOW_TEXT);
                const texts = [];
                let n;
                while ((n = walker.nextNode())) {
                    const t = n.textContent.trim();
                    if (t.length > 20 && !/^\\d+ [wdhm]$/.test(t) && !/^\\d+$/.test(t)) {
                        texts.push(t);
                    }
                }
                if (texts.length > 0) return texts;
            }
            return [];
        }""", profile_slug)
        result["caption"] = " ".join(caption_parts)
    except Exception:
        pass

    # Likes — first span with purely numeric text
    try:
        like_span = page.locator("span").filter(has_text=re.compile(r"^\d+$")).first
        if like_span.count():
            result["likes"] = like_span.inner_text().strip()
    except Exception:
        pass

    # Media type — carousels show a "Next" button, videos a <video> element.
    try:
        if page.locator("button[aria-label='Next']").count():
            result["media_type"] = "carousel"
        elif page.locator("video").count():
            result["media_type"] = "video"
        else:
            result["media_type"] = "photo"
    except Exception:
        result["media_type"] = "photo"

    # Image URLs — use JS to find CDN images, filter out profile pictures
    try:
        img_urls = page.evaluate("""() => {
            const imgs = Array.from(document.querySelectorAll('img'));
            return imgs
                .filter(img => img.src.includes('cdninstagram')
                    && !img.src.includes('/s150x150/')
                    && img.width > 100)
                .map(img => img.src);
        }""")
        result["image_urls"] = ", ".join(img_urls)
    except Exception:
        pass

    # Location — scoped to avoid footer "Locations" link
    try:
        for loc in page.locator("a[href*='/explore/locations/']").all():
            text = loc.inner_text().strip()
            if text and text.lower() != "locations":
                result["location"] = text
                break
    except Exception:
        pass

    # Hashtags and mentions from caption
    result["hashtags"] = ", ".join(extract_hashtags(result["caption"]))
    result["mentions"] = ", ".join(extract_mentions(result["caption"]))
    return result


# Column order for both output formats.
FIELDS = [
    "profile", "post_url", "date", "caption", "likes",
    "image_urls", "hashtags", "mentions", "location", "media_type",
]


def write_csv(posts: list[dict]) -> None:
    """Write all scraped posts to OUTPUT_CSV."""
    with OUTPUT_CSV.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        writer.writeheader()
        writer.writerows(posts)
    print(f"[output] CSV saved to {OUTPUT_CSV} ({len(posts)} posts)")


def write_markdown(posts: list[dict]) -> None:
    """Write scraped posts to OUTPUT_MD, grouped by profile.

    Relies on *posts* arriving ordered by profile (as produced by main),
    so itertools.groupby yields exactly one group per profile.
    """
    from itertools import groupby

    with OUTPUT_MD.open("w", encoding="utf-8") as f:
        f.write("# Instagram Scrape Results\n\n")
        for profile, group in groupby(posts, key=lambda p: p["profile"]):
            f.write(f"## {profile}\n\n")
            for post in group:
                f.write(f"### [{post['post_url']}]({post['post_url']})\n\n")
                for field in FIELDS:
                    if field in ("profile", "post_url"):
                        continue
                    value = post.get(field, "")
                    if value:
                        f.write(f"- **{field}:** {value}\n")
                f.write("\n")
    print(f"[output] Markdown saved to {OUTPUT_MD}")


def main():
    """Entry point: authenticate, scrape every profile, save outputs."""
    profiles = read_profiles()
    if not profiles:
        print("No profiles found in profiles.txt")
        return
    print(f"[main] Loaded {len(profiles)} profiles.")
    all_posts = []
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context, page = ensure_authenticated(browser)
        try:
            for profile_url in profiles:
                slug = profile_slug_from_url(profile_url)
                for post_url in get_post_urls(page, profile_url):
                    all_posts.append(scrape_post(page, post_url, slug))
        except KeyboardInterrupt:
            print("\n[main] Interrupted. Saving collected data...")
        finally:
            # Release browser resources even on unexpected errors, not just
            # on clean exit or Ctrl-C.
            context.close()
            browser.close()
    if all_posts:
        write_csv(all_posts)
        write_markdown(all_posts)
    else:
        print("[main] No posts collected.")


if __name__ == "__main__":
    main()