AYGP Daily Task Execution
Table of Contents
Overview
AYGP daily task execution and monitoring system.
Configuration Files
JSON Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DailyTaskReport",
"type": "object",
"required": ["date", "time", "agent", "execution_id", "tasks", "overall_status", "summary", "next_actions"],
"properties": {
"date": {
"type": "string",
"format": "date",
"description": "Date of task execution in YYYY-MM-DD format"
},
"time": {
"type": "string",
"format": "time",
"description": "Time of task execution in HH:MM:SS format"
},
"agent": {
"type": "string",
"const": "AYGP",
"description": "Agent identifier"
},
"execution_id": {
"type": "string",
"format": "uuid",
"description": "Unique execution identifier"
},
"tasks": {
"type": "object",
"required": ["repository_activity", "storage_sync", "notifications", "journal_updates"],
"properties": {
"repository_activity": {
"type": "object",
"required": ["status", "findings", "metrics", "actions_taken", "recommendations"],
"properties": {
"status": {
"type": "string",
"enum": ["completed", "partial", "failed"]
}
}
}
}
}
}
}
Task Configuration
{
"repositories": {
"check": [
"defrecord/infrastructure",
"defrecord/research",
"aygp-dr/aygp-dr",
"aygp-dr/aygp-dr.gpg"
],
"metrics": [
"commits_last_24h",
"open_pull_requests",
"recent_issues",
"workflow_runs"
]
},
"storage": {
"webdav": {
"paths": ["/research", "/backups", "/logs"],
"checks": ["permissions", "last_sync", "conflicts"]
},
"s3": {
"buckets": ["aygp-backup", "aygp-logs", "aygp-artifacts"],
"checks": ["replication", "versioning", "lifecycle"]
}
}
}
Task Implementation
Repository Check Script
#!/usr/bin/env python3 """Check repository status and activity.""" import os import sys import json import requests from datetime import datetime, timedelta GITHUB_TOKEN = os.environ.get('GITHUB_TOKEN') GITHUB_API = "https://api.github.com" def check_repository(repo): """Check single repository activity.""" headers = { "Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json" } metrics = { "commits": 0, "pull_requests": 0, "issues": 0, "workflows": 0 } # Implementation here return metrics def main(): with open('src/config/task-config.json') as f: config = json.load(f) results = { "status": "completed", "findings": [], "metrics": {} } for repo in config['repositories']['check']: results['metrics'][repo] = check_repository(repo) print(json.dumps(results, indent=2)) if __name__ == "__main__": main()
Storage Sync Check
#!/usr/bin/env python3 """Verify storage synchronization status.""" import os import json import boto3 import requests from datetime import datetime def check_webdav(): """Check WebDAV synchronization.""" pass def check_s3(): """Check S3 bucket status.""" pass def main(): with open('src/config/task-config.json') as f: config = json.load(f) results = { "status": "completed", "checks": [], "metrics": {}, "issues": [], "resolutions": [] } # Implementation here print(json.dumps(results, indent=2)) if __name__ == "__main__": main()
Task Executor Script
#!/usr/bin/env python3 """Execute AYGP daily tasks.""" import os import sys import json import uuid from datetime import datetime import jsonschema def load_schema(): """Load JSON schema for validation.""" with open('src/schema/daily-task-schema.json') as f: return json.load(f) def execute_tasks(): """Execute all daily tasks.""" report = { "date": datetime.now().strftime('%Y-%m-%d'), "time": datetime.now().strftime('%H:%M:%S'), "agent": "AYGP", "execution_id": str(uuid.uuid4()), "tasks": {}, "overall_status": "healthy", "summary": "", "next_actions": [] } # Execute tasks and populate report return report def validate_report(report): """Validate report against schema.""" schema = load_schema() jsonschema.validate(report, schema) return True def main(): try: report = execute_tasks() if validate_report(report): output_file = f"~/.anthropic/reports/daily/{report['date']}.json" os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, 'w') as f: json.dump(report, f, indent=2) print(f"Report generated: {output_file}") except Exception as e: print(f"Error: {str(e)}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()
Task Execution
Run Tasks
./src/bin/aygp-daily-tasks \ --date $(date +%Y-%m-%d) \ --report-format json \ --schema-validate true \ --notify-on-completion \ --create-journal-entry
Verify Results
jq . reports/daily/$(date +%Y-%m-%d).json
Repository Structure
.
├── src/
│ ├── bin/
│ │ └── aygp-daily-tasks
│ ├── config/
│ │ └── task-config.json
│ ├── schema/
│ │ └── daily-task-schema.json
│ └── tasks/
│ ├── check_repos.py
│ └── check_storage.py
├── reports/
│ └── daily/
├── logs/
│ └── daily/
└── journal/
└── daily/
Success Criteria
Repository Checks
- All repositories accessed
- Activity metrics collected
- Security alerts reviewed
- PRs and issues triaged
Storage Verification
- All systems synced
- Permissions verified
- Conflicts resolved
- Backups confirmed
Notification Processing
- High-priority items handled
- Responses drafted
- Alerts categorized
- Follow-ups scheduled
Journal Updates
- Activities documented
- Metrics recorded
- Issues linked
- Follow-ups defined
Error Handling
Error Response Format
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ErrorResponse",
"type": "object",
"required": ["task", "type", "description", "timestamp", "severity"],
"properties": {
"task": {
"type": "string"
},
"type": {
"type": "string"
},
"description": {
"type": "string"
},
"timestamp": {
"type": "string",
"format": "date-time"
},
"severity": {
"type": "string",
"enum": ["info", "warning", "error", "critical"]
}
}
}