Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions bluesky_httpserver/routers/core_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,172 @@ async def plans_allowed_handler(
return msg


# ========================================================================
# Experimental endpoints.


def get_json_schema(plan_description, allowed_plans, allowed_devices):
from bluesky_queueserver.manager.profile_ops import (
construct_parameters,
filter_plan_description,
pydantic_construct_model_class,
)

plan_description = filter_plan_description(
plan_description, allowed_plans=allowed_plans, allowed_devices=allowed_devices
)
param_list = plan_description["parameters"]
parameters = construct_parameters(param_list)
pydantic_model_class = pydantic_construct_model_class(parameters)
return pydantic_model_class.model_json_schema()


@router.get("/plans/allowed/json_schema")
async def plans_allowed_json_schema_handler(
payload: dict = {},
principal=Security(get_current_principal, scopes=["read:resources"]),
settings: BaseSettings = Depends(get_settings),
api_access_manager=Depends(get_api_access_manager),
resource_access_manager=Depends(get_resource_access_manager),
):
"""
Returns JSON schemas for plans in the list of allowed plans.
"""

try:
validate_payload_keys(payload)

username = get_current_username(
principal=principal, settings=settings, api_access_manager=api_access_manager
)[0]
user_group = resource_access_manager.get_resource_group(username)
payload.update({"user_group": user_group})
plans_msg = await SR.RM.plans_allowed(**payload)
devices_msg = await SR.RM.devices_allowed(**payload)

plans_allowed = plans_msg["plans_allowed"]
devices_allowed = devices_msg["devices_allowed"]

plans_schemas = {}
for plan in plans_allowed:
schema = get_json_schema(plans_allowed[plan], plans_allowed, devices_allowed)
plans_schemas[plan] = schema

msg = plans_msg.copy()
msg["plans_allowed"] = plans_schemas

print(f"{list(plans_allowed.keys())=}")

except Exception:
process_exception()
return msg


def plan_bind_args(item, existing_plans, existing_devices):
import inspect

from bluesky_queueserver.manager.profile_ops import construct_parameters, filter_plan_description

if item["item_type"] == "plan":
name = item.get("name", None)
if not name:
raise ValueError(f"Item {item} does not have 'name' key")
plan_description = existing_plans.get(name, None)
if not plan_description:
raise ValueError(f"No description is found for plan {name!r} (item {item})")
call_args, call_kwargs = item.get("args", []), item.get("kwargs", {})

plan_description = filter_plan_description(
plan_description, allowed_plans=existing_plans, allowed_devices=existing_devices
)
param_list = plan_description["parameters"]
parameters = construct_parameters(param_list)

sig = inspect.Signature(parameters)
bound_args = sig.bind(*call_args, **call_kwargs)

arguments = bound_args.arguments
item["args"] = []
item["kwargs"] = arguments

# print(f"{args=} {kwargs=} {arguments=}")

return item


@router.get("/queue/get/arguments")
async def queue_get_arguments_handler(
payload: dict = {},
principal=Security(get_current_principal, scopes=["read:queue"]),
settings: BaseSettings = Depends(get_settings),
api_access_manager=Depends(get_api_access_manager),
resource_access_manager=Depends(get_resource_access_manager),
):
"""
Returns plan queue. The plan args are bound to names if possible.
"""

try:
validate_payload_keys(payload)

msg_queue = await SR.RM.queue_get(**payload)
print(f"msg_queue = {msg_queue}")

plans_msg = await SR.RM.plans_existing(**payload)
devices_msg = await SR.RM.devices_existing(**payload)
plans_existing = plans_msg["plans_existing"]
devices_existing = devices_msg["devices_existing"]

items = msg_queue.get("items", [])
items_updated = []
for item in items:
item_updated = plan_bind_args(item, plans_existing, devices_existing)
items_updated.append(item_updated)

msg_queue["items"] = items_updated

except Exception:
process_exception()
return msg_queue


@router.post("/queue/item/arguments")
async def queue_item_arguments_handler(
payload: dict = {},
principal=Security(get_current_principal, scopes=["read:queue"]),
settings: BaseSettings = Depends(get_settings),
api_access_manager=Depends(get_api_access_manager),
resource_access_manager=Depends(get_resource_access_manager),
):
"""
Adds new plan to the queue
"""
try:
plans_msg = await SR.RM.plans_existing()
devices_msg = await SR.RM.devices_existing()
plans_existing = plans_msg["plans_existing"]
devices_existing = devices_msg["devices_existing"]

try:
item = payload.get("item", None)
if item is None:
raise ValueError("Payload does not contain 'item' key")
item = plan_bind_args(item, plans_existing, devices_existing)
success, msg = True, ""
except Exception as ex:
success = False
msg = f"Failed to bind arguments for the item: {ex}"

response = {"success": success, "msg": msg, "item": item}
except Exception:
process_exception()

return response


# ========================================================================


@router.get("/devices/allowed")
async def devices_allowed_handler(
payload: dict = {},
Expand Down
Loading