Extending django-pyas2¶
You may want to extend django-pyas2 to add connectors from which files are received (such as a message queue), to run a directory monitor as a daemon that sends messages as soon as a file has been written to an outbound directory (see directory structure), or to add further functionality, such as a custom website at the root URL.
One way to extend django-pyas2
is to use the django startapp command, which creates the
directory structure needed for an app. In this example we call the app “extend_pyas2”.
Please consult the extensive django documentation to learn more about this command. Below is a brief description, for your convenience, to get started:
In the django_pyas2 project directory invoke the script as follows:
$ python manage.py startapp extend_pyas2
This has now created a new directory containing files that may be used for apps:
{PROJECT DIRECTORY}
└──django_pyas2
├── django_pyas2
│ ├── db.sqlite3
│ ├── manage.py
│ └── django_pyas2
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── extend_pyas2
├── apps.py
├── migrations
├── models.py
├── tests.py
└── views.py
In our example, we will add a new admin command that should monitor directories and trigger the sending of files to partners when they are written. For that purpose, we need to create some subfolders “management/commands” and a python file with the management command:
│ └── wsgi.py
└── extend_pyas2
├── apps.py
├── migrations
├── models.py
├── tests.py
├── views.py
└── management
└── commands
└── filewatcher.py
Add extend_pyas2 to your INSTALLED_APPS setting, after pyas2.
INSTALLED_APPS = (
...
'pyas2',
'extend_pyas2',
)
The command can then be run with Django’s manage.py as shown here; example content for filewatcher.py follows:
$ python manage.py filewatcher
from django.core.management.base import BaseCommand, CommandError
from django.core.management import call_command
from django.utils.translation import ugettext as _
from pyas2.models import Organization
from pyas2.models import Partner
from pyas2 import settings
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserverVFS
from watchdog.events import PatternMatchingEventHandler
import time
import atexit
import socket
import os
import sys
import logging
# Module-wide logger named 'django' (presumably so that messages are routed through
# the project's Django logging configuration -- confirm against your LOGGING setting).
logger = logging.getLogger('django')
# TCP port that Command.handle binds on 127.0.0.1; a failed bind is reported as
# "An instance of the send daemon is already running", i.e. the port acts as a
# single-instance lock.
DAEMONPORT = 16388
class FileWatchHandle(PatternMatchingEventHandler):
    """
    Watchdog event handler that queues files for sending.

    Directory events are ignored and no patterns are configured, so every file
    event reported for the watched directory is queued.

    :param tasks: shared set that receives ``(organization, partner, path)`` tuples
    :param dir_watch: dict providing at least the keys ``'organization'`` and
        ``'partner'`` for the watched directory
    """

    def __init__(self, tasks, dir_watch):
        super(FileWatchHandle, self).__init__(ignore_directories=True)
        self.tasks = tasks
        self.dir_watch = dir_watch

    def handle_event(self, event):
        """Add the file referenced by ``event`` to the task set and log it."""
        task = (
            self.dir_watch['organization'],
            self.dir_watch['partner'],
            event.src_path,
        )
        self.tasks.add(task)
        # The set de-duplicates repeated events for the same file.
        logger.info(u' "%(file)s" created. Adding to Task Queue.', {'file': task[2]})

    def on_created(self, event):
        """Queue a newly created file."""
        self.handle_event(event)

    def on_modified(self, event):
        """Queue a modified file."""
        self.handle_event(event)
class WatchdogObserversManager:
    """
    Starts watchdog observers and keeps track of them.

    Every observer is created with the same settings and watches its directory
    non-recursively (subdirectories are not searched).

    :param is_daemon: value assigned to the ``daemon`` attribute of each observer
    :param force_vfs: use ``PollingObserverVFS`` instead of ``Observer``. If the
        underlying filesystem is a network share, OS events cannot be used
        reliably, so polling has to be done, which is expensive.
    """

    def __init__(self, is_daemon=True, force_vfs=False):
        self.force_vfs = force_vfs
        self.is_daemon = is_daemon
        self.observers = []

    def add_observer(self, tasks, dir_watch):
        """
        Create, start and register an observer for ``dir_watch['path']``.

        :param tasks: shared task set handed to the event handler
        :param dir_watch: dict describing the watched directory; ``'path'`` is read
            here, the remaining keys are consumed by ``FileWatchHandle``
        """
        watcher = (
            PollingObserverVFS(stat=os.stat, listdir=os.listdir)
            if self.force_vfs
            else Observer()
        )
        watcher.daemon = self.is_daemon
        handler = FileWatchHandle(tasks, dir_watch)
        watcher.schedule(handler, dir_watch['path'], recursive=False)
        watcher.start()
        self.observers.append(watcher)

    def stop_all(self):
        """Call ``stop()`` on every registered observer."""
        for watcher in self.observers:
            watcher.stop()

    def join_all(self):
        """Call ``join()`` on every registered observer."""
        for watcher in self.observers:
            watcher.join()
class Command(BaseCommand):
    """
    Management command that watches the partner outbox directories and sends
    every file that appears there via the ``sendas2message`` command.

    Run with ``python manage.py filewatcher``. Only one instance can run at a
    time; this is enforced by binding a local TCP port (``DAEMONPORT``).
    """
    help = _(u'Daemon process that watches the outbox of all as2 partners and '
             u'triggers sendmessage when files become available')

    def handle(self, *args, **options):
        """
        Send files already waiting in the outboxes, then watch for new ones.

        The method loops until an exception or ``KeyboardInterrupt`` occurs,
        then stops the observers and exits the process with status 0.

        :raises CommandError: if another instance already holds ``DAEMONPORT``
        """
        logger.info(_(u'Starting PYAS2 send Watchdog daemon.'))

        # Binding a fixed local port serves as a single-instance lock.
        engine_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            engine_socket.bind(('127.0.0.1', DAEMONPORT))
        except socket.error:
            engine_socket.close()
            raise CommandError(_(u'An instance of the send daemon is already running'))
        else:
            atexit.register(engine_socket.close)

        # Filled by the observer threads, drained by the loop below.
        tasks = set()

        dir_watch_data = self._collect_watch_dirs()
        if not dir_watch_data:
            logger.error(_(u'No partners have been configured!'))
            sys.exit(0)

        logger.info(_(u'Process existing files in the directory.'))
        for dir_watch in dir_watch_data:
            file_names = [name for name in os.listdir(dir_watch['path'])
                          if os.path.isfile(os.path.join(dir_watch['path'], name))]
            for file_name in file_names:
                self._send_file(dir_watch['organization'],
                                dir_watch['partner'],
                                os.path.join(dir_watch['path'], file_name),
                                file_name)

        # Start one watchdog observer thread per outbox directory.
        logger.info(_(u'PYAS2 send Watchdog daemon started.'))
        active_receiving = False
        watchdog_file_observers = WatchdogObserversManager(is_daemon=True, force_vfs=True)
        for dir_watch in dir_watch_data:
            watchdog_file_observers.add_observer(tasks, dir_watch)

        try:
            logger.info(_(u'Watchdog awaiting tasks...'))
            while True:
                if tasks:
                    if not active_receiving:
                        # First cycle in which tasks are seen (after a batch was
                        # sent, or after startup): wait one more cycle before
                        # sending.
                        active_receiving = True
                    else:
                        # Drain with pop() instead of iterating the shared set:
                        # observer threads may add entries concurrently, which
                        # would raise "Set changed size during iteration", and
                        # clearing the set afterwards would drop files queued
                        # while this batch was being sent.
                        pending = []
                        while tasks:
                            pending.append(tasks.pop())
                        for org_name, partner_name, file_path in pending:
                            if not os.path.isfile(file_path):
                                # E.g. a late event for a file that has already
                                # been sent and deleted.
                                logger.warning(
                                    u'Skipping "%(file)s": file no longer exists.',
                                    {'file': file_path})
                                continue
                            self._send_file(org_name, partner_name, file_path, file_path)
                        active_receiving = False
                time.sleep(2)

        except (Exception, KeyboardInterrupt) as msg:
            logger.info(u'Error in running task: "%(msg)s".', {'msg': msg})
            logger.info(u'Stopping all running Watchdog threads...')
            watchdog_file_observers.stop_all()
            logger.info(u'All Watchdog threads stopped.')

        logger.info(u'Waiting for all Watchdog threads to finish...')
        watchdog_file_observers.join_all()
        logger.info(u'All Watchdog threads finished. Exiting...')
        sys.exit(0)

    def _collect_watch_dirs(self):
        """Return one dict (path, organization, partner) per existing outbox directory."""
        # Evaluate the organizations once instead of once per partner.
        organizations = list(Organization.objects.all())
        dir_watch_data = []
        for partner in Partner.objects.all():
            for org in organizations:
                outbox_dir = os.path.join(settings.DATA_DIR,
                                          'messages',
                                          partner.as2_name,
                                          'outbox',
                                          org.as2_name)
                if os.path.isdir(outbox_dir):
                    dir_watch_data.append({
                        'path': outbox_dir,
                        'organization': org.as2_name,
                        'partner': partner.as2_name,
                    })
        return dir_watch_data

    def _send_file(self, org_name, partner_name, file_path, display_name):
        """Log and send ``file_path`` via ``sendas2message`` with ``delete=True``."""
        logger.info(u'Send as2 message "%(file)s" from "%(org)s" to "%(partner)s".',
                    {'file': display_name,
                     'org': org_name,
                     'partner': partner_name})
        call_command('sendas2message', org_name, partner_name, file_path, delete=True)