402 lines
13 KiB
Python
Executable File
402 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Steam Workshop Query Tool for Project Zomboid Mods
|
|
|
|
Queries Steam API to get mod details including correct Mod IDs with special characters.
|
|
Useful for generating properly formatted mod lists for Build 42 servers.
|
|
|
|
Usage:
|
|
# Query individual workshop items (semicolon-separated)
|
|
python steam-workshop-query.py "ID1;ID2;ID3"
|
|
|
|
# Query from a Steam Workshop collection
|
|
python steam-workshop-query.py --collection 3625776190
|
|
python steam-workshop-query.py --collection "https://steamcommunity.com/sharedfiles/filedetails?id=3625776190"
|
|
|
|
# Output formats
|
|
--json Output raw JSON data
|
|
--ansible Output workshop_items and mod_ids strings for ansible config
|
|
--report Human-readable report (default)
|
|
|
|
Examples:
|
|
python steam-workshop-query.py "3171167894;3330403100" --ansible
|
|
python steam-workshop-query.py --collection 3625776190 --report
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import sys
|
|
import time
|
|
import re
|
|
import argparse
|
|
from typing import List, Dict, Optional, Tuple
|
|
from datetime import datetime
|
|
|
|
STEAM_API_DETAILS = "https://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
|
|
STEAM_API_COLLECTION = "https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/"
|
|
BATCH_SIZE = 50 # Conservative batch size to avoid rate limits
|
|
DELAY_BETWEEN_BATCHES = 1.0 # seconds
|
|
|
|
|
|
def get_collection_items(collection_id: str) -> List[str]:
|
|
"""Fetch all workshop item IDs from a Steam Workshop collection."""
|
|
data = {"collectioncount": 1, "publishedfileids[0]": collection_id}
|
|
response = requests.post(STEAM_API_COLLECTION, data=data)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
items = []
|
|
collection_details = result.get("response", {}).get("collectiondetails", [])
|
|
|
|
if not collection_details:
|
|
print(f"Warning: No collection found with ID {collection_id}", file=sys.stderr)
|
|
return items
|
|
|
|
for coll in collection_details:
|
|
if coll.get("result") != 1:
|
|
print(f"Warning: Collection {collection_id} returned error result", file=sys.stderr)
|
|
continue
|
|
for child in coll.get("children", []):
|
|
file_id = child.get("publishedfileid")
|
|
if file_id:
|
|
items.append(file_id)
|
|
|
|
return items
|
|
|
|
|
|
def query_workshop_items_batch(item_ids: List[str]) -> List[Dict]:
|
|
"""Query Steam API for a batch of workshop item details."""
|
|
data = {"itemcount": len(item_ids)}
|
|
for i, item_id in enumerate(item_ids):
|
|
data[f"publishedfileids[{i}]"] = item_id
|
|
|
|
response = requests.post(STEAM_API_DETAILS, data=data)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
return result.get("response", {}).get("publishedfiledetails", [])
|
|
|
|
|
|
def query_all_workshop_items(item_ids: List[str]) -> List[Dict]:
|
|
"""Query Steam API for all workshop items, handling batching."""
|
|
all_items = []
|
|
|
|
for i in range(0, len(item_ids), BATCH_SIZE):
|
|
batch = item_ids[i:i + BATCH_SIZE]
|
|
print(f"Querying batch {i // BATCH_SIZE + 1} ({len(batch)} items)...", file=sys.stderr)
|
|
|
|
items = query_workshop_items_batch(batch)
|
|
all_items.extend(items)
|
|
|
|
# Delay between batches to avoid rate limiting
|
|
if i + BATCH_SIZE < len(item_ids):
|
|
time.sleep(DELAY_BETWEEN_BATCHES)
|
|
|
|
return all_items
|
|
|
|
|
|
def extract_mod_id(item: Dict) -> Optional[str]:
|
|
"""
|
|
Extract Mod ID(s) from item description.
|
|
PZ mods typically include 'Mod ID: xxx' in their description.
|
|
Some mods have multiple Mod IDs on separate lines or comma-separated.
|
|
"""
|
|
description = item.get("description", "")
|
|
|
|
# Find ALL "Mod ID: xxx" patterns in description (multiple lines)
|
|
matches = re.findall(r'Mod ID:\s*([^\r\n]+)', description, re.IGNORECASE)
|
|
|
|
if not matches:
|
|
return None
|
|
|
|
all_mod_ids = []
|
|
for match in matches:
|
|
mod_id_str = match.strip().rstrip('.')
|
|
# Handle comma or semicolon separated mod IDs on same line
|
|
if ',' in mod_id_str:
|
|
all_mod_ids.extend([m.strip() for m in mod_id_str.split(',')])
|
|
elif ';' in mod_id_str:
|
|
all_mod_ids.extend([m.strip() for m in mod_id_str.split(';')])
|
|
else:
|
|
all_mod_ids.append(mod_id_str)
|
|
|
|
# Remove empty strings and duplicates while preserving order
|
|
seen = set()
|
|
unique_ids = []
|
|
for mod_id in all_mod_ids:
|
|
if mod_id and mod_id not in seen:
|
|
seen.add(mod_id)
|
|
unique_ids.append(mod_id)
|
|
|
|
return ';'.join(unique_ids) if unique_ids else None
|
|
|
|
|
|
def check_b42_compatible(item: Dict) -> Tuple[bool, str]:
|
|
"""
|
|
Check if mod appears to be B42 compatible.
|
|
Returns (is_compatible, reason).
|
|
"""
|
|
title = item.get("title", "").lower()
|
|
tags = [t.get("tag", "").lower() for t in item.get("tags", [])]
|
|
all_tags_str = " ".join(tags)
|
|
|
|
# B42 indicators in title or tags
|
|
b42_patterns = [
|
|
r'\bb42\b',
|
|
r'build\s*42',
|
|
r'\b42\.\d+',
|
|
r'\[b42\]',
|
|
r'\(b42\)',
|
|
]
|
|
|
|
for pattern in b42_patterns:
|
|
if re.search(pattern, title) or re.search(pattern, all_tags_str):
|
|
return True, "B42 mentioned in title/tags"
|
|
|
|
# Check for B41 only indicators (might not be compatible)
|
|
b41_only = re.search(r'\bb41\b.*only', title) or re.search(r'build\s*41\s*only', title)
|
|
if b41_only:
|
|
return False, "B41 only"
|
|
|
|
return False, "No B42 indicator found"
|
|
|
|
|
|
def has_special_characters(text: str) -> bool:
|
|
"""Check if text contains special characters that need attention."""
|
|
special = ["'", '"', "!", "&", "(", ")"]
|
|
return any(c in text for c in special)
|
|
|
|
|
|
def extract_collection_id(url_or_id: str) -> str:
|
|
"""Extract collection ID from URL or return as-is if already an ID."""
|
|
match = re.search(r'[?&]id=(\d+)', url_or_id)
|
|
return match.group(1) if match else url_or_id
|
|
|
|
|
|
def format_timestamp(unix_ts: int) -> str:
|
|
"""Format Unix timestamp as readable date."""
|
|
if not unix_ts:
|
|
return "Unknown"
|
|
return datetime.fromtimestamp(unix_ts).strftime("%Y-%m-%d")
|
|
|
|
|
|
def process_items(items: List[Dict]) -> Dict:
|
|
"""
|
|
Process workshop items and extract relevant information.
|
|
Returns a dict with processed data and analysis.
|
|
"""
|
|
processed = []
|
|
duplicates = {}
|
|
issues = []
|
|
|
|
for item in items:
|
|
workshop_id = item.get("publishedfileid", "unknown")
|
|
title = item.get("title", "Unknown")
|
|
mod_id = extract_mod_id(item)
|
|
b42_compat, b42_reason = check_b42_compatible(item)
|
|
last_updated = item.get("time_updated", 0)
|
|
result_code = item.get("result", 0)
|
|
|
|
entry = {
|
|
"workshop_id": workshop_id,
|
|
"title": title,
|
|
"mod_id": mod_id,
|
|
"b42_compatible": b42_compat,
|
|
"b42_reason": b42_reason,
|
|
"last_updated": format_timestamp(last_updated),
|
|
"has_special_chars": has_special_characters(mod_id or ""),
|
|
"result_code": result_code,
|
|
}
|
|
|
|
# Track duplicates by mod_id
|
|
if mod_id:
|
|
if mod_id in duplicates:
|
|
duplicates[mod_id].append(workshop_id)
|
|
else:
|
|
duplicates[mod_id] = [workshop_id]
|
|
|
|
# Track issues
|
|
if result_code != 1:
|
|
issues.append(f"Workshop item {workshop_id} returned error (result={result_code})")
|
|
if not mod_id:
|
|
issues.append(f"Workshop item {workshop_id} ({title}) has no Mod ID tag")
|
|
if entry["has_special_chars"]:
|
|
issues.append(f"Mod ID '{mod_id}' contains special characters")
|
|
|
|
processed.append(entry)
|
|
|
|
# Find actual duplicates (mod_id appearing more than once)
|
|
duplicate_mod_ids = {k: v for k, v in duplicates.items() if len(v) > 1}
|
|
|
|
return {
|
|
"items": processed,
|
|
"duplicates": duplicate_mod_ids,
|
|
"issues": issues,
|
|
"total_count": len(items),
|
|
"valid_count": len([i for i in processed if i["mod_id"]]),
|
|
}
|
|
|
|
|
|
def output_report(data: Dict) -> None:
|
|
"""Output human-readable report."""
|
|
print("\n" + "=" * 80)
|
|
print("STEAM WORKSHOP MOD ANALYSIS REPORT")
|
|
print("=" * 80)
|
|
|
|
print(f"\nTotal items: {data['total_count']}")
|
|
print(f"Valid items (with Mod ID): {data['valid_count']}")
|
|
|
|
if data["duplicates"]:
|
|
print(f"\n{'=' * 40}")
|
|
print("DUPLICATE MOD IDs:")
|
|
print(f"{'=' * 40}")
|
|
for mod_id, workshop_ids in data["duplicates"].items():
|
|
print(f" {mod_id}: {', '.join(workshop_ids)}")
|
|
|
|
if data["issues"]:
|
|
print(f"\n{'=' * 40}")
|
|
print("ISSUES:")
|
|
print(f"{'=' * 40}")
|
|
for issue in data["issues"]:
|
|
print(f" - {issue}")
|
|
|
|
print(f"\n{'=' * 40}")
|
|
print("MOD LIST:")
|
|
print(f"{'=' * 40}")
|
|
|
|
for item in data["items"]:
|
|
b42_status = "[B42]" if item["b42_compatible"] else "[???]"
|
|
special = " [SPECIAL CHARS]" if item["has_special_chars"] else ""
|
|
mod_id_display = item["mod_id"] or "<NO MOD ID>"
|
|
|
|
print(f"\n Workshop: {item['workshop_id']}")
|
|
print(f" Title: {item['title']}")
|
|
print(f" Mod ID: {mod_id_display}{special}")
|
|
print(f" Status: {b42_status} {item['b42_reason']}")
|
|
print(f" Updated: {item['last_updated']}")
|
|
|
|
|
|
def output_ansible(data: Dict) -> None:
|
|
"""Output ansible-ready configuration strings."""
|
|
# Get unique, valid mod IDs (preserving order, removing duplicates)
|
|
seen_workshop = set()
|
|
seen_mod_ids = set()
|
|
workshop_items = []
|
|
mod_ids = []
|
|
|
|
for item in data["items"]:
|
|
workshop_id = item["workshop_id"]
|
|
mod_id_str = item["mod_id"]
|
|
|
|
# Skip if we've seen this workshop item
|
|
if workshop_id in seen_workshop:
|
|
continue
|
|
seen_workshop.add(workshop_id)
|
|
workshop_items.append(workshop_id)
|
|
|
|
# Handle mod_id which may contain multiple IDs separated by semicolon
|
|
if mod_id_str:
|
|
for mod_id in mod_id_str.split(';'):
|
|
mod_id = mod_id.strip()
|
|
if mod_id and mod_id not in seen_mod_ids:
|
|
seen_mod_ids.add(mod_id)
|
|
mod_ids.append(mod_id)
|
|
|
|
# Format for Build 42 (backslash prefix)
|
|
workshop_str = ";".join(workshop_items)
|
|
mod_ids_str = ";".join(f"\\{mid}" for mid in mod_ids)
|
|
|
|
print("\n# Ansible Configuration for zomboid_mods")
|
|
print("# Copy these values to ansible/roles/podman/defaults/main.yml")
|
|
print("")
|
|
print("zomboid_mods:")
|
|
print(" workshop_items: >-")
|
|
print(f" {workshop_str}")
|
|
print(" mod_ids: >-")
|
|
print(f" {mod_ids_str}")
|
|
|
|
if data["duplicates"]:
|
|
print("\n# WARNING: The following Mod IDs had duplicates (kept first occurrence):")
|
|
for mod_id, workshop_ids in data["duplicates"].items():
|
|
print(f"# {mod_id}: {', '.join(workshop_ids)}")
|
|
|
|
if data["issues"]:
|
|
print("\n# Issues found:")
|
|
for issue in data["issues"]:
|
|
print(f"# - {issue}")
|
|
|
|
|
|
def output_json(data: Dict) -> None:
|
|
"""Output JSON data."""
|
|
print(json.dumps(data, indent=2))
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Query Steam Workshop for Project Zomboid mod details",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog=__doc__
|
|
)
|
|
|
|
parser.add_argument(
|
|
"workshop_ids",
|
|
nargs="?",
|
|
help="Semicolon-separated workshop IDs (e.g., 'ID1;ID2;ID3')"
|
|
)
|
|
parser.add_argument(
|
|
"--collection", "-c",
|
|
help="Steam Workshop collection ID or URL"
|
|
)
|
|
parser.add_argument(
|
|
"--json", "-j",
|
|
action="store_true",
|
|
help="Output raw JSON data"
|
|
)
|
|
parser.add_argument(
|
|
"--ansible", "-a",
|
|
action="store_true",
|
|
help="Output ansible-ready configuration"
|
|
)
|
|
parser.add_argument(
|
|
"--report", "-r",
|
|
action="store_true",
|
|
help="Output human-readable report (default)"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Determine input source
|
|
if args.collection:
|
|
collection_id = extract_collection_id(args.collection)
|
|
print(f"Fetching collection {collection_id}...", file=sys.stderr)
|
|
item_ids = get_collection_items(collection_id)
|
|
if not item_ids:
|
|
print("Error: No items found in collection", file=sys.stderr)
|
|
sys.exit(1)
|
|
print(f"Found {len(item_ids)} items in collection", file=sys.stderr)
|
|
elif args.workshop_ids:
|
|
item_ids = [id.strip() for id in args.workshop_ids.split(";") if id.strip()]
|
|
else:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
# Query Steam API
|
|
print(f"Querying {len(item_ids)} workshop items...", file=sys.stderr)
|
|
items = query_all_workshop_items(item_ids)
|
|
print(f"Retrieved {len(items)} item details", file=sys.stderr)
|
|
|
|
# Process items
|
|
data = process_items(items)
|
|
|
|
# Output based on format
|
|
if args.json:
|
|
output_json(data)
|
|
elif args.ansible:
|
|
output_ansible(data)
|
|
else:
|
|
output_report(data)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|