Extending django-pyas2

A use case for extending django-pyas2 may be to have additional connectors, from which files are received, such as a message queue, or to run a directory monitor as a daemon to send messages as soon as a message has been written to an outbound directory (see directory structure), or to add additional functionalities, like a custom website to the root of the url etc.

One way to extend django-pyas2 is to use the django startapp command, that will create the directory structure needed for an app. In this example we call the app “extend_pyas2”.

Please consult the extensive django documentation to learn more about these command. Below simply a description for your convenience to get started:

In the django_pyas2 project directory invoke the script as follows:

$ python manage.py startapp extend_pyas2

This has now created a new directory containing files that may be used for apps:

{PROJECT DIRECTORY}
└──django_pyas2
    ├── django_pyas2
    │   ├── db.sqlite3
    │   ├── manage.py
    │   └── django_pyas2
    │       ├── settings.py
    │       ├── urls.py
    │       └── wsgi.py
    └── extend_pyas2
        ├── apps.py
        ├── migrations
        ├── models.py
        ├── tests.py
        └── views.py

In our example, we will add a new admin command that should monitor directories and trigger the sending of files to partners when they are written. For that purpose, we need to create some subfolders “management/commands” and a python file with the management command:

│       └── wsgi.py
└── extend_pyas2
    ├── apps.py
    ├── migrations
    ├── models.py
    ├── tests.py
    ├── views.py
    └── management
        └── commands
            └── filewatcher.py

Add extend_pyas2 to your INSTALLED_APPS settings, after pyas2.

INSTALLED_APPS = (
    ...
    'pyas2',
    'extend_pyas2',
)

An example content for the filewatcher.py may be as follows and can be run with Django’s manage command:

$ python manage.py filewatcher
from django.core.management.base import BaseCommand, CommandError
from django.core.management import call_command
from django.utils.translation import ugettext as _
from pyas2.models import Organization
from pyas2.models import Partner
from pyas2 import settings
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserverVFS
from watchdog.events import PatternMatchingEventHandler
import time
import atexit
import socket
import os
import sys
import logging

logger = logging.getLogger('django')

DAEMONPORT = 16388


class FileWatchHandle(PatternMatchingEventHandler):
    """
    FileWatchHandler that ignores directories. No Patterns defined by default. Any file in the
    directory will be sent.
    """
    def __init__(self, tasks, dir_watch):
        super(FileWatchHandle, self).__init__(ignore_directories=True)
        self.tasks = tasks
        self.dir_watch = dir_watch

    def handle_event(self, event):
        self.tasks.add(
            (self.dir_watch['organization'], self.dir_watch['partner'], event.src_path))
        logger.info(u' "%(file)s" created. Adding to Task Queue.', {'file': event.src_path})

    def on_modified(self, event):
        self.handle_event(event)

    def on_created(self, event):
        self.handle_event(event)


class WatchdogObserversManager:
    """
    Creates and manages a list of watchdog observers as daemons. All daemons will have the same
    settings. By default, subdirectories are not searched.
    :param: force_vfs : if the underlying filesystem is a network share, OS events cannot be
                        used reliably. Polling to be done, which is expensive.
    """
    def __init__(self, is_daemon=True, force_vfs=False):
        self.observers = []
        self.is_daemon = is_daemon
        self.force_vfs = force_vfs

    def add_observer(self, tasks, dir_watch):
        if self.force_vfs:
            new_observer = PollingObserverVFS(stat=os.stat, listdir=os.listdir)
        else:
            new_observer = Observer()
        new_observer.daemon = self.is_daemon
        new_observer.schedule(FileWatchHandle(tasks, dir_watch),
                              dir_watch['path'], recursive=False)
        new_observer.start()
        self.observers.append(new_observer)

    def stop_all(self):
        for observer in self.observers:
            observer.stop()

    def join_all(self):
        for observer in self.observers:
            observer.join()


class Command(BaseCommand):
    help = _(u'Daemon process that watches the outbox of all as2 partners and '
             u'triggers sendmessage when files become available')

    def handle(self, *args, **options):
        logger.info(_(u'Starting PYAS2 send Watchdog daemon.'))
        engine_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            engine_socket.bind(('127.0.0.1', DAEMONPORT))
        except socket.error:
            engine_socket.close()
            raise CommandError(_(u'An instance of the send daemon is already running'))
        else:
            atexit.register(engine_socket.close)

        tasks = set()
        dir_watch_data = []

        for partner in Partner.objects.all():
            for org in Organization.objects.all():
                outboxDir  = os.path.join(settings.DATA_DIR,
                                      'messages',
                                      partner.as2_name,
                                      'outbox',
                                      org.as2_name)
                if os.path.isdir(outboxDir):
                    dir_watch_data.append({})
                    dir_watch_data[-1]['path'] = outboxDir
                    dir_watch_data[-1]['organization'] = org.as2_name
                    dir_watch_data[-1]['partner'] = partner.as2_name

        if not dir_watch_data:
            logger.error(_(u'No partners have been configured!'))
            sys.exit(0)

        logger.info(_(u'Process existing files in the directory.'))
        for dir_watch in dir_watch_data:
            files = [f for f in os.listdir(dir_watch['path']) if
                     os.path.isfile(os.path.join(dir_watch['path'], f))]
            for file in files:
                logger.info(u'Send as2 message "%(file)s" from "%(org)s" to "%(partner)s".',
                            {'file': file,
                             'org': dir_watch['organization'],
                             'partner': dir_watch['partner']})

                call_command('sendas2message', dir_watch['organization'], dir_watch['partner'],
                             os.path.join(dir_watch['path'], file), delete=True)

        """Add WatchDog Thread Here"""
        logger.info(_(u'PYAS2 send Watchdog daemon started.'))
        active_receiving = False
        watchdog_file_observers = WatchdogObserversManager(is_daemon=True, force_vfs=True)
        for dir_watch in dir_watch_data:
            watchdog_file_observers.add_observer(tasks, dir_watch)
        try:
            logger.info(_(u'Watchdog awaiting tasks...'))
            while True:
                if tasks:
                    if not active_receiving:
                        # first request (after tasks have been fired, or startup of dirmonitor)
                        active_receiving = True
                    else:  # active receiving events
                        for task in tasks:
                            logger.info(
                                u'Send as2 message "%(file)s" from "%(org)s" to "%(partner)s".',
                                {'file': task[2],
                                 'org': task[0],
                                 'partner': task[1]})

                            call_command('sendas2message', task[0], task[1], task[2],
                                         delete=True)
                        tasks.clear()
                        active_receiving = False
                time.sleep(2)

        except (Exception, KeyboardInterrupt) as msg:
            logger.info(u'Error in running task: "%(msg)s".', {'msg': msg})
            logger.info(u'Stopping all running Watchdog threads...')
            watchdog_file_observers.stop_all()
            logger.info(u'All Watchdog threads stopped.')

        logger.info(u'Waiting for all Watchdog threads to finish...')
        watchdog_file_observers.join_all()
        logger.info(u'All Watchdog threads finished. Exiting...')
        sys.exit(0)