Source code for pyramid_frontend.images.view

from __future__ import absolute_import, print_function, division

import os.path
import mimetypes
import pkg_resources

from pyramid.httpexceptions import HTTPNotFound
from pyramid.response import Response
from pyramid.static import FileResponse
from pyramid.settings import asbool
from lockfile import FileLock

from .files import filter_sep, prefix_for_name, processed_path, original_path


plausible_extensions = set([
    'jpg',
    'jpeg',
    'gif',
    'png',
    'tif',
    'tiff',
    'ppm',
    'bmp',
    'tga',
])


def get_image_filter(registry, filter_key):
    filter_registry = getattr(registry, 'image_filter_registry', {})
    chain, with_theme = filter_registry[filter_key]
    return chain


[docs]class MissingOriginal(Exception):
def __init__(self, path, chain): self.path = path self.chain = chain Exception.__init__(self, 'Missing file %s' % path) def process_image(settings, name, original_ext, chain, overwrite=False): proc_path = processed_path(settings, name, original_ext, chain) if overwrite or (not os.path.exists(proc_path)): dest_dir = os.path.dirname(proc_path) try: os.makedirs(dest_dir) except OSError: pass lock = FileLock(proc_path + '.lock') with lock: if overwrite or (not os.path.exists(proc_path)): orig_path = original_path(settings, name, original_ext) if not os.path.exists(orig_path): raise MissingOriginal(path=orig_path, chain=chain) image_data = open(orig_path, 'rb') chain.run(proc_path, image_data) return proc_path class ImageView(object): def __init__(self, request): self.request = request def placeholder(self, chain): orig_path = pkg_resources.resource_filename('pyramid_frontend.images', 'no-image.png') image_data = open(orig_path, 'rb') f = chain.run_chain(image_data) response = Response(f.read()) response.content_type = \ mimetypes.guess_type('x.%s' % chain.extension)[0] return response def __call__(self): request = self.request settings = request.registry.settings url_prefix = self.request.matchdict['prefix'] name = self.request.matchdict['name'] name, ext = name.rsplit('.', 1) if filter_sep in name: parts = name.split(filter_sep, 2) if len(parts) == 3: name, original_ext, chain_name = parts else: raise HTTPNotFound() else: original_ext = ext chain_name = None try: chain = get_image_filter(request.registry, chain_name) except KeyError: raise HTTPNotFound() if ((chain.extension and chain.extension != ext) or prefix_for_name(name) != url_prefix): raise HTTPNotFound() if original_ext not in plausible_extensions: raise HTTPNotFound() debug = asbool(settings.get('pyramid_frontend.debug')) overwrite = debug and request.params.get('overwrite') try: proc_path = process_image(settings, name, original_ext, chain, overwrite=overwrite) except MissingOriginal: if debug: return self.placeholder(chain) else: raise return FileResponse(proc_path, self.request)