Private
Public Access
1
0

feat: complete scraper with auth, scraping, and output

This commit is contained in:
belisards
2026-03-29 17:02:39 -03:00
parent cdd71deb17
commit 8929f3ec40

View File

@@ -47,3 +47,234 @@ def read_profiles() -> list[str]:
if url.startswith("http"):
urls.append(url)
return urls
# File where the Playwright storage state (cookies/localStorage) is persisted
# between runs, so the manual Instagram login only has to happen once.
AUTH_STATE = Path("auth_state.json")
def is_logged_in(page) -> bool:
    """Report whether the page's browser context holds a live Instagram session.

    Navigates to the Instagram home page and treats the absence of the
    login form's username input as proof that a feed (not the login
    screen) is being shown.
    """
    page.goto("https://www.instagram.com/", wait_until="networkidle", timeout=30000)
    login_field = page.locator("input[name='username']")
    return not login_field.count()
def ensure_authenticated(browser) -> tuple:
    """Return an authenticated ``(context, page)`` pair for *browser*.

    First tries to reuse the session stored in ``AUTH_STATE``. If that file
    is missing or the saved cookies have expired, opens a fresh visible
    context, waits for the user to log in manually, and then persists the
    new session to ``AUTH_STATE`` for future runs.

    Exits the process via ``sys.exit(1)`` if the interactive login fails.

    NOTE: the original annotated the return as ``object``; it actually
    returns a 2-tuple, so the annotation is corrected to ``tuple``.
    """
    # Fast path: restore the saved session and verify it still works.
    if AUTH_STATE.exists():
        context = browser.new_context(storage_state=str(AUTH_STATE))
        page = context.new_page()
        if is_logged_in(page):
            print("[auth] Loaded saved session.")
            return context, page
        print("[auth] Saved session expired, need to re-login.")
        context.close()
    # Slow path: interactive login in a visible browser window.
    context = browser.new_context(viewport={"width": 1280, "height": 900})
    page = context.new_page()
    page.goto("https://www.instagram.com/", wait_until="networkidle", timeout=30000)
    print("[auth] Please log in to Instagram in the browser window.")
    print("[auth] Press Enter here when you can see your feed...")
    input()  # block until the user confirms they have logged in
    if not is_logged_in(page):
        print("[auth] Still not logged in. Restart the script and try again.")
        sys.exit(1)
    # Persist cookies/localStorage so the next run can skip the manual login.
    context.storage_state(path=str(AUTH_STATE))
    print(f"[auth] Session saved to {AUTH_STATE}")
    return context, page
def get_post_urls(page, profile_url: str, count: int = POSTS_PER_PROFILE) -> list[str]:
    """Collect up to *count* post URLs from an Instagram profile grid.

    Returns an empty list when the profile fails to load, appears to be
    private, or shows no post links within the wait timeout.
    """
    slug = profile_slug_from_url(profile_url)
    print(f"[{slug}] Navigating to profile...")
    try:
        page.goto(profile_url, wait_until="networkidle", timeout=30000)
    except Exception as e:
        print(f"[{slug}] Failed to load profile: {e}")
        return []
    try:
        page.wait_for_selector("a[href*='/p/']", timeout=15000)
    except Exception:
        print(f"[{slug}] No posts found or profile is private.")
        return []
    collected: list[str] = []
    seen_hrefs: set[str] = set()
    for anchor in page.locator("a[href*='/p/']").all():
        href = anchor.get_attribute("href")
        # Skip anchors without an href and duplicates of links already taken.
        if not href or href in seen_hrefs:
            continue
        seen_hrefs.add(href)
        # Relative hrefs need the instagram.com origin prefixed.
        if href.startswith("/"):
            collected.append("https://www.instagram.com" + href)
        else:
            collected.append(href)
        if len(collected) >= count:
            break
    print(f"[{slug}] Found {len(collected)} post URLs.")
    return collected
def _scrape_date(page) -> str:
    """Post timestamp — the first time[datetime] element is the post date."""
    try:
        time_el = page.locator("time[datetime]").first
        return time_el.get_attribute("datetime") or ""
    except Exception:
        return ""
def _scrape_caption(page, profile_slug: str) -> str:
    """Caption text — walk text nodes in the section containing the profile link."""
    try:
        caption_parts = page.evaluate("""(slug) => {
    const sections = Array.from(document.querySelectorAll('section'));
    for (const sec of sections) {
        if (!sec.querySelector('a[href*="/' + slug + '/"]')) continue;
        const walker = document.createTreeWalker(sec, NodeFilter.SHOW_TEXT);
        const texts = [];
        let n;
        while ((n = walker.nextNode())) {
            const t = n.textContent.trim();
            if (t.length > 20 && !/^\\d+ [wdhm]$/.test(t) && !/^\\d+$/.test(t)) {
                texts.push(t);
            }
        }
        if (texts.length > 0) return texts;
    }
    return [];
}""", profile_slug)
        return " ".join(caption_parts)
    except Exception:
        return ""
def _scrape_likes(page) -> str:
    """Like count — heuristic: first span whose text is purely numeric."""
    try:
        like_span = page.locator("span").filter(has_text=re.compile(r"^\d+$")).first
        if like_span.count():
            return like_span.inner_text().strip()
    except Exception:
        pass
    return ""
def _scrape_media_type(page) -> str:
    """Classify the post as carousel, video, or photo (default on error)."""
    try:
        if page.locator("button[aria-label='Next']").count():
            return "carousel"
        if page.locator("video").count():
            return "video"
        return "photo"
    except Exception:
        return "photo"
def _scrape_image_urls(page) -> str:
    """CDN image URLs, joined with ', ' — filters out profile-picture thumbnails."""
    try:
        img_urls = page.evaluate("""() => {
    const imgs = Array.from(document.querySelectorAll('img'));
    return imgs
        .filter(img => img.src.includes('cdninstagram') && !img.src.includes('/s150x150/') && img.width > 100)
        .map(img => img.src);
}""")
        return ", ".join(img_urls)
    except Exception:
        return ""
def _scrape_location(page) -> str:
    """Tagged location — scoped to location links, skipping the footer 'Locations'."""
    try:
        for loc in page.locator("a[href*='/explore/locations/']").all():
            text = loc.inner_text().strip()
            if text and text.lower() != "locations":
                return text
    except Exception:
        pass
    return ""
def scrape_post(page, post_url: str, profile_slug: str) -> dict:
    """Scrape one Instagram post page into a flat dict of string fields.

    Always returns a dict with every output key present; fields that cannot
    be extracted (or the whole post, if navigation fails) are left as "".
    Each extraction concern is delegated to a single-purpose helper so
    failures stay isolated per field.
    """
    print(f" Scraping {post_url}")
    result = {
        "profile": profile_slug,
        "post_url": post_url,
        "date": "",
        "caption": "",
        "likes": "",
        "image_urls": "",
        "hashtags": "",
        "mentions": "",
        "location": "",
        "media_type": "",
    }
    try:
        page.goto(post_url, wait_until="networkidle", timeout=30000)
        page.wait_for_timeout(1500)  # let late-loading media settle
    except Exception as e:
        print(f" Failed to load post: {e}")
        return result
    result["date"] = _scrape_date(page)
    result["caption"] = _scrape_caption(page, profile_slug)
    result["likes"] = _scrape_likes(page)
    result["media_type"] = _scrape_media_type(page)
    result["image_urls"] = _scrape_image_urls(page)
    result["location"] = _scrape_location(page)
    # Hashtags and mentions are derived from the caption text.
    result["hashtags"] = ", ".join(extract_hashtags(result["caption"]))
    result["mentions"] = ", ".join(extract_mentions(result["caption"]))
    return result
# Column order for the CSV output and the per-post field order in the
# Markdown report. Keys must match the dicts produced by scrape_post().
FIELDS = ["profile", "post_url", "date", "caption", "likes", "image_urls",
          "hashtags", "mentions", "location", "media_type"]
def write_csv(posts: list[dict]) -> None:
    """Write all scraped posts to OUTPUT_CSV with columns ordered by FIELDS."""
    with OUTPUT_CSV.open("w", newline="", encoding="utf-8") as handle:
        csv_writer = csv.DictWriter(handle, fieldnames=FIELDS)
        csv_writer.writeheader()
        for post in posts:
            csv_writer.writerow(post)
    print(f"[output] CSV saved to {OUTPUT_CSV} ({len(posts)} posts)")
def write_markdown(posts: list[dict]) -> None:
    """Render scraped posts to OUTPUT_MD as a Markdown report grouped by profile.

    NOTE(review): groupby does not sort — assumes *posts* arrives already
    ordered by profile, which is how main() collects them.
    """
    from itertools import groupby
    with OUTPUT_MD.open("w", encoding="utf-8") as out:
        out.write("# Instagram Scrape Results\n\n")
        for profile, profile_posts in groupby(posts, key=lambda p: p["profile"]):
            out.write(f"## {profile}\n\n")
            for post in profile_posts:
                out.write(f"### [{post['post_url']}]({post['post_url']})\n\n")
                for field in FIELDS:
                    # Profile and URL are already in the headings above.
                    if field in ("profile", "post_url"):
                        continue
                    value = post.get(field, "")
                    if not value:
                        continue
                    out.write(f"- **{field}:** {value}\n")
                out.write("\n")
    print(f"[output] Markdown saved to {OUTPUT_MD}")
def main():
    """Entry point: authenticate, scrape every profile, write CSV + Markdown.

    Ctrl-C is treated as a soft stop — whatever was collected so far is
    still written out. Browser resources are released in a ``finally`` so
    an unexpected exception mid-scrape can no longer leak the Chromium
    process (the original only closed on clean exit or KeyboardInterrupt).
    """
    profiles = read_profiles()
    if not profiles:
        print("No profiles found in profiles.txt")
        return
    print(f"[main] Loaded {len(profiles)} profiles.")
    all_posts = []
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)  # visible for manual login
        context, page = ensure_authenticated(browser)
        try:
            for profile_url in profiles:
                slug = profile_slug_from_url(profile_url)
                post_urls = get_post_urls(page, profile_url)
                for post_url in post_urls:
                    post = scrape_post(page, post_url, slug)
                    all_posts.append(post)
        except KeyboardInterrupt:
            print("\n[main] Interrupted. Saving collected data...")
        finally:
            # Guaranteed cleanup even on unexpected errors.
            context.close()
            browser.close()
    if all_posts:
        write_csv(all_posts)
        write_markdown(all_posts)
    else:
        print("[main] No posts collected.")
# Standard script entry-point guard: run the scraper only when executed
# directly, not when imported as a module.
if __name__ == "__main__":
    main()