#!/usr/bin/env python3
"""
Steam Workshop Query Tool for Project Zomboid Mods

Queries Steam API to get mod details including correct Mod IDs with special characters.
Useful for generating properly formatted mod lists for Build 42 servers.

Usage:
    # Query individual workshop items (semicolon-separated)
    python steam-workshop-query.py "ID1;ID2;ID3"

    # Query from a Steam Workshop collection
    python steam-workshop-query.py --collection 3625776190
    python steam-workshop-query.py --collection "https://steamcommunity.com/sharedfiles/filedetails?id=3625776190"

    # Output formats
    --json      Output raw JSON data
    --ansible   Output workshop_items and mod_ids strings for ansible config
    --report    Human-readable report (default)

Examples:
    python steam-workshop-query.py "3171167894;3330403100" --ansible
    python steam-workshop-query.py --collection 3625776190 --report
"""

import argparse
import json
import re
import sys
import time
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple

try:
    import requests
except ImportError:
    # Keep the pure parsing/formatting helpers importable without the HTTP
    # dependency; main() reports the missing package before any network call.
    requests = None

STEAM_API_DETAILS = "https://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
STEAM_API_COLLECTION = "https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/"

BATCH_SIZE = 50  # Conservative batch size to avoid rate limits
DELAY_BETWEEN_BATCHES = 1.0  # seconds
REQUEST_TIMEOUT = 30.0  # seconds; requests waits forever when no timeout is given

# Compiled once at import time; these are applied to every workshop item.
_MOD_ID_RE = re.compile(r'Mod ID:\s*([^\r\n]+)', re.IGNORECASE)
_MOD_ID_SEPARATOR_RE = re.compile(r'[,;]')
_B42_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r'\bb42\b',
        r'build\s*42',
        r'\b42\.\d+',
        r'\[b42\]',
        r'\(b42\)',
    )
)
_B41_ONLY_PATTERNS = (
    re.compile(r'\bb41\b.*only'),
    re.compile(r'build\s*41\s*only'),
)
_SPECIAL_CHARACTERS = ("'", '"', "!", "&", "(", ")")


def _post_form(url: str, data: Dict) -> Dict:
    """POST form-encoded ``data`` to ``url`` and return the decoded JSON body.

    Raises:
        RuntimeError: if the ``requests`` package is not installed.
        requests.RequestException: on connection errors, timeouts, or a
            non-success HTTP status (via ``raise_for_status``).
    """
    if requests is None:
        raise RuntimeError("The 'requests' package is required (pip install requests)")
    response = requests.post(url, data=data, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def get_collection_items(collection_id: str) -> List[str]:
    """Fetch all workshop item IDs from a Steam Workshop collection.

    Args:
        collection_id: Published file ID of the collection.

    Returns:
        The ``publishedfileid`` of every child listed in the response, in
        response order. Empty when the response contains no collection
        details; a warning is printed to stderr in that case, and also for
        each collection entry whose ``result`` is not 1 (such entries are
        skipped).
    """
    payload = {"collectioncount": 1, "publishedfileids[0]": collection_id}
    result = _post_form(STEAM_API_COLLECTION, payload)

    collection_details = result.get("response", {}).get("collectiondetails", [])
    if not collection_details:
        print(f"Warning: No collection found with ID {collection_id}", file=sys.stderr)
        return []

    items = []
    for coll in collection_details:
        if coll.get("result") != 1:
            print(f"Warning: Collection {collection_id} returned error result", file=sys.stderr)
            continue
        for child in coll.get("children", []):
            file_id = child.get("publishedfileid")
            if file_id:
                items.append(file_id)
    return items


def query_workshop_items_batch(item_ids: List[str]) -> List[Dict]:
    """Query Steam API for a batch of workshop item details.

    Args:
        item_ids: Workshop item IDs to send in a single request.

    Returns:
        The ``publishedfiledetails`` list from the response (empty if absent).
    """
    payload = {"itemcount": len(item_ids)}
    for index, item_id in enumerate(item_ids):
        payload[f"publishedfileids[{index}]"] = item_id

    result = _post_form(STEAM_API_DETAILS, payload)
    return result.get("response", {}).get("publishedfiledetails", [])


def query_all_workshop_items(item_ids: List[str]) -> List[Dict]:
    """Query Steam API for all workshop items, handling batching.

    Requests are sent in chunks of ``BATCH_SIZE`` with a pause of
    ``DELAY_BETWEEN_BATCHES`` seconds between chunks. Progress is printed to
    stderr so stdout stays clean for the selected output format.
    """
    all_items = []
    for start in range(0, len(item_ids), BATCH_SIZE):
        batch = item_ids[start:start + BATCH_SIZE]
        print(f"Querying batch {start // BATCH_SIZE + 1} ({len(batch)} items)...", file=sys.stderr)
        all_items.extend(query_workshop_items_batch(batch))

        # Delay between batches to avoid rate limiting (not after the last one)
        if start + BATCH_SIZE < len(item_ids):
            time.sleep(DELAY_BETWEEN_BATCHES)
    return all_items


def extract_mod_id(item: Dict) -> Optional[str]:
    """
    Extract Mod ID(s) from item description.

    PZ mods typically include 'Mod ID: xxx' in their description.
    Some mods have multiple Mod IDs on separate lines or comma-separated.

    Returns:
        The unique Mod IDs in first-seen order joined with ';', or None when
        the description contains no 'Mod ID:' marker (or only empty values).
    """
    description = item.get("description") or ""

    # Find ALL "Mod ID: xxx" patterns in description (multiple lines)
    matches = _MOD_ID_RE.findall(description)
    if not matches:
        return None

    candidates = []
    for match in matches:
        line = match.strip().rstrip('.')
        # A single line may mix commas and semicolons, so split on either.
        candidates.extend(part.strip() for part in _MOD_ID_SEPARATOR_RE.split(line))

    # dict.fromkeys drops duplicates while preserving first-seen order.
    unique_ids = list(dict.fromkeys(mod_id for mod_id in candidates if mod_id))
    return ';'.join(unique_ids) if unique_ids else None


def check_b42_compatible(item: Dict) -> Tuple[bool, str]:
    """
    Check if mod appears to be B42 compatible.

    This is a heuristic over the item's title and tag names only.

    Returns (is_compatible, reason).
    """
    title = (item.get("title") or "").lower()
    tags = [t.get("tag", "").lower() for t in item.get("tags", [])]
    all_tags_str = " ".join(tags)

    # B42 indicators in title or tags
    for pattern in _B42_PATTERNS:
        if pattern.search(title) or pattern.search(all_tags_str):
            return True, "B42 mentioned in title/tags"

    # Check for B41 only indicators (might not be compatible)
    if any(pattern.search(title) for pattern in _B41_ONLY_PATTERNS):
        return False, "B41 only"

    return False, "No B42 indicator found"


def has_special_characters(text: str) -> bool:
    """Check if text contains special characters that need attention."""
    return any(char in text for char in _SPECIAL_CHARACTERS)


def extract_collection_id(url_or_id: str) -> str:
    """Extract collection ID from URL or return as-is if already an ID.

    The ID is taken from an ``id=<digits>`` query parameter when present;
    otherwise the input is returned with surrounding whitespace removed.
    """
    match = re.search(r'[?&]id=(\d+)', url_or_id)
    return match.group(1) if match else url_or_id.strip()


def format_timestamp(unix_ts: int) -> str:
    """Format Unix timestamp as readable date.

    Returns "Unknown" for a falsy timestamp (0 or None); otherwise a
    ``YYYY-MM-DD`` date in the machine's local time zone.
    """
    if not unix_ts:
        return "Unknown"
    return datetime.fromtimestamp(unix_ts).strftime("%Y-%m-%d")


def process_items(items: List[Dict]) -> Dict:
    """
    Process workshop items and extract relevant information.

    Returns a dict with processed data and analysis:
        items:       one summary entry per input item
        duplicates:  individual Mod ID -> workshop IDs, only for Mod IDs
                     claimed by more than one entry
        issues:      human-readable problem descriptions
        total_count: number of input items
        valid_count: number of items for which a Mod ID was found
    """
    processed = []
    owners = defaultdict(list)  # individual Mod ID -> workshop IDs declaring it
    issues = []

    for item in items:
        workshop_id = item.get("publishedfileid", "unknown")
        title = item.get("title", "Unknown")
        mod_id = extract_mod_id(item)
        b42_compat, b42_reason = check_b42_compatible(item)
        result_code = item.get("result", 0)

        entry = {
            "workshop_id": workshop_id,
            "title": title,
            "mod_id": mod_id,
            "b42_compatible": b42_compat,
            "b42_reason": b42_reason,
            "last_updated": format_timestamp(item.get("time_updated", 0)),
            "has_special_chars": has_special_characters(mod_id or ""),
            "result_code": result_code,
        }

        # Track duplicates per individual Mod ID; "mod_id" may hold several
        # joined with ';', and the ansible output de-duplicates per ID too.
        if mod_id:
            for single_id in mod_id.split(';'):
                owners[single_id].append(workshop_id)

        # Track issues. A failed lookup is reported once; a missing Mod ID is
        # only meaningful when the lookup itself succeeded.
        if result_code != 1:
            issues.append(f"Workshop item {workshop_id} returned error (result={result_code})")
        elif not mod_id:
            issues.append(f"Workshop item {workshop_id} ({title}) has no Mod ID tag")
        if entry["has_special_chars"]:
            issues.append(f"Mod ID '{mod_id}' contains special characters")

        processed.append(entry)

    # Keep only Mod IDs that appear more than once (plain dict for JSON output)
    duplicate_mod_ids = {k: v for k, v in owners.items() if len(v) > 1}

    return {
        "items": processed,
        "duplicates": duplicate_mod_ids,
        "issues": issues,
        "total_count": len(items),
        "valid_count": sum(1 for entry in processed if entry["mod_id"]),
    }


def output_report(data: Dict) -> None:
    """Output human-readable report for the dict returned by process_items."""
    section_rule = "=" * 40

    print("\n" + "=" * 80)
    print("STEAM WORKSHOP MOD ANALYSIS REPORT")
    print("=" * 80)

    print(f"\nTotal items: {data['total_count']}")
    print(f"Valid items (with Mod ID): {data['valid_count']}")

    if data["duplicates"]:
        print(f"\n{section_rule}")
        print("DUPLICATE MOD IDs:")
        print(f"{section_rule}")
        for mod_id, workshop_ids in data["duplicates"].items():
            print(f" {mod_id}: {', '.join(workshop_ids)}")

    if data["issues"]:
        print(f"\n{section_rule}")
        print("ISSUES:")
        print(f"{section_rule}")
        for issue in data["issues"]:
            print(f" - {issue}")

    print(f"\n{section_rule}")
    print("MOD LIST:")
    print(f"{section_rule}")

    for item in data["items"]:
        b42_status = "[B42]" if item["b42_compatible"] else "[???]"
        special = " [SPECIAL CHARS]" if item["has_special_chars"] else ""
        mod_id_display = item["mod_id"] or ""

        print(f"\n Workshop: {item['workshop_id']}")
        print(f" Title: {item['title']}")
        print(f" Mod ID: {mod_id_display}{special}")
        print(f" Status: {b42_status} {item['b42_reason']}")
        print(f" Updated: {item['last_updated']}")


def render_ansible(data: Dict) -> str:
    """Build the ansible-ready configuration text for processed data.

    Workshop IDs and individual Mod IDs are each de-duplicated in first-seen
    order. Every Mod ID is prefixed with a backslash (Build 42 format). The
    values are emitted as YAML folded scalars nested under ``zomboid_mods``,
    so each scalar's content is indented deeper than its key.
    """
    seen_workshop = set()
    seen_mod_ids = set()
    workshop_items = []
    mod_ids = []

    for item in data["items"]:
        workshop_id = item["workshop_id"]

        # Skip if we've seen this workshop item
        if workshop_id in seen_workshop:
            continue
        seen_workshop.add(workshop_id)
        workshop_items.append(workshop_id)

        # mod_id may contain multiple IDs separated by semicolon
        for mod_id in (item["mod_id"] or "").split(';'):
            mod_id = mod_id.strip()
            if mod_id and mod_id not in seen_mod_ids:
                seen_mod_ids.add(mod_id)
                mod_ids.append(mod_id)

    # Format for Build 42 (backslash prefix)
    workshop_str = ";".join(workshop_items)
    mod_ids_str = ";".join(f"\\{mid}" for mid in mod_ids)

    lines = [
        "",
        "# Ansible Configuration for zomboid_mods",
        "# Copy these values to ansible/roles/podman/defaults/main.yml",
        "",
        "zomboid_mods:",
        "  workshop_items: >-",
        f"    {workshop_str}",
        "  mod_ids: >-",
        f"    {mod_ids_str}",
    ]

    if data["duplicates"]:
        lines.append("")
        lines.append("# WARNING: The following Mod IDs had duplicates (kept first occurrence):")
        for mod_id, workshop_ids in data["duplicates"].items():
            lines.append(f"# {mod_id}: {', '.join(workshop_ids)}")

    if data["issues"]:
        lines.append("")
        lines.append("# Issues found:")
        for issue in data["issues"]:
            lines.append(f"# - {issue}")

    return "\n".join(lines)


def output_ansible(data: Dict) -> None:
    """Output ansible-ready configuration strings."""
    print(render_ansible(data))


def output_json(data: Dict) -> None:
    """Output JSON data."""
    print(json.dumps(data, indent=2))


def main() -> None:
    """Parse command-line arguments, query the Steam API, and print the result.

    Exits with status 1 when no input is given, when the input yields no
    workshop IDs, when the ``requests`` package is missing, or when a Steam
    API request fails.
    """
    parser = argparse.ArgumentParser(
        description="Query Steam Workshop for Project Zomboid mod details",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "workshop_ids",
        nargs="?",
        help="Semicolon-separated workshop IDs (e.g., 'ID1;ID2;ID3')",
    )
    parser.add_argument(
        "--collection", "-c",
        help="Steam Workshop collection ID or URL",
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output raw JSON data",
    )
    parser.add_argument(
        "--ansible", "-a",
        action="store_true",
        help="Output ansible-ready configuration",
    )
    parser.add_argument(
        "--report", "-r",
        action="store_true",
        help="Output human-readable report (default)",
    )

    args = parser.parse_args()

    if not args.collection and not args.workshop_ids:
        parser.print_help()
        sys.exit(1)

    if requests is None:
        print("Error: the 'requests' package is required (pip install requests)", file=sys.stderr)
        sys.exit(1)

    try:
        # Determine input source
        if args.collection:
            collection_id = extract_collection_id(args.collection)
            print(f"Fetching collection {collection_id}...", file=sys.stderr)
            item_ids = get_collection_items(collection_id)
            if not item_ids:
                print("Error: No items found in collection", file=sys.stderr)
                sys.exit(1)
            print(f"Found {len(item_ids)} items in collection", file=sys.stderr)
        else:
            item_ids = [part.strip() for part in args.workshop_ids.split(";") if part.strip()]
            if not item_ids:
                print("Error: No workshop IDs provided", file=sys.stderr)
                sys.exit(1)

        # Repeated IDs would only produce repeated rows and bogus duplicates.
        item_ids = list(dict.fromkeys(item_ids))

        # Query Steam API
        print(f"Querying {len(item_ids)} workshop items...", file=sys.stderr)
        items = query_all_workshop_items(item_ids)
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers an undecodable JSON body on older requests versions.
        print(f"Error: Steam API request failed: {exc}", file=sys.stderr)
        sys.exit(1)

    print(f"Retrieved {len(items)} item details", file=sys.stderr)

    # Process items
    data = process_items(items)

    # Output based on format
    if args.json:
        output_json(data)
    elif args.ansible:
        output_ansible(data)
    else:
        output_report(data)


if __name__ == "__main__":
    main()