281 lines
8.8 KiB
Python
281 lines
8.8 KiB
Python
import re
|
|
import csv
|
|
import sys
|
|
from pathlib import Path
|
|
from playwright.sync_api import sync_playwright, Page
|
|
|
|
# Input file: one profile URL per line (optionally prefixed "N<TAB>url").
PROFILES_FILE = Path("profiles.txt")
# Output artifacts written at the end of a run.
OUTPUT_CSV = Path("output.csv")
OUTPUT_MD = Path("output.md")
# Persistent browser profile directory — appears unused in this file; TODO confirm before removing.
BROWSER_PROFILE = Path("browser_profile")
# Maximum number of recent posts scraped from each profile.
POSTS_PER_PROFILE = 5
|
|
|
|
|
|
def extract_hashtags(text: str) -> list[str]:
    """Return the unique ``#hashtag`` tokens in *text*, in first-seen order."""
    # dict.fromkeys preserves insertion order while dropping duplicates.
    return list(dict.fromkeys(re.findall(r"#\w+", text)))
|
|
|
|
|
|
def extract_mentions(text: str) -> list[str]:
    """Return the unique ``@mention`` tokens in *text*, in first-seen order."""
    ordered: dict[str, None] = {}
    for match in re.finditer(r"@\w+", text):
        # setdefault keeps only the first occurrence of each mention.
        ordered.setdefault(match.group(), None)
    return list(ordered)
|
|
|
|
|
|
def profile_slug_from_url(url: str) -> str:
    """Return the last path segment of *url* (the profile handle)."""
    trimmed = url.rstrip("/")
    # rpartition yields ("", "", trimmed) when no slash exists, so the
    # whole string is returned in that case — same as split("/")[-1].
    _, _, slug = trimmed.rpartition("/")
    return slug
|
|
|
|
|
|
def read_profiles() -> list[str]:
|
|
urls = []
|
|
for line in PROFILES_FILE.read_text().splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
# Lines may be prefixed with a number and tab
|
|
parts = line.split("\t")
|
|
url = parts[-1].strip()
|
|
if url.startswith("http"):
|
|
urls.append(url)
|
|
return urls
|
|
|
|
|
|
# Playwright storage-state file used to persist the logged-in session between runs.
AUTH_STATE = Path("auth_state.json")
|
|
|
|
|
|
def is_logged_in(page: Page) -> bool:
    """Heuristically check whether the browser session is authenticated.

    Navigates to the Instagram home page and looks for the login form's
    username field; its absence is taken to mean the user is logged in.
    """
    page.goto("https://www.instagram.com/", wait_until="networkidle", timeout=30000)
    return page.locator("input[name='username']").count() == 0
|
|
|
|
|
|
def ensure_authenticated(browser) -> tuple[object, Page]:
    """Return an authenticated ``(context, page)`` pair for *browser*.

    First tries to restore the storage state saved in AUTH_STATE.  If
    that file is missing or the session has expired, opens a fresh
    context, asks the operator to log in manually in the visible browser
    window, then saves the session for future runs.  Exits the process
    (status 1) if the manual login still fails.
    """
    if AUTH_STATE.exists():
        context = browser.new_context(storage_state=str(AUTH_STATE))
        page = context.new_page()
        if is_logged_in(page):
            print("[auth] Loaded saved session.")
            return context, page
        # Saved cookies no longer work — fall through to manual login.
        print("[auth] Saved session expired, need to re-login.")
        context.close()

    context = browser.new_context(viewport={"width": 1280, "height": 900})
    page = context.new_page()
    page.goto("https://www.instagram.com/", wait_until="networkidle", timeout=30000)
    print("[auth] Please log in to Instagram in the browser window.")
    print("[auth] Press Enter here when you can see your feed...")
    input()  # blocks until the operator confirms the login in the browser
    if not is_logged_in(page):
        print("[auth] Still not logged in. Restart the script and try again.")
        sys.exit(1)
    # Persist cookies/localStorage so future runs can skip the manual step.
    context.storage_state(path=str(AUTH_STATE))
    print(f"[auth] Session saved to {AUTH_STATE}")
    return context, page
|
|
|
|
|
|
def get_post_urls(page, profile_url: str, count: int = POSTS_PER_PROFILE) -> list[str]:
    """Collect up to *count* post URLs from a profile's grid page.

    Returns an empty list when the profile fails to load, shows no post
    links, or is private.
    """
    slug = profile_slug_from_url(profile_url)
    print(f"[{slug}] Navigating to profile...")
    try:
        page.goto(profile_url, wait_until="networkidle", timeout=30000)
    except Exception as e:
        print(f"[{slug}] Failed to load profile: {e}")
        return []

    try:
        page.wait_for_selector("a[href*='/p/']", timeout=15000)
    except Exception:
        print(f"[{slug}] No posts found or profile is private.")
        return []

    collected: list[str] = []
    visited: set[str] = set()
    for anchor in page.locator("a[href*='/p/']").all():
        href = anchor.get_attribute("href")
        # Skip missing hrefs and duplicates (the grid repeats anchors).
        if not href or href in visited:
            continue
        visited.add(href)
        if href.startswith("/"):
            collected.append("https://www.instagram.com" + href)
        else:
            collected.append(href)
        if len(collected) >= count:
            break

    print(f"[{slug}] Found {len(collected)} post URLs.")
    return collected
|
|
|
|
|
|
def scrape_post(page, post_url: str, profile_slug: str) -> dict:
    """Scrape one Instagram post page into a flat dict of strings.

    Every field defaults to "" so a partially failed scrape still yields
    a complete row; each extraction step below is independently
    best-effort (its failure is swallowed and the field stays empty).

    Args:
        page: Playwright page used for navigation and evaluation.
        post_url: Full URL of the post.
        profile_slug: Handle of the owning profile (used to locate the
            caption section in the DOM).

    Returns:
        Dict keyed exactly like the module-level FIELDS list.
    """
    print(f" Scraping {post_url}")
    result = {
        "profile": profile_slug,
        "post_url": post_url,
        "date": "",
        "caption": "",
        "likes": "",
        "image_urls": "",
        "hashtags": "",
        "mentions": "",
        "location": "",
        "media_type": "",
    }

    try:
        page.goto(post_url, wait_until="networkidle", timeout=30000)
        # Short grace period for late-rendering content after network idle.
        page.wait_for_timeout(1500)
    except Exception as e:
        print(f" Failed to load post: {e}")
        return result

    # Date — first time[datetime] is always the post date
    try:
        time_el = page.locator("time[datetime]").first
        result["date"] = time_el.get_attribute("datetime") or ""
    except Exception:
        pass

    # Caption — walk text nodes in section containing profile link.
    # Text shorter than 21 chars, relative timestamps ("3 w") and bare
    # numbers are filtered out as UI chrome rather than caption text.
    try:
        caption_parts = page.evaluate("""(slug) => {
            const sections = Array.from(document.querySelectorAll('section'));
            for (const sec of sections) {
                if (!sec.querySelector('a[href*="/' + slug + '/"]')) continue;
                const walker = document.createTreeWalker(sec, NodeFilter.SHOW_TEXT);
                const texts = [];
                let n;
                while ((n = walker.nextNode())) {
                    const t = n.textContent.trim();
                    if (t.length > 20 && !/^\\d+ [wdhm]$/.test(t) && !/^\\d+$/.test(t)) {
                        texts.push(t);
                    }
                }
                if (texts.length > 0) return texts;
            }
            return [];
        }""", profile_slug)
        result["caption"] = " ".join(caption_parts)
    except Exception:
        pass

    # Likes — first span with purely numeric text
    try:
        like_span = page.locator("span").filter(has_text=re.compile(r"^\d+$")).first
        if like_span.count():
            result["likes"] = like_span.inner_text().strip()
    except Exception:
        pass

    # Media type: a "Next" carousel button wins over a <video> element;
    # anything else (including a lookup failure) is treated as a photo.
    try:
        if page.locator("button[aria-label='Next']").count():
            result["media_type"] = "carousel"
        elif page.locator("video").count():
            result["media_type"] = "video"
        else:
            result["media_type"] = "photo"
    except Exception:
        result["media_type"] = "photo"

    # Image URLs — use JS to find CDN images, filter out profile pictures
    # (the /s150x150/ thumbnail size and tiny rendered widths).
    try:
        img_urls = page.evaluate("""() => {
            const imgs = Array.from(document.querySelectorAll('img'));
            return imgs
                .filter(img => img.src.includes('cdninstagram') && !img.src.includes('/s150x150/') && img.width > 100)
                .map(img => img.src);
        }""")
        result["image_urls"] = ", ".join(img_urls)
    except Exception:
        pass

    # Location — scoped to avoid footer "Locations" link
    try:
        loc_links = page.locator("a[href*='/explore/locations/']").all()
        for loc in loc_links:
            text = loc.inner_text().strip()
            if text and text.lower() != "locations":
                result["location"] = text
                break
    except Exception:
        pass

    # Hashtags and mentions from caption
    result["hashtags"] = ", ".join(extract_hashtags(result["caption"]))
    result["mentions"] = ", ".join(extract_mentions(result["caption"]))

    return result
|
|
|
|
|
|
# CSV column order; also drives the per-post field list in the Markdown output.
FIELDS = ["profile", "post_url", "date", "caption", "likes", "image_urls",
          "hashtags", "mentions", "location", "media_type"]
|
|
|
|
|
|
def write_csv(posts: list[dict]) -> None:
    """Write *posts* to OUTPUT_CSV using the FIELDS column order."""
    with OUTPUT_CSV.open("w", newline="", encoding="utf-8") as handle:
        dict_writer = csv.DictWriter(handle, fieldnames=FIELDS)
        dict_writer.writeheader()
        for row in posts:
            dict_writer.writerow(row)
    print(f"[output] CSV saved to {OUTPUT_CSV} ({len(posts)} posts)")
|
|
|
|
|
|
def write_markdown(posts: list[dict]) -> None:
    """Write *posts* to OUTPUT_MD as Markdown, grouped by profile.

    Relies on *posts* already being contiguous per profile (they are
    collected profile-by-profile), so groupby needs no pre-sort.
    """
    from itertools import groupby

    with OUTPUT_MD.open("w", encoding="utf-8") as out:
        out.write("# Instagram Scrape Results\n\n")
        for profile, batch in groupby(posts, key=lambda p: p["profile"]):
            out.write(f"## {profile}\n\n")
            for entry in batch:
                out.write(f"### [{entry['post_url']}]({entry['post_url']})\n\n")
                for column in FIELDS:
                    # Profile and URL already appear in the headings.
                    if column in ("profile", "post_url"):
                        continue
                    content = entry.get(column, "")
                    if content:
                        out.write(f"- **{column}:** {content}\n")
                out.write("\n")
    print(f"[output] Markdown saved to {OUTPUT_MD}")
|
|
|
|
|
|
def main() -> None:
    """Entry point: read profiles, scrape their posts, write CSV + Markdown."""
    profiles = read_profiles()
    if not profiles:
        print("No profiles found in profiles.txt")
        return

    print(f"[main] Loaded {len(profiles)} profiles.")
    all_posts = []

    with sync_playwright() as p:
        # Headful browser so the operator can complete a manual login if needed.
        browser = p.chromium.launch(headless=False)
        context, page = ensure_authenticated(browser)

        try:
            for profile_url in profiles:
                slug = profile_slug_from_url(profile_url)
                post_urls = get_post_urls(page, profile_url)
                for post_url in post_urls:
                    post = scrape_post(page, post_url, slug)
                    all_posts.append(post)
        except KeyboardInterrupt:
            # Ctrl-C saves whatever was scraped so far instead of losing it.
            print("\n[main] Interrupted. Saving collected data...")

        context.close()
        browser.close()

    if all_posts:
        write_csv(all_posts)
        write_markdown(all_posts)
    else:
        print("[main] No posts collected.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|