# Examples

TIP

Read this first if you need a new project

Note that on all the examples, you will also get these url's

  • http://127.0.0.1:8001/docs - Swagger documentation
  • http://127.0.0.1:8001/redoc - Redoc documentation
  • http://127.0.0.1:8001/openapi.json - OpenAPI schema

TIP

The FastAPI have already made an extremely usefull documentation that you should consider reading as well. Because it exists, this doc won't be about deep things you can do with FastAPI

# Hello world

The first example is a simple docker-compose setup where you will

  • Setup
    • Uses docker-compose

    • Use only the api container

    • Expose output from a simple command on /hello

    • Mount the plugins from a local plugin-folder.

    • Things to try

      • http://127.0.0.1:8001/hello - The output of the route in hello.py

# Files

docker-compose.yaml
version: "3.7"

services:
  api:
    image: opastack/api:latest
    ports:
      - "127.0.0.1:8001:8000"
    environment:
      OPA_PLUGIN_PATHS: "/plugins"
    volumes:
      - ./plugins:/plugins
plugins/hello.py
from opa import get_router

router = get_router()


@router.get("/hello")
def return_string():
    return 'Hello to you'

# Timekeeper

This example is still simple, but shows you how to do a little bit more, including a better development environment.

  • Setup

    • Uses docker-compose
    • Using only the api container
    • Expose output on /time that also accept some parameters
    • Expose output on /month/{month} that converts a number (1-12) to month-name. Crashes if it is not able to...
    • Expose output on /sleep-async/{seconds} that sleeps.. This is an async function
    • Expose output on /sleep-sync/{seconds} that sleeps.. This is not an async function..
    • Mount the plugins from a local plugin-folder, watches them for changes.
    • Sets the environment to DEV (see docs for more info)
      • In addition to other things, enabling DEV gives some neat developing features.. Check here for more info how to leverage them
  • Things to try

    • http://127.0.0.1:8001/time - Should show you the time in the default format
    • http://127.0.0.1:8001/docs#/timekeeper/get_time_time_get > Try it out > change format > Execute
    • http://127.0.0.1:8001/month/2
    • http://127.0.0.1:8001/month/20
      • See the better-exception (with variable output) in the console.
      • Try setting up vscode debugging and set a breakpoint and so on..
    • Hammer on http://127.0.0.1:8001/sleep-async/10 and http://127.0.0.1:8001/sleep-sync/20 on multiple terminals.. See how async is the king when it comes to things that blocks.
    • Or use something like hey, for better benchmarking.
      • hey -n 100 -c 100 http://127.0.0.1:8001/sleep-async/5
      • hey -n 100 -c 100 http://127.0.0.1:8001/sleep-sync/5

# Files

docker-compose.yaml
version: "3.7"

services:
  api:
    image: opastack/api:latest
    ports:
      - "127.0.0.1:8001:8000"
      - "127.0.0.1:5678:5678"
    environment:
      OPA_PLUGIN_PATHS: "@merge /plugins"
      OPA_PLUGIN_BLACKLIST_LIST: '["/data/opa/plugins/core-selfhosted"]'
      ENV: "dev"
      OPA_LOGLEVEL: "debug"
    volumes:
      - ./plugins:/plugins
plugins/timekeeper.py
import logging
import datetime
import secrets

from time import sleep
from asyncio import sleep as async_sleep

from opa import get_router

router = get_router()

# Using tags helps separate your items from the also included demo-plugins
@router.get("/time", tags=["timekeeper"])
def get_time(format: str = '%Y-%m-%d %H:%M:%S'):
    return datetime.datetime.now().strftime(format)


@router.get("/month/{month}", tags=["timekeeper"])
def get_monthname(month: int):
    """
    Make sure you give me a valid month number, or I might crash...
    """
    return datetime.datetime.strptime(str(month), '%m').strftime('%B')


@router.get("/sleep-async/{seconds}", tags=["timekeeper"])
async def async_sleeper(seconds: int):
    """
    I sleep for some seconds, then return how long I slept.. Neat!
    But I'm also async, so it shoulnt block anything..
    """
    randstring = secrets.token_urlsafe(5)
    logging.info(f'Start async sleep for ({randstring}) for {seconds}')
    await async_sleep(seconds)
    logging.info(f'Ending async sleep for ({randstring}) for {seconds}')
    return f'I slept for {seconds} seconds'


@router.get("/sleep-sync/{seconds}", tags=["timekeeper"])
def sync_sleeper(seconds: int):
    """
    I'm almost like sleep-async, but I'm not async.. That means that I block python when I do nothing.
    """
    randstring = secrets.token_urlsafe(5)
    logging.info(f'Start sync sleep for ({randstring}) for {seconds}')
    sleep(seconds)
    logging.info(f'Ending sync sleep for ({randstring}) for {seconds}')
    return f'I slept for {seconds} seconds'

# Redis

Example using redis (both async and normal), using two different libs (aioredis and walrus), see dependencies for more info.

  • Setup

    • Uses docker-compose
    • Using the api container, and a redis-container
    • Expose output on /counter-async increments when you visit using aioredis
    • Expose output on /counter-sync increments when you visit using walrus
    • Expose output to GET and POST on /bloom, one to add entries to a bloom-filter (using walrus) and one to check. Walrus have a ton of neat feature, this is one of them.
  • Things to try

    • http://127.0.0.1:8001/counter-async - See a counter
    • cat a_file.md | curl -X POST -d @- "http://localhost:8000/bloom" - fill up a bloom filter
    • http://localhost:8000/bloom?string=sleep - Check for sleep in the bloomfilter

# Files

docker-compose.yaml
version: "3.7"

services:
  api:
    # image: opastack/api:latest
    image: 25724b069f78
    ports:
      - "127.0.0.1:8001:8000"
    environment:
      OPA_PLUGIN_PATHS: "/plugins"
    volumes:
      - ./plugins:/plugins
  redis:
    image: "redis:5"
plugins/timekeeper.py
from opa import get_instance, get_router

router = get_router()


@router.get("/counter-async")
async def counter_async(key=None):
    counter = await get_instance('aioredis').incr(key or 'incr-async')
    return f'Counter is {counter}'


@router.get("/counter-sync")
def counter_sync(key=None):
    counter = get_instance('walrus').incr(key or 'incr-sync')
    return f'Counter is {counter}'


@router.get("/bloom")
def check_bloom_filter(string: str):
    walrus = get_instance('walrus')
    bf = walrus.bloom_filter('bf')
    return string in bf


@router.post("/bloom")
def add_bloom_filter(string: str):
    # Waiting for https://github.com/tiangolo/fastapi/issues/1018 to have plain/text input
    # Possible now, but not with generation of the openapi spec..
    walrus = get_instance('walrus')
    bf = walrus.bloom_filter('bf')
    for i in string.split(' '):
        bf.add(i)

    return f'Added entries'

# Background tasks

Background tasks are built into FastAPI/Starlette. If you come from the sync-world, you are probably used to Celery. Celery complicates a lot, so if you just got a simple fire-and-forget job. A background-task might fit your needs.

See below for info about running a celery task.

  • Setup

    • Uses docker-compose
    • Using the api container
    • Uses redis for locking and keeping track of a string (not needed for running tasks where you don't care about results)
    • Runs background tasks using FastAPI's starlette background tasks
    • Let you POST to /runone to put a task to the queue, it runs for 4 seconds
    • Expose output on /runone to see status of which task is currently running
  • Things to try

    • Terminal 1 - Status
      • while true; do curl http://localhost:8001/runone; echo; sleep 1; done
    • Terminal 2 - Trigger new tasks, try triggering many, see they queue up on terminal 1
      • curl -X POST http://localhost:8001/runone

# Files

docker-compose.yaml
version: "3.7"

services:
  api:
    image: opastack/api:latest
    ports:
      - "127.0.0.1:8001:8000"
    environment:
      OPA_PLUGIN_PATHS: "/plugins"
    volumes:
      - ./plugins:/plugins
  redis:
    image: "redis:5"
plugins/tasks.py
from time import sleep
from secrets import token_urlsafe

from fastapi import BackgroundTasks, APIRouter

from opa import get_router, get_instance

router = get_router()


def queuer(text: str):
    walrus = get_instance('walrus')
    lock = walrus.lock('runone')

    with lock:
        walrus.set('runone', text)
        sleep(4)
        walrus.delete('runone')


@router.post("/runone")
async def runone_post(background_tasks: BackgroundTasks):
    random_str = token_urlsafe(5)
    background_tasks.add_task(queuer, random_str)
    return {"message": f"Triggered background task: {random_str}"}


@router.get("/runone")
async def runone_get():
    walrus = get_instance('walrus')
    return {"current_task": walrus.get('runone')}

# Celery task

Celery is a very powerfull distributed task-queue. It is much more feature-rich than the default background tasks available in FastAPI/Starlette.

  • Setup

    • Uses docker-compose
    • Same container image, but different config for
      • api container
      • worker-math - Runs celery and consumes the math queue
      • worker-counter - Runs celery and consumes the counter queue
    • Redis for storing task-results
    • Rabbitmq for keeping track of tasks (broker)
    • Flower to see statuses
  • Things to try

    • http://localhost:8001/inc/5 - Hit the url a couple of times
      • They will queue up. At the end of each task, the celery worker (in the worker-counter container) will increment a counter
      • The same counter is reported as output when accessing this url, since the api can access the same redis instance as the worker
    • http://localhost:8001/div/A/B - Hit the other worker (worker-math)
      • http://localhost:8001/div/2/1 - normal queuing
      • http://localhost:8001/div/2/0 - one that will get an error
    • See tasks as they arrive
      • Using celery events (doesnt mather which)
        • docker-compose exec -w /data worker-counter celery -A opa.main events
        • docker-compose exec -w /data worker-math celery -A opa.main events
      • Using flower
        • http://localhost:5555/
    • http://localhost:8001/inc/100 - Queue a task which takes 100 seconds
      • Take a note of the task_id in the output
    • http://localhost:8001/status/27deec1a-c8f0-4d0d-95dc-59c6dd937207 - Check status for task
      • The inc task updates some meta-info that we can query using another endpoint
      • There are also a background task that will get those updates in the fastapi context, see the __init__.py file for info
    • http://localhost:8001/last_status - Another way to get the status
  • Files at github: https://github.com/opa-stack/opa-stack/tree/master/examples/docker-compose/celery-task

docker-compose.yaml
version: "3.7"

services:
  api:
    image: opastack/api:latest
    ports:
      - "127.0.0.1:8001:8000"
    environment:
      OPA_PLUGIN_PATHS: "/plugins"
    volumes:
      - ./plugins:/plugins

  redis:
    image: "redis:5"

  rabbitmq:
    image: "rabbitmq:3.8-management"
    ports:
      - "15672:15672" # Management port

  flower:
    image: "mher/flower:latest"
    command: "-A proj --broker=amqp://guest:guest@rabbitmq:5672//"
    ports:
      - "5555:5555"
plugins/celerydemo/__init__.py
from opa import get_router, Hook, get_instance, app


class celery_config(Hook):
    name = 'driver.celery.setup'

    def run(self, celery_app, task_candidates):
        celery_app.conf.task_routes = {"worker.celery_worker.test_celery": "test-queue"}
        celery_app.conf.update(task_track_started=True)
        celery_app.autodiscover_tasks(task_candidates)
        return celery_app


router = get_router()


@router.get("/add/{num1}/{num2}")
async def root(num1: int, num2: int):
    from celerydemo.tasks import test_celery

    celery = get_instance('celery')
    walrus = get_instance('walrus')
    test_celery.delay('abc')
    count = str(walrus.get('celery'))
    return {"message": "Word received", 'count': count}
plugins/celerydemo/tasks.py
from time import sleep

from celery import current_task

from opa import get_instance

celery = get_instance('celery')

@celery.task
def test_celery(word: str) -> str:
    for i in range(1, 4):
        sleep(1)
        current_task.update_state(state='PROGRESS',
                                  meta={'process_percent': i*10})

    get_instance('walrus').incr('celery')
    return f"test task return {word}...."