import inspect
import sys
import os
from functools import wraps, partial
import aniso8601
from flask import current_app, json, request as flask_request, _app_ctx_stack
from werkzeug.local import LocalProxy
from flask_assistant import logger
from flask_assistant.response import _Response
from flask_assistant.manager import ContextManager, parse_context_name
from api_ai.api import ApiAi
from io import StringIO
def find_assistant():  # Taken from Flask-ask courtesy of @voutilad
    """
    Locate the Assistant instance bound to the current Flask application.

    The application object itself is checked first for an ``assist``
    attribute; failing that, each registered blueprint is inspected in
    iteration order.

    Note: only the first Assistant found is returned. ``None`` is returned
    when neither the app nor any of its blueprints carries one.
    """
    # An Assistant attached directly to the app takes precedence.
    if hasattr(current_app, "assist"):
        return current_app.assist

    if not hasattr(current_app, "blueprints"):
        return None

    # Otherwise fall back to the first blueprint that carries one.
    for candidate in current_app.blueprints.values():
        if hasattr(candidate, "assist"):
            return candidate.assist

    return None
# Module-level proxies. Each one resolves lazily, on every access, to the
# attribute of the same name on the Assistant returned by find_assistant(),
# so view functions can simply import e.g. `request` or `context_manager`
# from this module.
request = LocalProxy(lambda: find_assistant().request)
intent = LocalProxy(lambda: find_assistant().intent)
access_token = LocalProxy(lambda: find_assistant().access_token)
context_in = LocalProxy(lambda: find_assistant().context_in)
context_manager = LocalProxy(lambda: find_assistant().context_manager)
convert_errors = LocalProxy(lambda: find_assistant().convert_errors)
session_id = LocalProxy(lambda: find_assistant().session_id)
user = LocalProxy(lambda: find_assistant().user)
storage = LocalProxy(lambda: find_assistant().storage)
profile = LocalProxy(lambda: find_assistant().profile)
# Converter shorthands for commonly used system entities.
# Maps a shorthand name (usable as a value in an action's `convert` dict)
# to the aniso8601 parser that is applied to the raw parameter value.
_converter_shorthands = {
"date": aniso8601.parse_date, # Returns date
"date-period": aniso8601.parse_interval, # Returns (date, date)
"time": aniso8601.parse_time, # Returns time
}
class Assistant(object):
    """Central Interface for creating a Dialogflow webhook.

    The Assistant object routes requests to :func:`action` decorated functions.

    The assistant object maps requests received from a Dialogflow agent to
    Intent-specific view functions. The view functions can be properly matched
    depending on required parameters and contexts.

    These requests originate from Google Actions and are sent to the Assistant
    object through Dialogflow's infrastructure.

    Keyword Arguments:
        app {Flask object} -- App instance - created with Flask(__name__) (default: {None})
        blueprint {Flask Blueprint} -- Flask Blueprint instance to initialize (Default: {None})
        route {str} -- URL rule on which the webhook view is registered;
            ``init_app`` falls back to "/" when left as None (default: {None})
        project_id {str} -- Google Cloud Project ID, required to manage contexts from flask-assistant
        client_id {str} -- Actions on Google client ID used for account linking
        dev_token {str} -- Dialogflow dev access token used to register and retrieve agent resources
        client_token {str} -- Dialogflow client access token required for querying agent
    """

    def __init__(
        self,
        app=None,
        blueprint=None,
        route=None,
        project_id=None,
        dev_token=None,
        client_token=None,
        client_id=None,
    ):
        self.app = app
        self.blueprint = blueprint
        self._route = route
        self.project_id = project_id
        self.client_id = client_id

        # Registries filled in by the decorators below. All but the last two
        # are keyed by intent name.
        self._intent_action_funcs = {}
        self._intent_mappings = {}
        self._intent_converts = {}
        self._intent_defaults = {}
        self._intent_fallbacks = {}
        self._intent_prompts = {}
        self._intent_events = {}
        self._required_contexts = {}
        self._context_funcs = {}
        # Keyed by view function -> list of context names it requires.
        self._func_contexts = {}
        # Set by the `fallback` decorator.
        self._fallback_response = None

        self.api = ApiAi(dev_token, client_token)

        if app is not None:
            self.init_app(app)
        elif blueprint is not None:
            self.init_blueprint(blueprint)

        if self.client_id is None and self.app is not None:
            self.client_id = self.app.config.get("AOG_CLIENT_ID")

        if project_id is None:
            import warnings

            warnings.warn(
                "\nGoogle Cloud Project ID is required to manage contexts "
                "using flask-assistant\n\n"
                "Please initialize the Assistant object with a project ID\n"
                "assist = Assistant(app, project_id='YOUR_PROJECT_ID')",
                stacklevel=2,
            )

    def init_app(self, app):
        """Bind this Assistant to a Flask app and register the webhook route.

        Arguments:
            app {Flask object} -- application that will serve the webhook
        """
        self.app = app
        if self._route is None:
            self._route = "/"

        # find_assistant() looks the instance up through this attribute.
        app.assist = self
        app.add_url_rule(
            self._route, view_func=self._flask_assitant_view_func, methods=["POST"]
        )

        if self.client_id is None and self.app is not None:
            self.client_id = self.app.config.get("AOG_CLIENT_ID")

    # Taken from Flask-ask courtesy of @voutilad
    def init_blueprint(self, blueprint, path="templates.yaml"):
        """Initialize a Flask Blueprint, similar to init_app, but without the access
        to the application config.

        Keyword Arguments:
            blueprint {Flask Blueprint} -- Flask Blueprint instance to initialize
                (Default: {None})
            path {str} -- path to templates yaml file, relative to Blueprint
                (Default: {'templates.yaml'}); currently unused because the
                template loader below is commented out

        Raises:
            TypeError -- if a route was supplied to the constructor
        """
        if self._route is not None:
            raise TypeError("route cannot be set when using blueprints!")

        # we need to tuck our reference to this Assistant instance
        # into the blueprint object and find it later!
        blueprint.assist = self

        # BlueprintSetupState.add_url_rule gets called underneath the covers and
        # concats the rule string, so we should set to an empty string to allow
        # Blueprint('blueprint_api', __name__, url_prefix="/assist") to result in
        # exposing the rule at "/assist" and not "/assist/".
        blueprint.add_url_rule(
            "", view_func=self._flask_assitant_view_func, methods=["POST"]
        )
        # blueprint.jinja_loader = ChoiceLoader([YamlLoader(blueprint, path)])

        if self.client_id is None and self.app is not None:
            self.client_id = self.app.config.get("AOG_CLIENT_ID")

    # ------------------------------------------------------------------
    # Request-scoped state. Every value lives on the top of Flask's app
    # context stack, so it is isolated per handled request.
    # ------------------------------------------------------------------

    @property
    def request(self):
        """Local Proxy referring to the request JSON received from Dialogflow"""
        return getattr(_app_ctx_stack.top, "_assist_request", None)

    @request.setter
    def request(self, value):
        _app_ctx_stack.top._assist_request = value

    @property
    def intent(self):
        """Local Proxy referring to the name of the intent contained in the Dialogflow request"""
        return getattr(_app_ctx_stack.top, "_assist_intent", None)

    @intent.setter
    def intent(self, value):
        _app_ctx_stack.top._assist_intent = value

    @property
    def access_token(self):
        """Local proxy referring to the OAuth token for linked accounts."""
        return getattr(_app_ctx_stack.top, "_assist_access_token", None)

    @access_token.setter
    def access_token(self, value):
        _app_ctx_stack.top._assist_access_token = value

    @property
    def context_in(self):
        """Local Proxy referring to context objects contained within current session"""
        return getattr(_app_ctx_stack.top, "_assist_context_in", [])

    @context_in.setter
    def context_in(self, value):
        _app_ctx_stack.top._assist_context_in = value

    @property
    def context_manager(self):
        """LocalProxy referring to the app's instance of the :class:`ContextManager`.

        Interface for adding and accessing contexts and their parameters.
        When nothing has been stored for the current app context yet, a fresh
        (unstored) ContextManager is returned.
        """
        top = _app_ctx_stack.top
        # Only build a new manager when none is stored; a getattr() default
        # would be constructed eagerly on every single access.
        if hasattr(top, "_assist_context_manager"):
            return top._assist_context_manager
        return ContextManager(self)

    @context_manager.setter
    def context_manager(self, value):
        _app_ctx_stack.top._assist_context_manager = value

    @property
    def convert_errors(self):
        """Dict of argument name -> exception raised while converting that
        parameter for the current request (None until parameters are mapped)."""
        return getattr(_app_ctx_stack.top, "_assistant_convert_errors", None)

    @convert_errors.setter
    def convert_errors(self, value):
        _app_ctx_stack.top._assistant_convert_errors = value

    @property
    def session_id(self):
        """Session id parsed from the request's ``session`` field."""
        return getattr(_app_ctx_stack.top, "_assist_session_id", None)

    @session_id.setter
    def session_id(self, value):
        _app_ctx_stack.top._assist_session_id = value

    @property
    def user(self):
        """User dict taken from the original request payload ({} if absent)."""
        return getattr(_app_ctx_stack.top, "_assist_user", {})

    @user.setter
    def user(self, value):
        storage_data = value.get("userStorage", {})
        # userStorage may arrive JSON-encoded; normalise it to a dict.
        if not isinstance(storage_data, dict):
            storage_data = json.loads(storage_data)
        value["userStorage"] = storage_data

        _app_ctx_stack.top._assist_user = value
        _app_ctx_stack.top._assist_storage = storage_data

    @property
    def storage(self):
        """The ``userStorage`` dict of the current user ({} if absent)."""
        return self.user.get("userStorage", {})

    @storage.setter
    def storage(self, value):
        if not isinstance(value, dict):
            raise TypeError("Storage must be a dictionary")
        self.user["userStorage"] = value

    @property
    def profile(self):
        """Decoded id-token payload of the user, set by _set_user_profile."""
        return getattr(_app_ctx_stack.top, "_assist_profile", None)

    @profile.setter
    def profile(self, value):
        _app_ctx_stack.top._assist_profile = value

    # ------------------------------------------------------------------
    # Registration decorators
    # ------------------------------------------------------------------

    def _register_context_to_func(self, intent_name, context=None):
        """Record the context names required by ``intent_name``.

        Names already registered for the intent are not added twice.
        """
        # None instead of a mutable default, which would be shared by calls.
        context = [] if context is None else context
        required = self._required_contexts.get(intent_name)
        if required:
            for name in context:
                if name not in required:
                    required.append(name)
        else:
            self._required_contexts[intent_name] = list(context)

    def context(self, *context_names):
        """Decorates a view function with the context names it requires.

        The requirement is recorded against the decorated function itself and
        is consulted when choosing between several actions of one intent.
        """

        def decorator(f):
            if not self._func_contexts.get(f):
                self._func_contexts[f] = []
            self._func_contexts[f].extend(context_names)

            @wraps(f)
            def wrapper(*args, **kw):
                return f(*args, with_context=context_names, **kw)

            return wrapper

        return decorator

    def action(
        self,
        intent_name,
        is_fallback=False,
        mapping=None,
        convert=None,
        default=None,
        with_context=None,
        events=None,
        *args,
        **kw
    ):
        """Decorates an intent_name's Action view function.

        The wrapped function is called when a request with the
        given intent_name is received along with all required parameters.

        Arguments:
            intent_name {str} -- name of the intent the function fulfils

        Keyword Arguments:
            is_fallback {bool} -- stored per intent (default: {False})
            mapping {dict} -- view argument name -> request parameter name
            convert {dict} -- view argument name -> converter shorthand or callable
            default {dict} -- stored per intent
            with_context {list} -- context names required by the intent
            events {list} -- stored per intent

        The decorated function is returned unchanged.
        """
        # Fresh containers per call: mutable defaults would be stored in the
        # registries below and therefore shared between unrelated intents.
        mapping = {} if mapping is None else mapping
        convert = {} if convert is None else convert
        default = {} if default is None else default
        with_context = [] if with_context is None else with_context
        events = [] if events is None else events

        def decorator(f):
            self._intent_action_funcs.setdefault(intent_name, []).append(f)
            self._intent_mappings[intent_name] = mapping
            self._intent_converts[intent_name] = convert
            self._intent_defaults[intent_name] = default
            self._intent_fallbacks[intent_name] = is_fallback
            self._intent_events[intent_name] = events
            self._register_context_to_func(intent_name, with_context)
            return f

        return decorator

    def prompt_for(self, next_param, intent_name):
        """Decorates a function to prompt for an action's required parameter.

        The wrapped function is called if next_param was not received with the given intent's
        request and is required for the fulfillment of the intent's action.

        Arguments:
            next_param {str} -- name of the parameter required for action function
            intent_name {str} -- name of the intent the dependent action belongs to
        """

        def decorator(f):
            self._intent_prompts.setdefault(intent_name, {})[next_param] = f
            return f

        return decorator

    def fallback(self):
        """Decorates the function stored as the assistant's fallback response."""

        def decorator(f):
            self._fallback_response = f
            return f

        # Without this return, `@assist.fallback()` would try to call None.
        return decorator

    # ------------------------------------------------------------------
    # Request handling
    # ------------------------------------------------------------------

    def _dialogflow_request(self, verify=True):
        """Return the JSON-decoded body of the current Flask request.

        ``verify`` is accepted but not used.
        """
        raw_body = flask_request.data
        _dialogflow_request_payload = json.loads(raw_body)
        return _dialogflow_request_payload

    def _dump_request(self,):
        """Log a summary of the incoming request at INFO level."""
        # originalDetectIntentRequest is optional (the view function also
        # reads it with .get), so do not index it directly.
        original_request = self.request.get("originalDetectIntentRequest") or {}
        summary = {
            "Intent": self.intent,
            "Incoming Contexts": [c.name for c in self.context_manager.active],
            "Source": original_request.get("source"),
            "Missing Params": self._missing_params,
            "Received Params": self.request["queryResult"]["parameters"],
        }
        msg = "Request: " + json.dumps(summary, indent=2, sort_keys=True)
        logger.info(msg)

    def _dump_result(self, view_func, result):
        """Log a summary of the response produced by ``view_func`` at INFO level."""
        summary = {
            "Intent": self.intent,
            "Outgoing Contexts": [c.name for c in self.context_manager.active],
            "Matched Action": view_func.__name__,
            "Response Speech": result._speech,
        }
        msg = "Result: " + json.dumps(summary, indent=2, sort_keys=True)
        logger.info(msg)

    def _parse_session_id(self):
        """Return the part of the request's ``session`` after "/sessions/"."""
        return self.request["session"].split("/sessions/")[1]

    def _set_user_profile(self):
        """Decode the user's idToken and store the remaining claims as profile.

        Does nothing when no client id is configured, when the user has no
        idToken, or when decoding reports status "BAD".
        """
        if self.client_id is None:
            return

        token = self.user.get("idToken")
        if token is None:
            return

        from flask_assistant.utils import decode_token

        decode_resp = decode_token(token, self.client_id)
        if decode_resp["status"] == "BAD":
            return

        # decode_resp["status"] == "OK"
        profile_payload = decode_resp["output"]
        for k in ["sub", "iss", "aud", "iat", "exp"]:
            # Tolerate tokens that lack one of these claims.
            profile_payload.pop(k, None)
        self.profile = profile_payload

    def _flask_assitant_view_func(self, nlp_result=None, *args, **kwargs):
        """Webhook entry point: parse the request, pick a view function, respond.

        Keyword Arguments:
            nlp_result {dict} -- an already decoded query result to handle
                instead of reading the Flask request body (default: {None})

        Returns the rendered response of the matched view function, the raw
        result if it is not a ``_Response``, or ``("", 400)`` when no function
        matched or the function returned None.

        Raises:
            DeprecationWarning -- if the request lacks the V2 fields read here
        """
        if nlp_result:  # pass API query result directly
            self.request = nlp_result
        else:  # called as webhook
            self.request = self._dialogflow_request(verify=False)

        logger.debug(json.dumps(self.request, indent=2))

        try:
            self.intent = self.request["queryResult"]["intent"]["displayName"]
            self.context_in = self.request["queryResult"].get("outputContexts", [])
            self.session_id = self._parse_session_id()
        except KeyError:
            raise DeprecationWarning(
                "It appears your agent is still using the Dialogflow V1 API,\n"
                "please update to V2 in the Dialogflow console."
            )

        # update context_manager's assist reference
        # TODO: access context_manager from assist, instead of own object
        self.context_manager._assist = self

        original_request = self.request.get("originalDetectIntentRequest")

        if original_request:
            payload = original_request.get("payload")
            if payload and payload.get("user"):
                self.user = original_request["payload"]["user"]
                self._set_user_profile()

        # Get access token from request
        # NOTE(review): this reads `user` directly under
        # originalDetectIntentRequest, not under its payload -- confirm
        # against the request format actually received.
        if original_request and original_request.get("user"):
            self.access_token = original_request["user"].get("accessToken")

        self._update_contexts()
        self._dump_request()

        view_func = self._match_view_func()
        if view_func is None:
            logger.error("Failed to match an action function")
            return "", 400

        logger.info("Matched action function: {}".format(view_func.__name__))
        result = self._map_intent_to_view_func(view_func)()

        if result is not None:
            if isinstance(result, _Response):
                self._dump_result(view_func, result)
                resp = result.render_response()
                return resp
            return result

        logger.error("Action func returned empty response")
        return "", 400

    def _update_contexts(self):
        """Feed the incoming contexts to the context manager and store it."""
        temp = self.context_manager
        temp.update(self.context_in)
        # Assigning stores the manager on the app context for this request.
        self.context_manager = temp

    def _match_view_func(self):
        """Choose the function that should handle the current intent.

        Order of preference: an action whose context requirements are met,
        then a prompt function for a missing parameter, then the first action
        registered for the intent. Returns None when the intent has no action.
        """
        intent_actions = self._intent_action_funcs.get(self.intent, [])
        if not intent_actions:
            logger.critical(
                "No action funcs defined for intent: {}".format(self.intent)
            )
            return None

        view_func = None
        if self.has_live_context():
            view_func = self._choose_context_view()

        if not view_func:
            missing = self._missing_params
            prompts = self._intent_prompts.get(self.intent)
            if missing and prompts:
                param_choice = missing.pop()
                view_func = prompts.get(param_choice)
                # There may be no prompt for this particular parameter; only
                # log (and touch __name__) when one was actually found.
                if view_func is not None:
                    logger.debug(
                        "Matching prompt func {} for missing param {}".format(
                            view_func.__name__, param_choice
                        )
                    )

        # TODO: Do not match func if context not satisfied
        if not view_func:
            if len(intent_actions) > 1:
                msg = "Multiple actions defined but no context was applied, will use first action func"
                logger.warning(msg)
            view_func = intent_actions[0]

        return view_func

    def has_live_context(self):
        """Return True if any incoming context has a positive lifespanCount."""
        # lifespanCount appears to be missing if context expired
        return any(
            context.get("lifespanCount", 0) > 0 for context in self.context_in
        )

    def run_aws_lambda(self, event):
        """Invoke the Flask Assistant application from an AWS Lambda function handler.

        Use this method to service AWS Lambda requests from a custom Assistant
        Action. This method will invoke your Flask application providing a
        WSGI-compatible environment that wraps the original Dialogflow event
        provided to the AWS API Gateway handler. Returns the output generated by
        a Flask Assistant application, which should be used as the return value
        to the AWS Lambda handler function ready for API Gateway.

        From Flask Ask and adjusted for Flask Assistant

        Arguments:
            event {dict} -- API Gateway event; its "body" entry is used as the
                body of the POST request

        Raises:
            AssertionError -- if the app never called start_response or
                answered with a non-2xx status

        Example usage:

            from flask import Flask
            from flask_assistant import Assistant, ask

            app = Flask(__name__)
            assist = Assistant(app, route='/')
            logging.getLogger('flask_assistant').setLevel(logging.DEBUG)

            def lambda_handler(event, _context):
                return assist.run_aws_lambda(event)

            @assist.action('greetings')
            def greet_and_start():
                speech = "Hey! Are you male or female?"
                return ask(speech)
        """
        from io import BytesIO

        # Convert an environment variable to a WSGI "bytes-as-unicode" string
        enc, esc = sys.getfilesystemencoding(), "surrogateescape"

        def unicode_to_wsgi(u):
            return u.encode(enc, esc).decode("iso-8859-1")

        # Create a WSGI-compatible environ that can be passed to the
        # application. It is loaded with the OS environment variables,
        # mandatory CGI-like variables, as well as the mandatory WSGI
        # variables.
        environ = {k: unicode_to_wsgi(v) for k, v in os.environ.items()}
        environ["REQUEST_METHOD"] = "POST"
        environ["PATH_INFO"] = "/"
        environ["SERVER_NAME"] = "AWS-Lambda"
        environ["SERVER_PORT"] = "80"
        environ["SERVER_PROTOCOL"] = "HTTP/1.0"
        environ["wsgi.version"] = (1, 0)
        environ["wsgi.url_scheme"] = "http"
        environ["wsgi.errors"] = sys.stderr
        environ["wsgi.multithread"] = False
        environ["wsgi.multiprocess"] = False
        environ["wsgi.run_once"] = True

        # Use the event body as the body of a HTTP POST request. PEP 3333
        # requires wsgi.input to be a *byte* stream and CONTENT_LENGTH to be
        # a string holding the length in bytes (not characters).
        body = event["body"]
        if not isinstance(body, bytes):
            body = body.encode("utf-8")
        environ["CONTENT_TYPE"] = "application/json"
        environ["CONTENT_LENGTH"] = str(len(body))
        environ["wsgi.input"] = BytesIO(body)

        # Start response is a required callback that must be passed when
        # the application is invoked. It is used to set HTTP status and
        # headers. Read the WSGI spec for details (PEP3333).
        headers = []

        def start_response(status, response_headers, _exc_info=None):
            headers[:] = [status, response_headers]

        # Invoke the actual Flask application providing our environment,
        # with our Assistant event as the body of the HTTP request, as well
        # as the callback function above. The result will be an iterator
        # that provides a serialized JSON string for our Dialogflow response.
        result = self.app(environ, start_response)
        try:
            if not headers:
                raise AssertionError("start_response() not called by WSGI app")

            output = json.loads(b"".join(result))
            if not headers[0].startswith("2"):
                raise AssertionError(
                    "Non-2xx from app: hdrs={}, body={}".format(headers, output)
                )

            # API Gateway expects Status code, headers and Body
            return {
                "statusCode": 200,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps(output),
            }
        finally:
            # Per the WSGI spec, we need to invoke the close method if it
            # is implemented on the result object.
            if hasattr(result, "close"):
                result.close()

    # ------------------------------------------------------------------
    # Context matching
    # ------------------------------------------------------------------

    def _context_satified(self, view_func):
        """Return True if every context required by ``view_func`` was received."""
        required_names = list(self._func_contexts[view_func])
        received_context_names = [parse_context_name(c) for c in self.context_in]

        all_met = all(name in received_context_names for name in required_names)
        return all_met and len(required_names) <= len(received_context_names)

    @property
    def _context_views(self):
        """Returns view functions for which the context requirements are met"""
        possible_views = []
        for func in self._func_contexts:
            if self._context_satified(func):
                logger.debug("{} context conditions satisified".format(func.__name__))
                possible_views.append(func)
        return possible_views

    def _choose_context_view(self):
        """Return the action of the current intent whose contexts are met.

        If several qualify, the last one found wins. Returns None (after a
        debug log entry) when none qualifies.
        """
        intent_actions = self._intent_action_funcs[self.intent]
        choice = None
        for view in self._context_views:
            if view in intent_actions:
                logger.debug(
                    "Matched {} based on active contexts".format(view.__name__)
                )
                choice = view

        if choice:
            return choice

        msg = "No {} action func matched based on active contexts"
        logger.debug(msg.format(self.intent))
        return None

    # ------------------------------------------------------------------
    # Parameter mapping
    # ------------------------------------------------------------------

    @property
    def _missing_params(self):  # TODO: fill missing slot from default
        """Names of request parameters whose value is the empty string."""
        params = self.request["queryResult"]["parameters"]
        return [p_name for p_name in params if params[p_name] == ""]

    def _func_args(self, f):
        """Return the list of argument names declared by ``f``."""
        try:
            argspec = inspect.getfullargspec(f)
        except AttributeError:  # for python2
            argspec = inspect.getargspec(f)
        return argspec.args

    def _map_intent_to_view_func(self, view_func):
        """Return ``view_func`` partially applied with its resolved arguments."""
        arg_names = self._func_args(view_func)
        arg_values = self._map_params_to_view_args(arg_names)
        return partial(view_func, *arg_values)

    def _map_params_to_view_args(self, arg_names):  # TODO map to correct name
        """Resolve a value for each view argument, in order.

        A value comes from the request parameters (after applying the intent's
        name mapping) and is run through the intent's converter if one is
        registered for the argument; when the parameter is missing or falsy
        the incoming contexts are searched instead. Conversion failures leave
        the raw value in place and are recorded in ``convert_errors``.
        """
        arg_values = []
        # Fall back to empty dicts so a missing registry entry cannot crash.
        mapping = self._intent_mappings.get(self.intent) or {}
        convert = self._intent_converts.get(self.intent) or {}
        params = self.request["queryResult"]["parameters"]

        convert_errors = {}
        for arg_name in arg_names:
            entity_mapping = mapping.get(arg_name, arg_name)
            # param name cant have '.',
            # so when registered, the sys. is stripped,
            # and must be stripped when looking up in request
            mapped_param_name = entity_mapping.replace("sys.", "")
            # params declared in GUI present in request
            value = params.get(mapped_param_name)

            if not value:  # params not declared, so must look in contexts
                value = self._map_arg_from_context(arg_name)
            elif arg_name in convert:
                # Apply parameter conversion
                shorthand_or_function = convert[arg_name]
                if shorthand_or_function in _converter_shorthands:
                    convert_func = _converter_shorthands[shorthand_or_function]
                else:
                    convert_func = shorthand_or_function
                try:
                    value = convert_func(value)
                except Exception as exc:
                    # Deliberately broad: converters are user supplied.
                    convert_errors[arg_name] = exc

            arg_values.append(value)

        self.convert_errors = convert_errors
        return arg_values

    def _map_arg_from_context(self, arg_name):
        """Return ``arg_name``'s value from the first incoming context that
        carries it as a parameter, or None if no context does."""
        for context_obj in self.context_in:
            # A context may arrive without a "parameters" entry.
            context_params = context_obj.get("parameters") or {}
            if arg_name in context_params:
                logger.debug(
                    "Retrieved {} param value from {} context".format(
                        arg_name, context_obj.get("name")
                    )
                )
                return context_params[arg_name]
        return None