chore(recommendations): mlflow update, pydantic update and others (#1450)

MauricioGarciaS 2023-08-22 15:23:08 +02:00 committed by GitHub
parent cf0ea809eb
commit 9915e0b3c8
7 changed files with 176 additions and 28 deletions

View file

@@ -79,7 +79,7 @@ class ServedModel:
        query = conn.mogrify(
            """SELECT project_id, session_id, user_id, %(userId)s as viewer_id, events_count, errors_count, duration, user_country as country, issue_score, user_device_type as device_type
            FROM sessions
-           WHERE project_id = %(projectId)s AND session_id NOT IN (SELECT session_id FROM user_viewed_sessions WHERE user_id = %(userId)s) AND duration IS NOT NULL AND start_ts > %(oldest_limit)s LIMIT %(limit)s""",
+           WHERE project_id = %(projectId)s AND session_id NOT IN (SELECT session_id FROM user_viewed_sessions WHERE user_id = %(userId)s) AND duration > 10000 AND start_ts > %(oldest_limit)s LIMIT %(limit)s""",
            {'userId': userId, 'projectId': projectId, 'limit': limit, 'oldest_limit': oldest_limit}
        )
        conn.execute(query)
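
The only change in this hunk swaps the duration IS NOT NULL guard for a hard-coded 10 000 ms floor (10 seconds, assuming millisecond durations). A minimal sketch of making that floor configurable instead, mirroring the python-decouple pattern the new DAG file in this commit uses; MIN_DURATION_MS is a hypothetical setting name, not part of this commit, and the column list is abbreviated:

from decouple import config

# Hypothetical setting; defaults to the 10 000 ms floor this commit hard-codes.
MIN_DURATION_MS = config('MIN_DURATION_MS', default=10000, cast=int)

# conn, userId, projectId, oldest_limit and limit come from the surrounding
# method, exactly as in the diff above.
query = conn.mogrify(
    """SELECT project_id, session_id, user_id, %(userId)s as viewer_id
       FROM sessions
       WHERE project_id = %(projectId)s AND duration > %(minDuration)s
         AND start_ts > %(oldest_limit)s LIMIT %(limit)s""",
    {'userId': userId, 'projectId': projectId, 'minDuration': MIN_DURATION_MS,
     'oldest_limit': oldest_limit, 'limit': limit}
)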

View file

@@ -0,0 +1,147 @@
import asyncio
import hashlib
import mlflow
import os
import pandas as pd
import pendulum
import sys
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from datetime import datetime, timedelta
from decouple import config
_work_dir = os.getcwd()
sys.path.insert(1, _work_dir)
from utils import pg_client
from utils.feedback import ConnectionHandler
from sqlalchemy import text
execute_interval = config('EXECUTION_INTERVAL', default=24*60*60, cast=int)
features_table_name = config('FEATURES_TABLE_NAME', default='features_table')
host = config('pg_host_ml')
port = config('pg_port_ml')
user = config('pg_user_ml')
dbname = config('pg_dbname_ml')
password = config('pg_password_ml')
tracking_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}"
def get_today_feedback(ti):
    """Collect the feedback rows inserted since the last run and publish the ids
    the downstream task expects. Assumes recommendation_feedback exposes
    session_id and user_id columns, matching the rest of the schema."""
    connection_handler = ConnectionHandler(tracking_uri)
    with connection_handler.get_live_session() as conn:
        cur = conn.connection().connection.cursor()
        query = cur.mogrify(
            """SELECT * FROM recommendation_feedback WHERE insertion_time > %(time_lower_bound)s;""",
            {'time_lower_bound': int(datetime.now().timestamp()) - execute_interval})
        result = conn.execute(text(query.decode("utf-8")))
        rows = result.fetchall()
        conn.commit()
    # get_features_pg pulls these keys as comma-separated strings.
    ti.xcom_push(key='sessionIds', value=','.join(str(row.session_id) for row in rows))
    ti.xcom_push(key='userIds', value=','.join(str(row.user_id) for row in rows))
def get_features_pg(ti):
    os.environ['PG_POOL'] = 'true'
    asyncio.run(pg_client.init())
    sessionIds = ti.xcom_pull(key='sessionIds')
    userIds = ti.xcom_pull(key='userIds').split(',')
    with pg_client.PostgresClient() as conn:
        conn.execute(
            """SELECT T.project_id,
                      T.session_id,
                      T2.viewer_id,
                      T.pages_count,
                      T.events_count,
                      T.errors_count,
                      T.duration,
                      T.country,
                      T.issue_score,
                      T.device_type,
                      T2.replays,
                      T2.network_access,
                      T2.storage_access,
                      T2.console_access,
                      T2.stack_access
               FROM (SELECT project_id,
                            user_id as viewer_id,
                            session_id,
                            count(CASE WHEN source = 'replay' THEN 1 END) as replays,
                            count(CASE WHEN source = 'network' THEN 1 END) as network_access,
                            count(CASE WHEN source = 'storage' THEN 1 END) as storage_access,
                            count(CASE WHEN source = 'console' THEN 1 END) as console_access,
                            count(CASE WHEN source = 'stack_events' THEN 1 END) as stack_access
                     FROM frontend_signals
                     WHERE session_id IN ({sessionIds})
                     GROUP BY project_id, viewer_id, session_id) as T2
                    INNER JOIN (SELECT project_id,
                                       session_id,
                                       user_id,
                                       pages_count,
                                       events_count,
                                       errors_count,
                                       duration,
                                       user_country as country,
                                       issue_score,
                                       user_device_type as device_type
                                FROM sessions
                                WHERE session_id IN ({sessionIds})
                                  AND duration IS NOT NULL) as T
                               USING (session_id);""".format(sessionIds=sessionIds)
        )
        response = conn.fetchall()
    sessionIds = [int(sessId) for sessId in sessionIds.split(',')]
    df = pd.DataFrame(response)
    df2 = pd.DataFrame(zip(userIds, sessionIds), columns=['viewer_id', 'session_id'])
    base_query = f"""INSERT INTO {features_table_name} (project_id, session_id, viewer_id, pages_count, events_count,
                                                        issues_count, duration, country, issue_score, device_type,
                                                        replays, network_access, storage_access, console_access,
                                                        stack_access) VALUES """
    count = 0
    params = {}
    for i in range(len(df)):
        viewer = df['viewer_id'].iloc[i]
        session = df['session_id'].iloc[i]
        d = df2[df2['viewer_id'] == viewer]
        x = d[d['session_id'] == session]
        if len(x) > 0:
            # The df2 match is only an existence check; the inserted values are the
            # full feature row from df, one named %(col_N)s placeholder per column.
            placeholders = []
            for k in df.columns:
                params[f'{k}_{count}'] = df[k].iloc[i]
                placeholders.append(f'%({k}_{count})s')
            base_query += '(' + ', '.join(placeholders) + '), '
            count += 1
    base_query = base_query[:-2]
    connection_handler = ConnectionHandler(tracking_uri)
    with connection_handler.get_live_session() as conn:
        cur = conn.connection().connection.cursor()
        query = cur.mogrify(base_query, params)
        conn.execute(text(query.decode("utf-8")))
        conn.commit()
dag = DAG(
    "Feedback_DB_FILL",
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=3),
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="My first test",
    schedule=timedelta(seconds=execute_interval),
    catchup=False,
)

with dag:
    dag_t_feedback = PythonOperator(
        task_id='Get_Feedbacks',
        python_callable=get_today_feedback,
    )
    dag_features = PythonOperator(
        task_id='Update_DB',
        python_callable=get_features_pg,
    )
    dag_t_feedback >> dag_features
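
The two tasks communicate only through Airflow XComs: Update_DB pulls the keys sessionIds and userIds as comma-separated strings, so Get_Feedbacks has to publish them in exactly that shape. A minimal sketch of that contract, with made-up ids:

# Hypothetical upstream push; the key names are the ones get_features_pg pulls.
def push_ids(ti):
    ti.xcom_push(key='sessionIds', value='101,102,103')
    ti.xcom_push(key='userIds', value='u1,u2,u3')

# Downstream, get_features_pg consumes them as:
#   sessionIds = ti.xcom_pull(key='sessionIds')         # '101,102,103'
#   userIds = ti.xcom_pull(key='userIds').split(',')    # ['u1', 'u2', 'u3']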

View file

@@ -81,7 +81,7 @@ dag = DAG(
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="My first test",
-    schedule=config('crons_train', default='@daily'),
+    schedule=config('crons_train', default='@weekly'),
    catchup=False,
)
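
config('crons_train', default='@weekly') is resolved when the DAG file is parsed: python-decouple checks the environment first (then any .env/settings file), and only falls back to the @weekly preset when crons_train is unset. A quick sketch of the override behaviour, assuming no .env entry for crons_train:

import os
from decouple import config

os.environ['crons_train'] = '0 3 * * 1'            # hypothetical override: Mondays, 03:00
print(config('crons_train', default='@weekly'))    # -> '0 3 * * 1'

del os.environ['crons_train']
print(config('crons_train', default='@weekly'))    # -> '@weekly'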

View file

@@ -1,2 +1,3 @@
-cp ../../api/chalicelib/utils/ch_client.py utils
-cp ../../../api/chalicelib/utils/pg_client.py utils
+cp ../utils/ch_client.py utils
+cp ../utils/pg_client.py utils
+cp ../ml_service/core/feedback.py utils

View file

@@ -8,13 +8,13 @@ find airflow/ -type f -name "*.cfg" -exec sed -i "s#{{airflow_secret_key}}#${airflow_secret_key}#g" {} \;
export MLFLOW_TRACKING_URI=postgresql+psycopg2://${pg_user_ml}:${pg_password_ml}@${pg_host_ml}:${pg_port_ml}/${pg_dbname_ml}
git init airflow/dags
# Airflow setup
-airflow db init
-airflow users create \
-  --username admin \
-  --firstname admin \
-  --lastname admin \
-  --role Admin \
-  --email admin@admin.admin \
-  -p ${airflow_admin_password}
+# airflow db init
+# airflow users create \
+#   --username admin \
+#   --firstname admin \
+#   --lastname admin \
+#   --role Admin \
+#   --email admin@admin.admin \
+#   -p ${airflow_admin_password}
# Run services
airflow webserver --port 8080 & airflow scheduler & ./mlflow_server.sh

View file

@@ -3,7 +3,7 @@ import hashlib
import argparse
import numpy as np
from decouple import config
-from datetime import datetime
+from datetime import datetime, timedelta
from core.user_features import get_training_database
from core.recommendation_model import SVM_recommendation, sort_database
@@ -53,7 +53,7 @@ def main(experiment_name, projectId, tenantId):
    tenantId: tenant of the project id (used mainly as salt for hashing).
    """
    hashed = hashlib.sha256(bytes(f'{projectId}-{tenantId}'.encode('utf-8'))).hexdigest()
-    x_, y_, d = get_training_database(projectId, max_timestamp=1680248412284, favorites=True)
+    x_, y_, d = get_training_database(projectId, max_timestamp=int((datetime.now() - timedelta(days=1)).timestamp()), favorites=True)
    x, y = handle_database(x_, y_)
    if x is None:
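
One detail worth checking here: the replaced literal 1680248412284 is a 13-digit epoch value, i.e. milliseconds, while datetime.timestamp() yields seconds. If get_training_database compares max_timestamp against millisecond columns, the dynamic value would need a *1000. A quick illustration, assuming nothing else about that function:

from datetime import datetime, timedelta

yesterday = datetime.now() - timedelta(days=1)
print(int(yesterday.timestamp()))           # ~1692600000 (10 digits, seconds)
print(int(yesterday.timestamp() * 1000))    # ~1692600000000 (13 digits, like 1680248412284)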

View file

@@ -1,19 +1,19 @@
-requests==2.28.2
-urllib3==1.26.12
-pyjwt==2.6.0
-SQLAlchemy==2.0.10
+requests==2.31.0
+urllib3==1.26.16
+pyjwt==2.8.0
+SQLAlchemy==2.0.20
alembic==1.11.1
-psycopg2-binary==2.9.5
+psycopg2-binary==2.9.7
-joblib==1.2.0
-scipy==1.10.1
-scikit-learn==1.2.2
-mlflow==2.3
+joblib==1.3.2
+scipy==1.11.2
+scikit-learn==1.3.0
+mlflow==2.5
-clickhouse-driver==0.2.5
-python3-saml==1.14.0
-python-multipart==0.0.5
+clickhouse-driver==0.2.6
+python3-saml==1.15.0
+python-multipart==0.0.6
python-decouple==3.8
-pydantic==1.10.8
+pydantic==1.10.12
-boto3==1.26.100
+boto3==1.28.29
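
Both pydantic pins stay on the 1.x line: 1.10.12 is a maintenance release, not the 2.x rewrite, so code written against the v1 API keeps working unchanged. A small sketch of the v1-style usage this pin preserves; the Feedback model is made up for illustration:

from pydantic import BaseModel

class Feedback(BaseModel):       # hypothetical model, not part of this commit
    session_id: int
    payload: dict = {}

fb = Feedback(session_id='42')   # v1 coerces the string to an int
print(fb.dict())                 # v1 API; pydantic 2.x renames this to model_dump()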