# Examples
TIP
Read this first if you are starting a new project.
Note that in all the examples, you will also get these URLs:
- http://127.0.0.1:8001/docs - Swagger documentation
- http://127.0.0.1:8001/redoc - Redoc documentation
- http://127.0.0.1:8001/openapi.json - OpenAPI schema
TIP
FastAPI already has extremely useful documentation that you should consider reading as well. Because that exists, this doc won't go deep into what you can do with FastAPI itself.
# Hello world
The first example is a simple docker-compose setup.
Setup
- Uses docker-compose
- Uses only the api container
- Exposes the output of a simple command on /hello
- Mounts the plugins from a local plugin folder
Things to try
- http://127.0.0.1:8001/hello - The output of the route in hello.py
# Files
- Files at github: https://github.com/opa-stack/opa-stack/tree/master/examples/docker-compose/hello-world
docker-compose.yaml

```yaml
version: "3.7"
services:
  api:
    image: opastack/api:latest
    ports:
      - "127.0.0.1:8001:8000"
    environment:
      OPA_PLUGIN_PATHS: "/plugins"
    volumes:
      - ./plugins:/plugins
```
plugins/hello.py

```python
from opa import get_router

router = get_router()


@router.get("/hello")
def return_string():
    return 'Hello to you'
```
# Timekeeper
This example is still simple, but shows you how to do a little bit more, including a better development environment.
Setup
- Uses docker-compose
- Uses only the api container
- Exposes output on /time, which also accepts some parameters
- Exposes output on /month/{month}, which converts a number (1-12) to a month name. Crashes if it is not able to...
- Exposes output on /sleep-async/{seconds}, which sleeps. This is an async function
- Exposes output on /sleep-sync/{seconds}, which sleeps. This is not an async function
- Mounts the plugins from a local plugin folder and watches them for changes
- Sets the environment to DEV (see docs for more info). In addition to other things, enabling DEV gives some neat development features; check the docs for how to leverage them
Things to try
- http://127.0.0.1:8001/time - Should show you the time in the default format
- http://127.0.0.1:8001/docs#/timekeeper/get_time_time_get > Try it out > change format > Execute
- http://127.0.0.1:8001/month/2
- http://127.0.0.1:8001/month/20
- See the better exceptions (with variable output) in the console.
- Try setting up vscode debugging, set a breakpoint, and so on.
- Hammer on http://127.0.0.1:8001/sleep-async/10 and http://127.0.0.1:8001/sleep-sync/20 from multiple terminals. See how async is king when it comes to things that block.
- Or use something like hey for better benchmarking.
- hey -n 100 -c 100 http://127.0.0.1:8001/sleep-async/5
- hey -n 100 -c 100 http://127.0.0.1:8001/sleep-sync/5
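The difference the hammering demonstrates can be sketched outside FastAPI: two async sleeps awaited concurrently finish in roughly the time of one, while two blocking sleeps run strictly back to back. A minimal asyncio illustration (not part of the example files):

```python
import asyncio
import time


async def sleeper(seconds: float) -> float:
    # Yields to the event loop instead of blocking the thread.
    await asyncio.sleep(seconds)
    return seconds


async def main() -> float:
    start = time.monotonic()
    # Two "clients" hitting /sleep-async at once: handled concurrently.
    await asyncio.gather(sleeper(0.2), sleeper(0.2))
    return time.monotonic() - start


async_elapsed = asyncio.run(main())

start = time.monotonic()
time.sleep(0.2)  # two "clients" hitting /sleep-sync: one after the other
time.sleep(0.2)
sync_elapsed = time.monotonic() - start

print(f'async: {async_elapsed:.2f}s, sync: {sync_elapsed:.2f}s')  # ~0.2s vs ~0.4s
```

This is also why a sync route that blocks for 20 seconds can stall other requests, while the async variant lets the event loop serve them in the meantime.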
# Files
- Files at github: https://github.com/opa-stack/opa-stack/tree/master/examples/docker-compose/timekeeper
docker-compose.yaml

```yaml
version: "3.7"
services:
  api:
    image: opastack/api:latest
    ports:
      - "127.0.0.1:8001:8000"
      - "127.0.0.1:5678:5678"
    environment:
      OPA_PLUGIN_PATHS: "@merge /plugins"
      OPA_PLUGIN_BLACKLIST_LIST: '["/data/opa/plugins/core-selfhosted"]'
      ENV: "dev"
      OPA_LOGLEVEL: "debug"
    volumes:
      - ./plugins:/plugins
```
plugins/timekeeper.py

```python
import logging
import datetime
import secrets
from time import sleep
from asyncio import sleep as async_sleep

from opa import get_router

router = get_router()


# Using tags helps separate your items from the also-included demo plugins
@router.get("/time", tags=["timekeeper"])
def get_time(format: str = '%Y-%m-%d %H:%M:%S'):
    return datetime.datetime.now().strftime(format)


@router.get("/month/{month}", tags=["timekeeper"])
def get_monthname(month: int):
    """
    Make sure you give me a valid month number, or I might crash...
    """
    return datetime.datetime.strptime(str(month), '%m').strftime('%B')


@router.get("/sleep-async/{seconds}", tags=["timekeeper"])
async def async_sleeper(seconds: int):
    """
    I sleep for some seconds, then return how long I slept.. Neat!
    But I'm also async, so it shouldn't block anything..
    """
    randstring = secrets.token_urlsafe(5)
    logging.info(f'Start async sleep for ({randstring}) for {seconds}')
    await async_sleep(seconds)
    logging.info(f'Ending async sleep for ({randstring}) for {seconds}')
    return f'I slept for {seconds} seconds'


@router.get("/sleep-sync/{seconds}", tags=["timekeeper"])
def sync_sleeper(seconds: int):
    """
    I'm almost like sleep-async, but I'm not async.. That means I block Python while I wait.
    """
    randstring = secrets.token_urlsafe(5)
    logging.info(f'Start sync sleep for ({randstring}) for {seconds}')
    sleep(seconds)
    logging.info(f'Ending sync sleep for ({randstring}) for {seconds}')
    return f'I slept for {seconds} seconds'
```
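The crash hinted at for /month/20 comes from strptime rejecting out-of-range month numbers. A small sketch of that behaviour and a guard (the month_name helper is hypothetical, not part of the example):

```python
import datetime


def month_name(month: int) -> str:
    """Convert 1-12 to a month name; raise a clear ValueError instead of crashing the route."""
    if not 1 <= month <= 12:
        raise ValueError(f'month must be 1-12, got {month}')
    # Same conversion the /month route uses.
    return datetime.datetime.strptime(str(month), '%m').strftime('%B')


print(month_name(2))  # February
try:
    month_name(20)
except ValueError as exc:
    print(exc)  # month must be 1-12, got 20
```

In a FastAPI route you could turn that ValueError into an HTTPException with a 4xx status instead of letting the request fail with a 500.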
# Redis
Example using redis (both async and normal) through two different libraries (aioredis and walrus); see dependencies for more info.
Setup
- Uses docker-compose
- Uses the api container and a redis container
- Exposes output on /counter-async, which increments when you visit, using aioredis
- Exposes output on /counter-sync, which increments when you visit, using walrus
- Exposes GET and POST on /bloom, one to add entries to a bloom filter (using walrus) and one to check it. Walrus has a ton of neat features; this is one of them
Things to try
- http://127.0.0.1:8001/counter-async - See a counter
- cat a_file.md | curl -X POST -d @- "http://127.0.0.1:8001/bloom" - Fill up a bloom filter
- http://127.0.0.1:8001/bloom?string=sleep - Check for sleep in the bloom filter
# Files
docker-compose.yaml

```yaml
version: "3.7"
services:
  api:
    image: opastack/api:latest
    ports:
      - "127.0.0.1:8001:8000"
    environment:
      OPA_PLUGIN_PATHS: "/plugins"
    volumes:
      - ./plugins:/plugins
  redis:
    image: "redis:5"
```
plugins/timekeeper.py

```python
from opa import get_instance, get_router

router = get_router()


@router.get("/counter-async")
async def counter_async(key=None):
    counter = await get_instance('aioredis').incr(key or 'incr-async')
    return f'Counter is {counter}'


@router.get("/counter-sync")
def counter_sync(key=None):
    counter = get_instance('walrus').incr(key or 'incr-sync')
    return f'Counter is {counter}'


@router.get("/bloom")
def check_bloom_filter(string: str):
    walrus = get_instance('walrus')
    bf = walrus.bloom_filter('bf')
    return string in bf


@router.post("/bloom")
def add_bloom_filter(string: str):
    # Waiting for https://github.com/tiangolo/fastapi/issues/1018 for text/plain input.
    # Possible now, but not with generation of the openapi spec..
    walrus = get_instance('walrus')
    bf = walrus.bloom_filter('bf')
    for i in string.split(' '):
        bf.add(i)
    return 'Added entries'
```
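A bloom filter answers "possibly present" or "definitely not present" using a fixed-size bit array, which is why checking never yields false negatives but can yield rare false positives. A toy stdlib sketch of those semantics (illustration only, not how walrus implements it):

```python
import hashlib


class TinyBloom:
    """Toy bloom filter: k hash positions set in an m-bit array per item."""

    def __init__(self, m: int = 1024, k: int = 3):
        self.m, self.k, self.bits = m, k, 0

    def _positions(self, item: str):
        # Derive k independent positions from salted sha256 digests.
        for i in range(self.k):
            digest = hashlib.sha256(f'{i}:{item}'.encode()).hexdigest()
            yield int(digest, 16) % self.m

    def add(self, item: str):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def __contains__(self, item: str):
        # All k bits set -> "possibly present"; any bit unset -> "definitely not".
        return all(self.bits & (1 << pos) for pos in self._positions(item))


bf = TinyBloom()
for word in 'make sure you sleep well'.split():
    bf.add(word)

print('sleep' in bf)   # True
print('banana' in bf)  # almost certainly False (false positives are possible)
```

The POST /bloom route above does the same thing with walrus: split the input on spaces and add each word, so a later GET can probe for membership.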
# Background tasks
Background tasks are built into FastAPI/Starlette. If you come from the sync world, you are probably used to Celery. Celery complicates things a lot, so if you just have a simple fire-and-forget job, a background task might fit your needs.
See below for info about running a celery task.
Setup
- Uses docker-compose
- Uses the api container
- Uses redis for locking and for keeping track of a string (not needed for running tasks where you don't care about results)
- Runs background tasks using FastAPI's/Starlette's background tasks
- Lets you POST to /runone to put a task on the queue; it runs for 4 seconds
- Exposes output on /runone to see the status of the task that is currently running
Things to try
- Terminal 1 - Status
  - while true; do curl http://localhost:8001/runone; echo; sleep 1; done
- Terminal 2 - Trigger new tasks, try triggering many, and see them queue up in terminal 1
  - curl -X POST http://localhost:8001/runone
# Files
- Files at github: https://github.com/opa-stack/opa-stack/tree/master/examples/docker-compose/background-task
docker-compose.yaml

```yaml
version: "3.7"
services:
  api:
    image: opastack/api:latest
    ports:
      - "127.0.0.1:8001:8000"
    environment:
      OPA_PLUGIN_PATHS: "/plugins"
    volumes:
      - ./plugins:/plugins
  redis:
    image: "redis:5"
```
plugins/tasks.py

```python
from time import sleep
from secrets import token_urlsafe

from fastapi import BackgroundTasks

from opa import get_router, get_instance

router = get_router()


def queuer(text: str):
    walrus = get_instance('walrus')
    lock = walrus.lock('runone')
    with lock:
        walrus.set('runone', text)
        sleep(4)
        walrus.delete('runone')


@router.post("/runone")
async def runone_post(background_tasks: BackgroundTasks):
    random_str = token_urlsafe(5)
    background_tasks.add_task(queuer, random_str)
    return {"message": f"Triggered background task: {random_str}"}


@router.get("/runone")
async def runone_get():
    walrus = get_instance('walrus')
    return {"current_task": walrus.get('runone')}
```
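The queuer's single-runner pattern (take a lock, publish a status, do the work, clear the status) can be sketched with stdlib primitives, assuming a threading.Lock stands in for the Redis lock and a module-level variable for the 'runone' key:

```python
import threading
import time

status_lock = threading.Lock()
current_task = None  # stands in for the 'runone' key in Redis


def queuer(text: str, work_seconds: float = 0.05):
    """Run tasks one at a time; expose the running task's name while it runs."""
    global current_task
    with status_lock:             # only one task may hold the lock at a time
        current_task = text       # like walrus.set('runone', text)
        time.sleep(work_seconds)  # the actual work
        current_task = None       # like walrus.delete('runone')


# Fire a few "background tasks"; they queue up on the lock, just like
# the curl loop in terminal 1 would show them doing.
threads = [threading.Thread(target=queuer, args=(f'task-{i}',)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(current_task)  # None - every task has finished and cleared the status
```

The example uses a Redis lock instead of threading.Lock because background tasks may run in several worker processes, where an in-process lock would not coordinate anything.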
# Celery task
Celery is a very powerful distributed task queue. It is much more feature-rich than the default background tasks available in FastAPI/Starlette.
Setup
- Uses docker-compose
- Same container image, but different config for:
  - api container
  - worker-math - Runs celery and consumes the math queue
  - worker-counter - Runs celery and consumes the counter queue
- Redis for storing task results
- Rabbitmq for keeping track of tasks (broker)
- Flower to see statuses
Things to try
- http://localhost:8001/inc/5 - Hit the url a couple of times
  - They will queue up. At the end of each task, the celery worker (in the worker-counter container) will increment a counter
  - The same counter is reported as output when accessing this url, since the api can access the same redis instance as the worker
- http://localhost:8001/div/A/B - Hit the other worker (worker-math)
  - http://localhost:8001/div/2/1 - normal queuing
  - http://localhost:8001/div/2/0 - one that will get an error
- See tasks as they arrive
  - Using celery events (doesn't matter which)
    - docker-compose exec -w /data worker-counter celery -A opa.main events
    - docker-compose exec -w /data worker-math celery -A opa.main events
  - Using flower
    - http://localhost:5555/
- http://localhost:8001/inc/100 - Queue a task which takes 100 seconds
  - Take note of the task_id in the output
  - http://localhost:8001/status/27deec1a-c8f0-4d0d-95dc-59c6dd937207 - Check status for the task
  - The inc task updates some meta-info that we can query using another endpoint
  - There is also a background task that will get those updates in the fastapi context; see the __init__.py file for info
  - http://localhost:8001/last_status - Another way to get the status
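The status-polling pattern behind /status/{task_id} (a worker publishes progress meta, an endpoint reads it back by task id) can be sketched without celery, using a lock-guarded dict as a stand-in result backend (all names here are hypothetical):

```python
import threading
import time

backend = {}  # stand-in for the Redis result backend
backend_lock = threading.Lock()


def update_state(task_id: str, state: str, meta: dict):
    """Mimics what celery's current_task.update_state() stores per task."""
    with backend_lock:
        backend[task_id] = {'state': state, 'meta': meta}


def inc_task(task_id: str, steps: int = 3):
    """Like the inc task: report progress while it works."""
    for i in range(1, steps + 1):
        time.sleep(0.01)  # the work
        update_state(task_id, 'PROGRESS', {'process_percent': i * 100 // steps})
    update_state(task_id, 'SUCCESS', {'process_percent': 100})


def status(task_id: str):
    """Like the /status/{task_id} endpoint: read whatever the worker last wrote."""
    with backend_lock:
        return backend.get(task_id, {'state': 'PENDING', 'meta': {}})


worker = threading.Thread(target=inc_task, args=('task-1',))
worker.start()
worker.join()
print(status('task-1'))  # {'state': 'SUCCESS', 'meta': {'process_percent': 100}}
```

In the real setup the backend is Redis, so the api process can poll a task started in the worker-counter or worker-math container.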
# Files
- Files at github: https://github.com/opa-stack/opa-stack/tree/master/examples/docker-compose/celery-task
docker-compose.yaml

```yaml
version: "3.7"
services:
  api:
    image: opastack/api:latest
    ports:
      - "127.0.0.1:8001:8000"
    environment:
      OPA_PLUGIN_PATHS: "/plugins"
    volumes:
      - ./plugins:/plugins
  redis:
    image: "redis:5"
  rabbitmq:
    image: "rabbitmq:3.8-management"
    ports:
      - "15672:15672" # Management port
  flower:
    image: "mher/flower:latest"
    command: "-A proj --broker=amqp://guest:guest@rabbitmq:5672//"
    ports:
      - "5555:5555"
```
plugins/celerydemo/__init__.py

```python
from opa import get_router, Hook, get_instance, app


class celery_config(Hook):
    name = 'driver.celery.setup'

    def run(self, celery_app, task_candidates):
        celery_app.conf.task_routes = {"worker.celery_worker.test_celery": "test-queue"}
        celery_app.conf.update(task_track_started=True)
        celery_app.autodiscover_tasks(task_candidates)
        return celery_app


router = get_router()


@router.get("/add/{num1}/{num2}")
async def root(num1: int, num2: int):
    from celerydemo.tasks import test_celery
    celery = get_instance('celery')
    walrus = get_instance('walrus')
    test_celery.delay('abc')
    count = str(walrus.get('celery'))
    return {"message": "Word received", 'count': count}
```
plugins/celerydemo/tasks.py

```python
from time import sleep

from celery import current_task

from opa import get_instance

celery = get_instance('celery')


@celery.task
def test_celery(word: str) -> str:
    for i in range(1, 4):
        sleep(1)
        current_task.update_state(state='PROGRESS',
                                  meta={'process_percent': i * 10})
    get_instance('walrus').incr('celery')
    return f"test task return {word}...."
```