AYGP Daily Task Execution
Table of Contents
Overview
AYGP daily task execution and monitoring system.
Configuration Files
JSON Schema
{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "DailyTaskReport", "type": "object", "required": ["date", "time", "agent", "execution_id", "tasks", "overall_status", "summary", "next_actions"], "properties": { "date": { "type": "string", "format": "date", "description": "Date of task execution in YYYY-MM-DD format" }, "time": { "type": "string", "format": "time", "description": "Time of task execution in HH:MM:SS format" }, "agent": { "type": "string", "const": "AYGP", "description": "Agent identifier" }, "execution_id": { "type": "string", "format": "uuid", "description": "Unique execution identifier" }, "tasks": { "type": "object", "required": ["repository_activity", "storage_sync", "notifications", "journal_updates"], "properties": { "repository_activity": { "type": "object", "required": ["status", "findings", "metrics", "actions_taken", "recommendations"], "properties": { "status": { "type": "string", "enum": ["completed", "partial", "failed"] } } } } } } }
Task Configuration
{ "repositories": { "check": [ "defrecord/infrastructure", "defrecord/research", "aygp-dr/aygp-dr", "aygp-dr/aygp-dr.gpg" ], "metrics": [ "commits_last_24h", "open_pull_requests", "recent_issues", "workflow_runs" ] }, "storage": { "webdav": { "paths": ["/research", "/backups", "/logs"], "checks": ["permissions", "last_sync", "conflicts"] }, "s3": { "buckets": ["aygp-backup", "aygp-logs", "aygp-artifacts"], "checks": ["replication", "versioning", "lifecycle"] } } }
Task Implementation
Repository Check Script
#!/usr/bin/env python3
"""Collect activity metrics for every repository listed in the task config."""
import os
import sys
import json
import requests
from datetime import datetime, timedelta

GITHUB_TOKEN = os.environ.get('GITHUB_TOKEN')
GITHUB_API = "https://api.github.com"


def check_repository(repo):
    """Return the activity counters for a single repository.

    This is still a placeholder: the request headers are prepared but no
    request is sent, so every counter comes back as zero.
    """
    headers = dict(
        Authorization=f"token {GITHUB_TOKEN}",
        Accept="application/vnd.github.v3+json",
    )
    counters = dict.fromkeys(
        ("commits", "pull_requests", "issues", "workflows"), 0
    )
    # Implementation here
    return counters


def main():
    """Read the task config, check each listed repository, print a JSON report."""
    with open('src/config/task-config.json') as config_file:
        settings = json.load(config_file)

    repo_names = settings['repositories']['check']
    report = {
        "status": "completed",
        "findings": [],
        # One metrics entry per configured repository, keyed by its name.
        "metrics": {name: check_repository(name) for name in repo_names},
    }
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()
Storage Sync Check
#!/usr/bin/env python3
"""Report on the synchronization state of the configured storage backends."""
import os
import json
import boto3
import requests
from datetime import datetime


def check_webdav():
    """Placeholder for the WebDAV synchronization check; does nothing yet."""
    return None


def check_s3():
    """Placeholder for the S3 bucket status check; does nothing yet."""
    return None


def main():
    """Load the task config and print an (as yet empty) storage report as JSON."""
    # The config is loaded but not consulted yet; a missing or malformed file
    # still aborts the run here.
    with open('src/config/task-config.json') as config_file:
        config = json.load(config_file)

    report = dict(
        status="completed",
        checks=[],
        metrics={},
        issues=[],
        resolutions=[],
    )
    # Implementation here
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()
Task Executor Script
#!/usr/bin/env python3
"""Execute AYGP daily tasks.

Builds the daily report, validates it against the JSON schema and writes it
to a per-day JSON file.
"""
import os
import sys
import json
import uuid
from datetime import datetime
import jsonschema

# Directory receiving one JSON report per day. The leading "~" is expanded
# explicitly when the path is built: neither os.makedirs() nor open() expands
# it, so using it raw would create a literal "~" directory in the current
# working directory.
# NOTE(review): the documented verification step reads
# reports/daily/<date>.json relative to the repository -- confirm which
# location is the intended one.
_REPORT_DIR = "~/.anthropic/reports/daily"


def load_schema():
    """Load JSON schema for validation.

    Returns:
        The parsed contents of ``src/schema/daily-task-schema.json``
        (resolved relative to the current working directory).
    """
    with open('src/schema/daily-task-schema.json', encoding='utf-8') as f:
        return json.load(f)


def execute_tasks():
    """Execute all daily tasks.

    Returns:
        A report dict with the keys ``date``, ``time``, ``agent``,
        ``execution_id``, ``tasks``, ``overall_status``, ``summary`` and
        ``next_actions``. ``tasks`` is currently left empty.
    """
    # Take a single timestamp so "date" and "time" always describe the same
    # instant, even when the run straddles midnight.
    now = datetime.now()
    report = {
        "date": now.strftime('%Y-%m-%d'),
        "time": now.strftime('%H:%M:%S'),
        "agent": "AYGP",
        "execution_id": str(uuid.uuid4()),
        "tasks": {},
        "overall_status": "healthy",
        "summary": "",
        "next_actions": []
    }
    # Execute tasks and populate report
    return report


def validate_report(report):
    """Validate report against schema.

    Args:
        report: The report dict to check.

    Returns:
        True when the report conforms to the schema.

    Raises:
        Whatever ``jsonschema.validate`` raises for a non-conforming report
        or an invalid schema, plus any error from loading the schema file.
    """
    schema = load_schema()
    jsonschema.validate(report, schema)
    return True


def _report_path(date):
    """Return the absolute output path of the report for *date* (YYYY-MM-DD)."""
    return os.path.join(os.path.expanduser(_REPORT_DIR), f"{date}.json")


def main():
    """Run the tasks, validate the report and write it to disk.

    Any failure is reported on stderr and the process exits with status 1.
    """
    try:
        report = execute_tasks()
        if validate_report(report):
            output_file = _report_path(report['date'])
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2)
            print(f"Report generated: {output_file}")
    except Exception as e:
        # Top-level boundary: surface the error and signal failure to the caller.
        print(f"Error: {str(e)}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
Task Execution
Run Tasks
./src/bin/aygp-daily-tasks \ --date $(date +%Y-%m-%d) \ --report-format json \ --schema-validate true \ --notify-on-completion \ --create-journal-entry
Verify Results
jq . reports/daily/$(date +%Y-%m-%d).json
Repository Structure
. ├── src/ │ ├── bin/ │ │ └── aygp-daily-tasks │ ├── config/ │ │ └── task-config.json │ ├── schema/ │ │ └── daily-task-schema.json │ └── tasks/ │ ├── check_repos.py │ └── check_storage.py ├── reports/ │ └── daily/ ├── logs/ │ └── daily/ └── journal/ └── daily/
Success Criteria
Repository Checks
- All repositories accessed
- Activity metrics collected
- Security alerts reviewed
- PRs and issues triaged
Storage Verification
- All systems synced
- Permissions verified
- Conflicts resolved
- Backups confirmed
Notification Processing
- High-priority items handled
- Responses drafted
- Alerts categorized
- Follow-ups scheduled
Journal Updates
- Activities documented
- Metrics recorded
- Issues linked
- Follow-ups defined
Error Handling
Error Response Format
{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "ErrorResponse", "type": "object", "required": ["task", "type", "description", "timestamp", "severity"], "properties": { "task": { "type": "string" }, "type": { "type": "string" }, "description": { "type": "string" }, "timestamp": { "type": "string", "format": "date-time" }, "severity": { "type": "string", "enum": ["info", "warning", "error", "critical"] } } }