graylog updates, test.debyl.io, scripts for reference
This commit is contained in:
401
scripts/steam-workshop-query.py
Executable file
401
scripts/steam-workshop-query.py
Executable file
@@ -0,0 +1,401 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Steam Workshop Query Tool for Project Zomboid Mods
|
||||
|
||||
Queries Steam API to get mod details including correct Mod IDs with special characters.
|
||||
Useful for generating properly formatted mod lists for Build 42 servers.
|
||||
|
||||
Usage:
|
||||
# Query individual workshop items (semicolon-separated)
|
||||
python steam-workshop-query.py "ID1;ID2;ID3"
|
||||
|
||||
# Query from a Steam Workshop collection
|
||||
python steam-workshop-query.py --collection 3625776190
|
||||
python steam-workshop-query.py --collection "https://steamcommunity.com/sharedfiles/filedetails?id=3625776190"
|
||||
|
||||
# Output formats
|
||||
--json Output raw JSON data
|
||||
--ansible Output workshop_items and mod_ids strings for ansible config
|
||||
--report Human-readable report (default)
|
||||
|
||||
Examples:
|
||||
python steam-workshop-query.py "3171167894;3330403100" --ansible
|
||||
python steam-workshop-query.py --collection 3625776190 --report
|
||||
"""
|
||||
|
||||
import argparse
import json
import re
import sys
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

import requests
|
||||
|
||||
# Steam Web API endpoints; these POST endpoints require no API key.
STEAM_API_DETAILS = "https://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
STEAM_API_COLLECTION = "https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/"
BATCH_SIZE = 50  # Conservative batch size to avoid rate limits
DELAY_BETWEEN_BATCHES = 1.0  # seconds
|
||||
|
||||
|
||||
def get_collection_items(collection_id: str, timeout: float = 30.0) -> List[str]:
    """Fetch all workshop item IDs from a Steam Workshop collection.

    Args:
        collection_id: Numeric collection ID, as a string.
        timeout: Seconds to wait for the Steam API before giving up.

    Returns:
        List of published-file ID strings; empty when the collection is
        missing or Steam reports an error for it.

    Raises:
        requests.HTTPError: If the API responds with an error status.
        requests.Timeout: If the API does not respond within *timeout*.
    """
    data = {"collectioncount": 1, "publishedfileids[0]": collection_id}
    # requests has no default timeout; without one a stalled connection
    # would hang this script indefinitely.
    response = requests.post(STEAM_API_COLLECTION, data=data, timeout=timeout)
    response.raise_for_status()
    result = response.json()

    items: List[str] = []
    collection_details = result.get("response", {}).get("collectiondetails", [])

    if not collection_details:
        print(f"Warning: No collection found with ID {collection_id}", file=sys.stderr)
        return items

    for coll in collection_details:
        # Steam reports a per-collection status; result == 1 means success.
        if coll.get("result") != 1:
            print(f"Warning: Collection {collection_id} returned error result", file=sys.stderr)
            continue
        for child in coll.get("children", []):
            file_id = child.get("publishedfileid")
            if file_id:
                items.append(file_id)

    return items
|
||||
|
||||
|
||||
def query_workshop_items_batch(item_ids: List[str], timeout: float = 30.0) -> List[Dict]:
    """Query the Steam API for details of one batch of workshop items.

    Args:
        item_ids: Workshop item IDs for this batch (caller enforces batching).
        timeout: Seconds to wait for the Steam API before giving up.

    Returns:
        The "publishedfiledetails" list from the API response (may be empty).

    Raises:
        requests.HTTPError: If the API responds with an error status.
        requests.Timeout: If the API does not respond within *timeout*.
    """
    # The API expects itemcount plus an indexed form field per ID.
    data = {"itemcount": len(item_ids)}
    for i, item_id in enumerate(item_ids):
        data[f"publishedfileids[{i}]"] = item_id

    # Explicit timeout: requests would otherwise wait forever on a stall.
    response = requests.post(STEAM_API_DETAILS, data=data, timeout=timeout)
    response.raise_for_status()
    result = response.json()

    return result.get("response", {}).get("publishedfiledetails", [])
|
||||
|
||||
|
||||
def query_all_workshop_items(item_ids: List[str]) -> List[Dict]:
    """Query the Steam API for every workshop item, splitting into batches."""
    results: List[Dict] = []
    # Pre-slice the ID list into BATCH_SIZE-sized chunks.
    batches = [item_ids[start:start + BATCH_SIZE]
               for start in range(0, len(item_ids), BATCH_SIZE)]

    for batch_num, batch in enumerate(batches, start=1):
        print(f"Querying batch {batch_num} ({len(batch)} items)...", file=sys.stderr)
        results.extend(query_workshop_items_batch(batch))

        # Throttle between batches to avoid rate limiting (skip after the last).
        if batch_num < len(batches):
            time.sleep(DELAY_BETWEEN_BATCHES)

    return results
|
||||
|
||||
|
||||
def extract_mod_id(item: Dict) -> Optional[str]:
    """
    Extract Mod ID(s) from a workshop item's description.

    PZ mods typically include 'Mod ID: xxx' in their description. Some mods
    list several Mod IDs, either on separate lines or comma/semicolon
    separated on a single line.

    Args:
        item: Workshop item dict as returned by the Steam API.

    Returns:
        Unique Mod IDs joined with ';', or None when none were found.
    """
    description = item.get("description", "")

    # Find ALL "Mod ID: xxx" patterns in description (multiple lines)
    matches = re.findall(r'Mod ID:\s*([^\r\n]+)', description, re.IGNORECASE)

    if not matches:
        return None

    all_mod_ids = []
    for match in matches:
        mod_id_str = match.strip().rstrip('.')
        # Split on commas AND semicolons in one pass: the previous either/or
        # split left fragments like "b; c" intact when a line mixed both.
        all_mod_ids.extend(part.strip() for part in re.split(r'[,;]', mod_id_str))

    # Remove empty strings and duplicates while preserving order
    seen = set()
    unique_ids = []
    for mod_id in all_mod_ids:
        if mod_id and mod_id not in seen:
            seen.add(mod_id)
            unique_ids.append(mod_id)

    return ';'.join(unique_ids) if unique_ids else None
|
||||
|
||||
|
||||
def check_b42_compatible(item: Dict) -> Tuple[bool, str]:
    """
    Heuristically decide whether a mod looks Build-42 compatible.

    Returns:
        (is_compatible, reason) based only on title and tag text.
    """
    title = item.get("title", "").lower()
    tag_text = " ".join(t.get("tag", "").lower() for t in item.get("tags", []))

    # Patterns that indicate B42 support in the title or tags.
    b42_patterns = (
        r'\bb42\b',
        r'build\s*42',
        r'\b42\.\d+',
        r'\[b42\]',
        r'\(b42\)',
    )
    if any(re.search(p, title) or re.search(p, tag_text) for p in b42_patterns):
        return True, "B42 mentioned in title/tags"

    # Titles that explicitly say "B41 only" are likely incompatible.
    if re.search(r'\bb41\b.*only', title) or re.search(r'build\s*41\s*only', title):
        return False, "B41 only"

    return False, "No B42 indicator found"
|
||||
|
||||
|
||||
def has_special_characters(text: str) -> bool:
    """Check if text contains special characters that need attention."""
    # Quoting/shell-sensitive characters that can break config lines.
    return not set("'\"!&()").isdisjoint(text)
|
||||
|
||||
|
||||
def extract_collection_id(url_or_id: str) -> str:
    """Extract collection ID from URL or return as-is if already an ID."""
    found = re.search(r'[?&]id=(\d+)', url_or_id)
    if found:
        return found.group(1)
    return url_or_id
|
||||
|
||||
|
||||
def format_timestamp(unix_ts: int) -> str:
    """Format a Unix timestamp as a YYYY-MM-DD date string.

    Args:
        unix_ts: Seconds since the epoch; falsy values yield "Unknown".

    Returns:
        The date in UTC, or "Unknown" for a missing timestamp.
    """
    if not unix_ts:
        return "Unknown"
    # Render in UTC so the report doesn't vary with the machine's local
    # timezone (Steam timestamps are UTC epoch seconds).
    return datetime.fromtimestamp(unix_ts, tz=timezone.utc).strftime("%Y-%m-%d")
|
||||
|
||||
|
||||
def process_items(items: List[Dict]) -> Dict:
    """
    Process workshop items and extract relevant information.

    Returns a dict with per-item entries plus duplicate/issue analysis.
    """
    processed: List[Dict] = []
    mod_id_owners: Dict[str, List[str]] = {}
    issues: List[str] = []

    for item in items:
        workshop_id = item.get("publishedfileid", "unknown")
        title = item.get("title", "Unknown")
        mod_id = extract_mod_id(item)
        b42_compat, b42_reason = check_b42_compatible(item)
        result_code = item.get("result", 0)
        special = has_special_characters(mod_id or "")

        # Track which workshop items claim each mod_id (for duplicate detection).
        if mod_id:
            mod_id_owners.setdefault(mod_id, []).append(workshop_id)

        # Collect anything that needs the operator's attention.
        if result_code != 1:
            issues.append(f"Workshop item {workshop_id} returned error (result={result_code})")
        if not mod_id:
            issues.append(f"Workshop item {workshop_id} ({title}) has no Mod ID tag")
        if special:
            issues.append(f"Mod ID '{mod_id}' contains special characters")

        processed.append({
            "workshop_id": workshop_id,
            "title": title,
            "mod_id": mod_id,
            "b42_compatible": b42_compat,
            "b42_reason": b42_reason,
            "last_updated": format_timestamp(item.get("time_updated", 0)),
            "has_special_chars": special,
            "result_code": result_code,
        })

    return {
        "items": processed,
        # Only mod_ids claimed by more than one workshop item are duplicates.
        "duplicates": {m: owners for m, owners in mod_id_owners.items() if len(owners) > 1},
        "issues": issues,
        "total_count": len(items),
        "valid_count": sum(1 for entry in processed if entry["mod_id"]),
    }
|
||||
|
||||
|
||||
def output_report(data: Dict) -> None:
    """Print the human-readable analysis report to stdout."""
    banner = "=" * 80
    rule = "=" * 40

    print("\n" + banner)
    print("STEAM WORKSHOP MOD ANALYSIS REPORT")
    print(banner)

    print(f"\nTotal items: {data['total_count']}")
    print(f"Valid items (with Mod ID): {data['valid_count']}")

    if data["duplicates"]:
        print(f"\n{rule}")
        print("DUPLICATE MOD IDs:")
        print(f"{rule}")
        for mod_id, workshop_ids in data["duplicates"].items():
            print(f" {mod_id}: {', '.join(workshop_ids)}")

    if data["issues"]:
        print(f"\n{rule}")
        print("ISSUES:")
        print(f"{rule}")
        for issue in data["issues"]:
            print(f" - {issue}")

    print(f"\n{rule}")
    print("MOD LIST:")
    print(f"{rule}")

    for entry in data["items"]:
        status = "[B42]" if entry["b42_compatible"] else "[???]"
        suffix = " [SPECIAL CHARS]" if entry["has_special_chars"] else ""
        shown_id = entry["mod_id"] or "<NO MOD ID>"

        print(f"\n Workshop: {entry['workshop_id']}")
        print(f" Title: {entry['title']}")
        print(f" Mod ID: {shown_id}{suffix}")
        print(f" Status: {status} {entry['b42_reason']}")
        print(f" Updated: {entry['last_updated']}")
|
||||
|
||||
|
||||
def output_ansible(data: Dict) -> None:
    """Print ansible-ready workshop_items / mod_ids configuration strings."""
    # Deduplicate both lists while preserving first-seen order.
    seen_workshop = set()
    seen_mod_ids = set()
    workshop_items = []
    mod_ids = []

    for entry in data["items"]:
        wid = entry["workshop_id"]
        if wid in seen_workshop:
            continue
        seen_workshop.add(wid)
        workshop_items.append(wid)

        # A single entry's mod_id may hold several IDs joined by ';'.
        raw = entry["mod_id"]
        if not raw:
            continue
        for candidate in raw.split(';'):
            candidate = candidate.strip()
            if candidate and candidate not in seen_mod_ids:
                seen_mod_ids.add(candidate)
                mod_ids.append(candidate)

    # Build 42 expects each mod id prefixed with a backslash.
    workshop_str = ";".join(workshop_items)
    mod_ids_str = ";".join("\\" + m for m in mod_ids)

    print("\n# Ansible Configuration for zomboid_mods")
    print("# Copy these values to ansible/roles/podman/defaults/main.yml")
    print("")
    print("zomboid_mods:")
    print(" workshop_items: >-")
    print(f" {workshop_str}")
    print(" mod_ids: >-")
    print(f" {mod_ids_str}")

    if data["duplicates"]:
        print("\n# WARNING: The following Mod IDs had duplicates (kept first occurrence):")
        for mod_id, workshop_ids in data["duplicates"].items():
            print(f"# {mod_id}: {', '.join(workshop_ids)}")

    if data["issues"]:
        print("\n# Issues found:")
        for issue in data["issues"]:
            print(f"# - {issue}")
|
||||
|
||||
|
||||
def output_json(data: Dict) -> None:
    """Dump the processed data as pretty-printed JSON to stdout.

    ensure_ascii=False keeps non-ASCII mod titles readable instead of
    emitting \\uXXXX escapes.
    """
    print(json.dumps(data, indent=2, ensure_ascii=False))
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: parse arguments, query Steam, emit the chosen output.

    Exits with status 1 when no input is given or the collection is empty.
    """
    parser = argparse.ArgumentParser(
        description="Query Steam Workshop for Project Zomboid mod details",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )

    parser.add_argument(
        "workshop_ids",
        nargs="?",
        help="Semicolon-separated workshop IDs (e.g., 'ID1;ID2;ID3')"
    )
    parser.add_argument(
        "--collection", "-c",
        help="Steam Workshop collection ID or URL"
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output raw JSON data"
    )
    parser.add_argument(
        "--ansible", "-a",
        action="store_true",
        help="Output ansible-ready configuration"
    )
    parser.add_argument(
        "--report", "-r",
        action="store_true",
        help="Output human-readable report (default)"
    )

    args = parser.parse_args()

    # Determine input source; --collection takes precedence over the
    # positional ID list when both are supplied.
    if args.collection:
        collection_id = extract_collection_id(args.collection)
        print(f"Fetching collection {collection_id}...", file=sys.stderr)
        item_ids = get_collection_items(collection_id)
        if not item_ids:
            print("Error: No items found in collection", file=sys.stderr)
            sys.exit(1)
        print(f"Found {len(item_ids)} items in collection", file=sys.stderr)
    elif args.workshop_ids:
        # `wid` (not `id`) to avoid shadowing the builtin.
        item_ids = [wid.strip() for wid in args.workshop_ids.split(";") if wid.strip()]
    else:
        parser.print_help()
        sys.exit(1)

    # Query Steam API
    print(f"Querying {len(item_ids)} workshop items...", file=sys.stderr)
    items = query_all_workshop_items(item_ids)
    print(f"Retrieved {len(items)} item details", file=sys.stderr)

    # Process items
    data = process_items(items)

    # Output based on format; the human-readable report is the default.
    if args.json:
        output_json(data)
    elif args.ansible:
        output_ansible(data)
    else:
        output_report(data)
|
||||
|
||||
|
||||
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user