Penso che tu faccia alcune ottime domande che evidenziano quanto SWF possa essere utile come servizio. In breve, non dici ai tuoi server di coordinare il lavoro tra di loro. Il tuo decisore orchestra tutto questo per te, con l'aiuto del servizio SWF.
L'implementazione del tuo flusso di lavoro sarà la seguente:
- Registrazione del tuo flusso di lavoro e delle sue attività con il servizio (una tantum).
- Implementa il decisore e i lavoratori.
- Lascia correre i tuoi lavoratori e decisori.
- Avvia un nuovo flusso di lavoro.
Esistono diversi modi per inserire le credenziali nel codice di boto.swf. Ai fini di questo esercizio, consiglio di esportarli nell'ambiente prima di eseguire il codice seguente:
export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>
1) Per registrare il dominio, il flusso di lavoro e le attività eseguire quanto segue:
# ab_setup.py
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
2) Implementare ed eseguire decisori e lavoratori.
# ab_decider.py
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class ABDecider(swf.Decider):
domain = DOMAIN
task_list = 'default_tasks'
version = VERSION
def run(self):
history = self.poll()
# Print history to familiarize yourself with its format.
print history
if 'events' in history:
# Get a list of non-decision events to see what event came in last.
workflow_events = [e for e in history['events']
if not e['eventType'].startswith('Decision')]
decisions = swf.Layer1Decisions()
# Record latest non-decision event.
last_event = workflow_events[-1]
last_event_type = last_event['eventType']
if last_event_type == 'WorkflowExecutionStarted':
# At the start, get the worker to fetch the first assignment.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
ACTIVITY1, VERSION, task_list='a_tasks')
elif last_event_type == 'ActivityTaskCompleted':
# Take decision based on the name of activity that has just completed.
# 1) Get activity's event id.
last_event_attrs = last_event['activityTaskCompletedEventAttributes']
completed_activity_id = last_event_attrs['scheduledEventId'] - 1
# 2) Extract its name.
activity_data = history['events'][completed_activity_id]
activity_attrs = activity_data['activityTaskScheduledEventAttributes']
activity_name = activity_attrs['activityType']['name']
# 3) Optionally, get the result from the activity.
result = last_event['activityTaskCompletedEventAttributes'].get('result')
# Take the decision.
if activity_name == ACTIVITY1:
# Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
ACTIVITY2, VERSION, task_list='b_tasks', input=result)
elif activity_name == ACTIVITY2:
# Server B completed activity. We're done.
decisions.complete_workflow_execution()
self.complete(decisions=decisions)
return True
I lavoratori sono molto più semplici, non è necessario utilizzare l'ereditarietà se non si desidera.
# ab_worker.py
import os
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class MyBaseWorker(swf.ActivityWorker):
domain = DOMAIN
version = VERSION
task_list = None
def run(self):
activity_task = self.poll()
print activity_task
if 'activityId' in activity_task:
# Get input.
# Get the method for the requested activity.
try:
self.activity(activity_task.get('input'))
except Exception, error:
self.fail(reason=str(error))
raise error
return True
def activity(self, activity_input):
raise NotImplementedError
class WorkerA(MyBaseWorker):
task_list = 'a_tasks'
def activity(self, activity_input):
result = str(time.time())
print 'worker a reporting time: %s' % result
self.complete(result=result)
class WorkerB(MyBaseWorker):
task_list = 'b_tasks'
def activity(self, activity_input):
result = str(os.getpid())
print 'worker b returning pid: %s' % result
self.complete(result=result)
3) Esegui i tuoi decisori e lavoratori. Il tuo decisore e lavoratori possono essere eseguiti da host separati o dalla stessa macchina. Apri quattro terminali ed esegui i tuoi attori:
Prima il tuo decisore
$ python -i ab_decider.py
>>> while ABDecider().run(): pass
...
Quindi lavoratore A, puoi farlo dal server A:
$ python -i ab_workers.py
>>> while WorkerA().run(): pass
Quindi il lavoratore B, possibilmente dal server B ma se li esegui tutti da un laptop funzionerà altrettanto bene:
$ python -i ab_workers.py
>>> while WorkerB().run(): pass
...
4) Infine, avvia il flusso di lavoro.
$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41)
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>>
Torna indietro per vedere cosa succede con i tuoi attori. Potrebbero disconnettersi dal servizio dopo un minuto di inattività. In tal caso, premi freccia su + invio per rientrare nel ciclo di polling.
Ora puoi andare al pannello SWF della tua console di gestione AWS, controllare come stanno andando le esecuzioni e visualizzarne la cronologia. In alternativa, puoi richiederlo tramite la riga di comando.
>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted',
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'},
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy':
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version':
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2,
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes':
{'startToCloseTimeout': '300', 'taskList': {'name': ...
Questo è solo un esempio di un flusso di lavoro con l'esecuzione seriale delle attività, ma è anche possibile per il decisore programmare e coordinare l'esecuzione parallela delle attività.
Spero che questo almeno ti faccia iniziare. Per un esempio leggermente più complesso di un flusso di lavoro seriale, consiglio di guardare questo.
Non ho alcun codice di esempio da condividere, ma puoi sicuramente utilizzare SWF per coordinare l'esecuzione degli script su due server. L'idea principale è quella di creare tre parti di codice che parlino con SWF:
- Un componente che sa quale script eseguire per primo e cosa fare una volta terminata l'esecuzione del primo script. Questo è chiamato il "decisore" in termini SWF.
- Due componenti che comprendono ciascuno come eseguire lo script specifico che si desidera eseguire su ciascuna macchina. Questi sono chiamati "lavoratori attivi" in termini SWF.
Il primo componente, il decisore, chiama due API SWF:PollForDecisionTask e RespondDecisionTaskCompleted. La richiesta di sondaggio fornirà al componente decisore la cronologia corrente di un flusso di lavoro in esecuzione, in pratica le informazioni sullo stato "dove mi trovo" per il tuo script runner. Scrivi codice che esamina questi eventi e capisci quale script deve essere eseguito. Questi "comandi" per eseguire uno script sarebbero sotto forma di una pianificazione di un'attività, che viene restituita come parte della chiamata a RespondDecisionTaskCompleted.
I secondi componenti che scrivi, gli activity worker, chiamano ciascuno due API SWF:PollForActivityTask e RespondActivityTaskCompleted. La richiesta di polling darà all'operatore dell'attività un'indicazione che deve eseguire lo script di cui è a conoscenza, ciò che SWF chiama task dell'attività. Le informazioni restituite dalla richiesta di polling a SWF possono includere singoli dati specifici dell'esecuzione che sono stati inviati a SWF come parte della pianificazione dell'attività dell'attività. Ciascuno dei tuoi server eseguirà in modo indipendente il polling SWF per attività di attività per indicare l'esecuzione dello script locale su quell'host. Una volta che l'operatore ha terminato di eseguire lo script, richiama SWF tramite l'API RespondActivityTaskCompleted.
La richiamata dal tuo addetto alle attività a SWF si traduce in una nuova cronologia che viene distribuita al componente decisore che ho già menzionato. Esaminerà la cronologia, vedrà che il primo script è terminato e pianificherà l'esecuzione del secondo. Una volta che vede che il secondo è stato fatto, può "chiudere" il flusso di lavoro utilizzando un altro tipo di decisione.
Avvia l'intero processo di esecuzione degli script su ciascun host chiamando l'API StartWorkflowExecution. Questo crea la registrazione dell'intero processo in SWF e invia la prima cronologia al processo decisivo per programmare l'esecuzione del primo script sul primo host.
Si spera che questo dia un po' più di contesto su come realizzare questo tipo di flusso di lavoro utilizzando SWF. Se non l'hai già fatto, darei un'occhiata alla guida per gli sviluppatori sulla pagina SWF per ulteriori informazioni.