From 345f316b27f6653fa7ac5e8cd56e44226322708a Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 29 Aug 2024 16:08:33 +0200 Subject: [PATCH] =?UTF-8?q?Spots=20=E2=9A=A1=E2=9A=A1=E2=9A=A1=20(#2305)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(spot): first version to test http endpoints * fix(helm): changed nginx path prefix * fix(spots): added missing BUCKET_NAME env var * fix(spots): added services init check * feat(spots): removed geo module * feat(spots): removed uaparser * feat(spots): added more detailed authorization error log * feat(spots): changed the authorization middleware * feat(spots): extended http body size limit to 128kb * feat(spots): added s3 error log * feat(spots): added new handler for uploaded event * feat(backend): small api changes in spot service * feat(backend): rewrote request parameters grabber for getSpot handler * feat(backend): added tenantID to auth struct * feat(backend): added pre-signed download urls for preview, mob et video files * feat(backend): added user's email to spots table, and getSpot responses * feat(backend): returning spotID as a string * feat(spot): added transcoder pipeline * fix(spot): return spotID as a string * feat(spot): added volume mount to spot service * feat(spot): fixed volume mounting * feat(spot): helm fix * feat(spot): helm another fix * fix(spot): correct video.webm path * fix(spot): correct pre-signed url for download original video * feat(spot): added PATCH and DELETE methods to CORS * feat(spot): use string format for spotIDs in delete method * feat(spot): added public key implemented * fix(spot): correct public-key parser * fix(spot): fixed query params issue + user's tenantID * fix(spot): use 1 as a default tenant * feat(spot): added correct total spots calculation * fix(spot): fixed offset calculation * feat(spot): added extra check in auth method * fix(spot): removed / from video file name * fix(spot): devided codec flag into 2 parts * feat(spot): use fixed tenantID = 1 for oss users * feat(spot): return 404 for public key not found issue * feat(spots): added spots folder to minio path rule * feat(spot): added spot video streaming support * fix(spot): fixed an sql request for spot streams * feat(spot): return playlist file in getSpot responce * feat(spot): try to use aac audio codec * feat(spot): added permissions support (oss/ee) * feat(spot): added authorizer method * feat(spot): added license check * feat(spot): added spot preview for get response * fix(spot): fixed a problem with permissions * feat(spot): added crop feature * feat(spot): upload cropped video back to s3 * feat(spot): manage expired modified playlist file * feat(backend): hack with video formats * feat(backend): removed space * feat(spot): req tracing * feat(spot): manual method's name mapping * feat(spot): added a second method to public key auth support * feat(spot): metrics * feat(spot): added rate limiter per user * feat(spot): added ping endpoint for spot jwt token check * feat(spot): getStatus endpoint * feat(spot): added missing import * feat(spot): transcoding issue fix * feat(spot): temp remove tasks * feat(spot): better error log message * feat(spot): set default jwt_secret value * feat(spot): debug auth * feat(spot): 2 diff jwt tokens support * feat(spot): pg tasks with process status * feat(spot): more logs * feat(spot): improved defer for GetTask method * feat(spot): keep only failed tasks * feat(spot): removing temp dir with spot files * feat(spot): added several workers for transcoding module * feat(spot): fixed spot path for temp video files * feat(spot): use custom statusWriter to track response code in middleware * feat(spot): added body and parameter parser for auditrail feature * feat(spot): fixed IsAuth method signature * feat(spot): fixed ee service builder * feat(spot): added import * feat(spot): fix data type for payload and parameters jsonb fields * feat(spot): typo fix * feat(spot): moved out consts * feat(spot): new table's name * feat(spot): added missing imports in go.mod * feat(spot): added a check for the number of comments (20 by default) --- backend/Dockerfile | 7 +- backend/cmd/spot/main.go | 60 ++ backend/go.mod | 2 + backend/go.sum | 3 + backend/internal/config/spot/config.go | 37 ++ backend/pkg/db/postgres/pool/pool.go | 23 +- backend/pkg/metrics/common/metrics.go | 4 + backend/pkg/metrics/spot/spot.go | 194 ++++++ backend/pkg/objectstorage/objectstorage.go | 1 + backend/pkg/objectstorage/s3/s3.go | 12 + backend/pkg/pool/worker-pool.go | 10 +- backend/pkg/spot/api/handlers.go | 605 ++++++++++++++++++ backend/pkg/spot/api/limiter.go | 88 +++ backend/pkg/spot/api/model.go | 75 +++ backend/pkg/spot/api/permissions.go | 5 + backend/pkg/spot/api/router.go | 205 ++++++ backend/pkg/spot/api/tracer.go | 7 + backend/pkg/spot/auth/auth.go | 53 ++ backend/pkg/spot/auth/authorizer.go | 13 + backend/pkg/spot/auth/model.go | 34 + backend/pkg/spot/auth/storage.go | 23 + backend/pkg/spot/builder.go | 39 ++ backend/pkg/spot/service/public_key.go | 146 +++++ backend/pkg/spot/service/spot.go | 366 +++++++++++ backend/pkg/spot/transcoder/streams.go | 106 +++ backend/pkg/spot/transcoder/tasks.go | 100 +++ backend/pkg/spot/transcoder/transcoder.go | 349 ++++++++++ ee/backend/pkg/objectstorage/azure/azure.go | 4 + ee/backend/pkg/spot/api/permissions.go | 11 + ee/backend/pkg/spot/api/tracer.go | 57 ++ ee/backend/pkg/spot/auth/authorizer.go | 25 + ee/backend/pkg/spot/auth/storage.go | 32 + ee/backend/pkg/spot/builder.go | 45 ++ ee/backend/pkg/spot/service/tracer.go | 104 +++ .../charts/http/templates/ingress.yaml | 2 +- .../openreplay/charts/spot/.helmignore | 23 + .../openreplay/charts/spot/Chart.yaml | 24 + .../charts/spot/templates/NOTES.txt | 22 + .../charts/spot/templates/_helpers.tpl | 62 ++ .../charts/spot/templates/deployment.yaml | 140 ++++ .../openreplay/charts/spot/templates/hpa.yaml | 29 + .../charts/spot/templates/ingress.yaml | 36 ++ .../charts/spot/templates/service.yaml | 18 + .../charts/spot/templates/serviceMonitor.yaml | 18 + .../charts/spot/templates/serviceaccount.yaml | 13 + .../spot/templates/tests/test-connection.yaml | 15 + .../openreplay/charts/spot/values.yaml | 133 ++++ 47 files changed, 3365 insertions(+), 15 deletions(-) create mode 100644 backend/cmd/spot/main.go create mode 100644 backend/internal/config/spot/config.go create mode 100644 backend/pkg/metrics/spot/spot.go create mode 100644 backend/pkg/spot/api/handlers.go create mode 100644 backend/pkg/spot/api/limiter.go create mode 100644 backend/pkg/spot/api/model.go create mode 100644 backend/pkg/spot/api/permissions.go create mode 100644 backend/pkg/spot/api/router.go create mode 100644 backend/pkg/spot/api/tracer.go create mode 100644 backend/pkg/spot/auth/auth.go create mode 100644 backend/pkg/spot/auth/authorizer.go create mode 100644 backend/pkg/spot/auth/model.go create mode 100644 backend/pkg/spot/auth/storage.go create mode 100644 backend/pkg/spot/builder.go create mode 100644 backend/pkg/spot/service/public_key.go create mode 100644 backend/pkg/spot/service/spot.go create mode 100644 backend/pkg/spot/transcoder/streams.go create mode 100644 backend/pkg/spot/transcoder/tasks.go create mode 100644 backend/pkg/spot/transcoder/transcoder.go create mode 100644 ee/backend/pkg/spot/api/permissions.go create mode 100644 ee/backend/pkg/spot/api/tracer.go create mode 100644 ee/backend/pkg/spot/auth/authorizer.go create mode 100644 ee/backend/pkg/spot/auth/storage.go create mode 100644 ee/backend/pkg/spot/builder.go create mode 100644 ee/backend/pkg/spot/service/tracer.go create mode 100644 scripts/helmcharts/openreplay/charts/spot/.helmignore create mode 100644 scripts/helmcharts/openreplay/charts/spot/Chart.yaml create mode 100644 scripts/helmcharts/openreplay/charts/spot/templates/NOTES.txt create mode 100644 scripts/helmcharts/openreplay/charts/spot/templates/_helpers.tpl create mode 100644 scripts/helmcharts/openreplay/charts/spot/templates/deployment.yaml create mode 100644 scripts/helmcharts/openreplay/charts/spot/templates/hpa.yaml create mode 100644 scripts/helmcharts/openreplay/charts/spot/templates/ingress.yaml create mode 100644 scripts/helmcharts/openreplay/charts/spot/templates/service.yaml create mode 100644 scripts/helmcharts/openreplay/charts/spot/templates/serviceMonitor.yaml create mode 100644 scripts/helmcharts/openreplay/charts/spot/templates/serviceaccount.yaml create mode 100644 scripts/helmcharts/openreplay/charts/spot/templates/tests/test-connection.yaml create mode 100644 scripts/helmcharts/openreplay/charts/spot/values.yaml diff --git a/backend/Dockerfile b/backend/Dockerfile index 8effee9b8..9f5db5de7 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -103,7 +103,10 @@ ENV TZ=UTC \ COMPRESSION_THRESHOLD="20000" \ # Set Access-Control-* headers for tracker requests if true USE_CORS=false \ - RECORD_CANVAS=true + RECORD_CANVAS=true \ + JWT_SECRET="SECRET" \ + JWT_SPOT_SECRET="SECRET" \ + BUCKET_NAME="spots" RUN if [ "$SERVICE_NAME" = "http" ]; then \ @@ -113,7 +116,7 @@ RUN if [ "$SERVICE_NAME" = "http" ]; then \ apk add --no-cache zstd; \ elif [ "$SERVICE_NAME" = "canvas-handler" ]; then \ apk add --no-cache zstd; \ - elif [ "$SERVICE_NAME" = "canvas-maker" ]; then \ + elif [ "$SERVICE_NAME" = "spot" ]; then \ apk add --no-cache ffmpeg; \ fi diff --git a/backend/cmd/spot/main.go b/backend/cmd/spot/main.go new file mode 100644 index 000000000..b4204486e --- /dev/null +++ b/backend/cmd/spot/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "context" + "openreplay/backend/pkg/spot" + "openreplay/backend/pkg/spot/api" + "os" + "os/signal" + "syscall" + + spotConfig "openreplay/backend/internal/config/spot" + "openreplay/backend/internal/http/server" + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/metrics" + databaseMetrics "openreplay/backend/pkg/metrics/database" + spotMetrics "openreplay/backend/pkg/metrics/spot" +) + +func main() { + ctx := context.Background() + log := logger.New() + cfg := spotConfig.New(log) + metrics.New(log, append(spotMetrics.List(), databaseMetrics.List()...)) + + pgConn, err := pool.New(cfg.Postgres.String()) + if err != nil { + log.Fatal(ctx, "can't init postgres connection: %s", err) + } + defer pgConn.Close() + + services, err := spot.NewServiceBuilder(log, cfg, pgConn) + if err != nil { + log.Fatal(ctx, "can't init services: %s", err) + } + + router, err := api.NewRouter(cfg, log, services) + if err != nil { + log.Fatal(ctx, "failed while creating router: %s", err) + } + + spotServer, err := server.New(router.GetHandler(), cfg.HTTPHost, cfg.HTTPPort, cfg.HTTPTimeout) + if err != nil { + log.Fatal(ctx, "failed while creating server: %s", err) + } + + go func() { + if err := spotServer.Start(); err != nil { + log.Fatal(ctx, "http server error: %s", err) + } + }() + log.Info(ctx, "server successfully started on port %s", cfg.HTTPPort) + + // Wait stop signal to shut down server gracefully + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + <-sigchan + log.Info(ctx, "shutting down the server") + spotServer.Stop() +} diff --git a/backend/go.mod b/backend/go.mod index 6ef64859e..bed65cb32 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -17,6 +17,7 @@ require ( github.com/elastic/go-elasticsearch/v7 v7.13.1 github.com/elastic/go-elasticsearch/v8 v8.13.0 github.com/go-redis/redis v6.15.9+incompatible + github.com/golang-jwt/jwt/v5 v5.2.1 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 github.com/jackc/pgconn v1.14.3 @@ -29,6 +30,7 @@ require ( github.com/oschwald/maxminddb-golang v1.7.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 + github.com/rs/xid v1.2.1 github.com/sethvargo/go-envconfig v0.7.0 github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce github.com/ua-parser/uap-go v0.0.0-20200325213135-e1c09f13e2fe diff --git a/backend/go.sum b/backend/go.sum index f88e886e1..a8ef1c925 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -199,6 +199,8 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= @@ -460,6 +462,7 @@ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= diff --git a/backend/internal/config/spot/config.go b/backend/internal/config/spot/config.go new file mode 100644 index 000000000..7f65dcc83 --- /dev/null +++ b/backend/internal/config/spot/config.go @@ -0,0 +1,37 @@ +package spot + +import ( + "time" + + "openreplay/backend/internal/config/common" + "openreplay/backend/internal/config/configurator" + "openreplay/backend/internal/config/objectstorage" + "openreplay/backend/internal/config/redis" + "openreplay/backend/pkg/env" + "openreplay/backend/pkg/logger" +) + +type Config struct { + common.Config + common.Postgres + redis.Redis + objectstorage.ObjectsConfig + FSDir string `env:"FS_DIR,required"` + SpotsDir string `env:"SPOTS_DIR,default=spots"` + HTTPHost string `env:"HTTP_HOST,default="` + HTTPPort string `env:"HTTP_PORT,required"` + HTTPTimeout time.Duration `env:"HTTP_TIMEOUT,default=60s"` + JsonSizeLimit int64 `env:"JSON_SIZE_LIMIT,default=131072"` // 128KB + UseAccessControlHeaders bool `env:"USE_CORS,default=false"` + ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"` + JWTSecret string `env:"JWT_SECRET,required"` + JWTSpotSecret string `env:"JWT_SPOT_SECRET,required"` + MinimumStreamDuration int `env:"MINIMUM_STREAM_DURATION,default=15000"` // 15s + WorkerID uint16 +} + +func New(log logger.Logger) *Config { + cfg := &Config{WorkerID: env.WorkerID()} + configurator.Process(log, cfg) + return cfg +} diff --git a/backend/pkg/db/postgres/pool/pool.go b/backend/pkg/db/postgres/pool/pool.go index 1db796cdf..7e3f9f4ad 100644 --- a/backend/pkg/db/postgres/pool/pool.go +++ b/backend/pkg/db/postgres/pool/pool.go @@ -18,7 +18,7 @@ type Pool interface { QueryRow(sql string, args ...interface{}) pgx.Row Exec(sql string, arguments ...interface{}) error SendBatch(b *pgx.Batch) pgx.BatchResults - Begin() (*_Tx, error) + Begin() (*Tx, error) Close() } @@ -62,12 +62,12 @@ func (p *poolImpl) SendBatch(b *pgx.Batch) pgx.BatchResults { return res } -func (p *poolImpl) Begin() (*_Tx, error) { +func (p *poolImpl) Begin() (*Tx, error) { start := time.Now() tx, err := p.conn.Begin(context.Background()) database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "begin", "") database.IncreaseTotalRequests("begin", "") - return &_Tx{tx}, err + return &Tx{tx}, err } func (p *poolImpl) Close() { @@ -91,11 +91,11 @@ func New(url string) (Pool, error) { // TX - start -type _Tx struct { +type Tx struct { pgx.Tx } -func (tx *_Tx) exec(sql string, args ...interface{}) error { +func (tx *Tx) TxExec(sql string, args ...interface{}) error { start := time.Now() _, err := tx.Exec(context.Background(), sql, args...) method, table := methodName(sql) @@ -104,7 +104,16 @@ func (tx *_Tx) exec(sql string, args ...interface{}) error { return err } -func (tx *_Tx) rollback() error { +func (tx *Tx) TxQueryRow(sql string, args ...interface{}) pgx.Row { + start := time.Now() + res := tx.QueryRow(context.Background(), sql, args...) + method, table := methodName(sql) + database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table) + database.IncreaseTotalRequests(method, table) + return res +} + +func (tx *Tx) TxRollback() error { start := time.Now() err := tx.Rollback(context.Background()) database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "rollback", "") @@ -112,7 +121,7 @@ func (tx *_Tx) rollback() error { return err } -func (tx *_Tx) commit() error { +func (tx *Tx) TxCommit() error { start := time.Now() err := tx.Commit(context.Background()) database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "commit", "") diff --git a/backend/pkg/metrics/common/metrics.go b/backend/pkg/metrics/common/metrics.go index 85b66c713..0617ccf45 100644 --- a/backend/pkg/metrics/common/metrics.go +++ b/backend/pkg/metrics/common/metrics.go @@ -9,3 +9,7 @@ var DefaultSizeBuckets = []float64{1, 10, 50, 100, 250, 500, 1000, 2500, 5000, 1 // DefaultBuckets is a set of buckets from 1 to 1_000_000 elements var DefaultBuckets = []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10_000, 50_000, 100_000, 1_000_000} + +// VideoSizeBuckets is a set of buckets from 1_000 bytes (~1 Kb) to 500_000_000 bytes (~500 Mb) +var VideoSizeBuckets = []float64{1_000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000, 20_000_000, 30_000_000, + 40_000_000, 50_000_000, 75_000_000, 100_000_000, 250_000_000, 500_000_000} diff --git a/backend/pkg/metrics/spot/spot.go b/backend/pkg/metrics/spot/spot.go new file mode 100644 index 000000000..df5420a97 --- /dev/null +++ b/backend/pkg/metrics/spot/spot.go @@ -0,0 +1,194 @@ +package spot + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" + + "openreplay/backend/pkg/metrics/common" +) + +var spotRequestSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "spot", + Name: "request_size_bytes", + Help: "A histogram displaying the size of each HTTP request in bytes.", + Buckets: common.DefaultSizeBuckets, + }, + []string{"url", "response_code"}, +) + +func RecordRequestSize(size float64, url string, code int) { + spotRequestSize.WithLabelValues(url, strconv.Itoa(code)).Observe(size) +} + +var spotRequestDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "spot", + Name: "request_duration_seconds", + Help: "A histogram displaying the duration of each HTTP request in seconds.", + Buckets: common.DefaultDurationBuckets, + }, + []string{"url", "response_code"}, +) + +func RecordRequestDuration(durMillis float64, url string, code int) { + spotRequestDuration.WithLabelValues(url, strconv.Itoa(code)).Observe(durMillis / 1000.0) +} + +var spotTotalRequests = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "spot", + Name: "requests_total", + Help: "A counter displaying the number all HTTP requests.", + }, +) + +func IncreaseTotalRequests() { + spotTotalRequests.Inc() +} + +var spotOriginalVideoSize = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "spot", + Name: "original_video_size_bytes", + Help: "A histogram displaying the size of each original video in bytes.", + Buckets: common.VideoSizeBuckets, + }, +) + +func RecordOriginalVideoSize(size float64) { + spotOriginalVideoSize.Observe(size) +} + +var spotCroppedVideoSize = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "spot", + Name: "cropped_video_size_bytes", + Help: "A histogram displaying the size of each cropped video in bytes.", + Buckets: common.VideoSizeBuckets, + }, +) + +func RecordCroppedVideoSize(size float64) { + spotCroppedVideoSize.Observe(size) +} + +var spotVideosTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "spot", + Name: "videos_total", + Help: "A counter displaying the total number of all processed videos.", + }, +) + +func IncreaseVideosTotal() { + spotVideosTotal.Inc() +} + +var spotVideosCropped = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "spot", + Name: "videos_cropped_total", + Help: "A counter displaying the total number of all cropped videos.", + }, +) + +func IncreaseVideosCropped() { + spotVideosCropped.Inc() +} + +var spotVideosTranscoded = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "spot", + Name: "videos_transcoded_total", + Help: "A counter displaying the total number of all transcoded videos.", + }, +) + +func IncreaseVideosTranscoded() { + spotVideosTranscoded.Inc() +} + +var spotOriginalVideoDownloadDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "spot", + Name: "original_video_download_duration_seconds", + Help: "A histogram displaying the duration of downloading each original video in seconds.", + Buckets: common.DefaultDurationBuckets, + }, +) + +func RecordOriginalVideoDownloadDuration(durMillis float64) { + spotOriginalVideoDownloadDuration.Observe(durMillis / 1000.0) +} + +var spotCroppingDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "spot", + Name: "cropping_duration_seconds", + Help: "A histogram displaying the duration of cropping each video in seconds.", + Buckets: common.DefaultDurationBuckets, + }, +) + +func RecordCroppingDuration(durMillis float64) { + spotCroppingDuration.Observe(durMillis / 1000.0) +} + +var spotCroppedVideoUploadDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "spot", + Name: "cropped_video_upload_duration_seconds", + Help: "A histogram displaying the duration of uploading each cropped video in seconds.", + Buckets: common.DefaultDurationBuckets, + }, +) + +func RecordCroppedVideoUploadDuration(durMillis float64) { + spotCroppedVideoUploadDuration.Observe(durMillis / 1000.0) +} + +var spotTranscodingDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "spot", + Name: "transcoding_duration_seconds", + Help: "A histogram displaying the duration of transcoding each video in seconds.", + Buckets: common.DefaultDurationBuckets, + }, +) + +func RecordTranscodingDuration(durMillis float64) { + spotTranscodingDuration.Observe(durMillis / 1000.0) +} + +var spotTranscodedVideoUploadDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "spot", + Name: "transcoded_video_upload_duration_seconds", + Help: "A histogram displaying the duration of uploading each transcoded video in seconds.", + Buckets: common.DefaultDurationBuckets, + }, +) + +func RecordTranscodedVideoUploadDuration(durMillis float64) { + spotTranscodedVideoUploadDuration.Observe(durMillis / 1000.0) +} + +func List() []prometheus.Collector { + return []prometheus.Collector{ + spotRequestSize, + spotRequestDuration, + spotTotalRequests, + spotOriginalVideoSize, + spotCroppedVideoSize, + spotVideosTotal, + spotVideosCropped, + spotVideosTranscoded, + spotOriginalVideoDownloadDuration, + spotCroppingDuration, + spotCroppedVideoUploadDuration, + spotTranscodingDuration, + spotTranscodedVideoUploadDuration, + } +} diff --git a/backend/pkg/objectstorage/objectstorage.go b/backend/pkg/objectstorage/objectstorage.go index f97e6408f..a514d70ef 100644 --- a/backend/pkg/objectstorage/objectstorage.go +++ b/backend/pkg/objectstorage/objectstorage.go @@ -20,4 +20,5 @@ type ObjectStorage interface { Exists(key string) bool GetCreationTime(key string) *time.Time GetPreSignedUploadUrl(key string) (string, error) + GetPreSignedDownloadUrl(key string) (string, error) } diff --git a/backend/pkg/objectstorage/s3/s3.go b/backend/pkg/objectstorage/s3/s3.go index 05b53e27d..4997079c5 100644 --- a/backend/pkg/objectstorage/s3/s3.go +++ b/backend/pkg/objectstorage/s3/s3.go @@ -206,3 +206,15 @@ func (s *storageImpl) GetPreSignedUploadUrl(key string) (string, error) { } return urlStr, nil } + +func (s *storageImpl) GetPreSignedDownloadUrl(key string) (string, error) { + req, _ := s.svc.GetObjectRequest(&s3.GetObjectInput{ + Bucket: aws.String(*s.bucket), + Key: aws.String(key), + }) + urlStr, err := req.Presign(15 * time.Minute) + if err != nil { + return "", err + } + return urlStr, nil +} diff --git a/backend/pkg/pool/worker-pool.go b/backend/pkg/pool/worker-pool.go index 791b0bb3b..a07e9d0eb 100644 --- a/backend/pkg/pool/worker-pool.go +++ b/backend/pkg/pool/worker-pool.go @@ -7,11 +7,11 @@ type task struct { toStop bool } -func NewTask(payload interface{}) *task { +func newTask(payload interface{}) *task { return &task{Payload: payload} } -func NewStopSignal() *task { +func newStopSignal() *task { return &task{toStop: true} } @@ -28,7 +28,7 @@ type WorkerPool interface { Stop() } -func NewPool(numberOfWorkers, queueSize int, handler func(payload interface{})) *workerPoolImpl { +func NewPool(numberOfWorkers, queueSize int, handler func(payload interface{})) WorkerPool { pool := &workerPoolImpl{ wg: &sync.WaitGroup{}, tasks: make(chan *task, queueSize), @@ -47,12 +47,12 @@ func (p *workerPoolImpl) runWorkers() { } func (p *workerPoolImpl) Submit(payload interface{}) { - p.tasks <- NewTask(payload) + p.tasks <- newTask(payload) } func (p *workerPoolImpl) stop() { for i := 0; i < p.numberOfWorkers; i++ { - p.tasks <- NewStopSignal() + p.tasks <- newStopSignal() } p.wg.Wait() } diff --git a/backend/pkg/spot/api/handlers.go b/backend/pkg/spot/api/handlers.go new file mode 100644 index 000000000..e1afa2c08 --- /dev/null +++ b/backend/pkg/spot/api/handlers.go @@ -0,0 +1,605 @@ +package api + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/gorilla/mux" + + metrics "openreplay/backend/pkg/metrics/spot" + "openreplay/backend/pkg/objectstorage" + "openreplay/backend/pkg/spot/auth" + "openreplay/backend/pkg/spot/service" +) + +func (e *Router) createSpot(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize) + return + } + bodySize = len(bodyBytes) + + req := &CreateSpotRequest{} + if err := json.Unmarshal(bodyBytes, req); err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + // Creat a spot + currUser := r.Context().Value("userData").(*auth.User) + newSpot, err := e.services.Spots.Add(currUser, req.Name, req.Comment, req.Duration, req.Crop) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + + // Parse and upload preview image + previewImage, err := getSpotPreview(req.Preview) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + previewName := fmt.Sprintf("%d/preview.jpeg", newSpot.ID) + if err = e.services.ObjStorage.Upload(bytes.NewReader(previewImage), previewName, "image/jpeg", objectstorage.NoCompression); err != nil { + e.log.Error(r.Context(), "can't upload preview image: %s", err) + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, errors.New("can't upload preview image"), startTime, r.URL.Path, bodySize) + return + } + + mobURL, err := e.getUploadMobURL(newSpot.ID) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + videoURL, err := e.getUploadVideoURL(newSpot.ID) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + + resp := &CreateSpotResponse{ + ID: strconv.Itoa(int(newSpot.ID)), + MobURL: mobURL, + VideoURL: videoURL, + } + e.ResponseWithJSON(r.Context(), w, resp, startTime, r.URL.Path, bodySize) +} + +func getSpotPreview(preview string) ([]byte, error) { + parts := strings.Split(preview, ",") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid preview format") + } + base64Str := parts[1] + + data, err := base64.StdEncoding.DecodeString(base64Str) + if err != nil { + return nil, fmt.Errorf("can't decode base64 preview: %s", err) + } + return data, nil +} + +func (e *Router) getUploadMobURL(spotID uint64) (string, error) { + mobKey := fmt.Sprintf("%d/events.mob", spotID) + mobURL, err := e.services.ObjStorage.GetPreSignedUploadUrl(mobKey) + if err != nil { + return "", fmt.Errorf("can't get mob URL: %s", err) + } + return mobURL, nil +} + +func (e *Router) getUploadVideoURL(spotID uint64) (string, error) { + mobKey := fmt.Sprintf("%d/video.webm", spotID) + mobURL, err := e.services.ObjStorage.GetPreSignedUploadUrl(mobKey) + if err != nil { + return "", fmt.Errorf("can't get video URL: %s", err) + } + return mobURL, nil +} + +func getSpotID(r *http.Request) (uint64, error) { + vars := mux.Vars(r) + idStr := vars["id"] + if idStr == "" { + return 0, fmt.Errorf("empty spot id") + } + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid spot id") + } + if id <= 0 { + return 0, fmt.Errorf("invalid spot id") + } + return id, nil +} + +func getSpotsRequest(r *http.Request) (*GetSpotsRequest, error) { + params := r.URL.Query() + page := params.Get("page") + limit := params.Get("limit") + pageNum, _ := strconv.ParseUint(page, 10, 64) + limitNum, _ := strconv.ParseUint(limit, 10, 64) + req := &GetSpotsRequest{ + Query: params.Get("query"), + FilterBy: params.Get("filterBy"), + Order: params.Get("order"), + Page: pageNum, + Limit: limitNum, + } + return req, nil +} + +func (e *Router) getPreviewURL(spotID uint64) (string, error) { + previewKey := fmt.Sprintf("%d/preview.jpeg", spotID) + previewURL, err := e.services.ObjStorage.GetPreSignedDownloadUrl(previewKey) + if err != nil { + return "", fmt.Errorf("can't get preview URL: %s", err) + } + return previewURL, nil +} + +func (e *Router) getMobURL(spotID uint64) (string, error) { + mobKey := fmt.Sprintf("%d/events.mob", spotID) + mobURL, err := e.services.ObjStorage.GetPreSignedDownloadUrl(mobKey) + if err != nil { + return "", fmt.Errorf("can't get mob URL: %s", err) + } + return mobURL, nil +} + +func (e *Router) getVideoURL(spotID uint64) (string, error) { + mobKey := fmt.Sprintf("%d/video.webm", spotID) // TODO: later return url to m3u8 file + mobURL, err := e.services.ObjStorage.GetPreSignedDownloadUrl(mobKey) + if err != nil { + return "", fmt.Errorf("can't get video URL: %s", err) + } + return mobURL, nil +} + +func (e *Router) getSpot(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + id, err := getSpotID(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + user := r.Context().Value("userData").(*auth.User) + res, err := e.services.Spots.GetByID(user, id) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + if res == nil { + e.ResponseWithError(r.Context(), w, http.StatusNotFound, fmt.Errorf("spot not found"), startTime, r.URL.Path, bodySize) + return + } + + previewUrl, err := e.getPreviewURL(id) + if err != nil { + e.log.Error(r.Context(), "can't get preview URL: %s", err) + } + mobURL, err := e.getMobURL(id) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + videoURL, err := e.getVideoURL(id) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + + spotInfo := &Info{ + Name: res.Name, + UserEmail: res.UserEmail, + Duration: res.Duration, + Comments: res.Comments, + CreatedAt: res.CreatedAt, + PreviewURL: previewUrl, + MobURL: mobURL, + VideoURL: videoURL, + } + playlist, err := e.services.Transcoder.GetSpotStreamPlaylist(id) + if err != nil { + e.log.Warn(r.Context(), "can't get stream playlist: %s", err) + } else { + spotInfo.StreamFile = base64.StdEncoding.EncodeToString(playlist) + } + + e.ResponseWithJSON(r.Context(), w, &GetSpotResponse{Spot: spotInfo}, startTime, r.URL.Path, bodySize) +} + +func (e *Router) updateSpot(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + id, err := getSpotID(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize) + return + } + bodySize = len(bodyBytes) + + req := &UpdateSpotRequest{} + if err := json.Unmarshal(bodyBytes, req); err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + user := r.Context().Value("userData").(*auth.User) + _, err = e.services.Spots.UpdateName(user, id, req.Name) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + + e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize) +} + +func (e *Router) getSpots(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + req, err := getSpotsRequest(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + user := r.Context().Value("userData").(*auth.User) + opts := &service.GetOpts{ + NameFilter: req.Query, Order: req.Order, Page: req.Page, Limit: req.Limit} + switch req.FilterBy { + case "own": + opts.UserID = user.ID + default: + opts.TenantID = user.TenantID + } + spots, total, err := e.services.Spots.Get(user, opts) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + res := make([]ShortInfo, 0, len(spots)) + for _, spot := range spots { + previewUrl, err := e.getPreviewURL(spot.ID) + if err != nil { + e.log.Error(r.Context(), "can't get preview URL: %s", err) + } + res = append(res, ShortInfo{ + ID: strconv.Itoa(int(spot.ID)), + Name: spot.Name, + UserEmail: spot.UserEmail, + Duration: spot.Duration, + CreatedAt: spot.CreatedAt, + PreviewURL: previewUrl, + }) + } + e.ResponseWithJSON(r.Context(), w, &GetSpotsResponse{Spots: res, Total: total}, startTime, r.URL.Path, bodySize) +} + +func (e *Router) deleteSpots(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize) + return + } + bodySize = len(bodyBytes) + + req := &DeleteSpotRequest{} + if err := json.Unmarshal(bodyBytes, req); err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + spotsToDelete := make([]uint64, 0, len(req.SpotIDs)) + for _, idStr := range req.SpotIDs { + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, fmt.Errorf("invalid spot id: %s", idStr), startTime, r.URL.Path, bodySize) + return + } + spotsToDelete = append(spotsToDelete, id) + } + + user := r.Context().Value("userData").(*auth.User) + if err := e.services.Spots.Delete(user, spotsToDelete); err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + + e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize) +} + +func (e *Router) addComment(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + id, err := getSpotID(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize) + return + } + bodySize = len(bodyBytes) + + req := &AddCommentRequest{} + if err := json.Unmarshal(bodyBytes, req); err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + user := r.Context().Value("userData").(*auth.User) + updatedSpot, err := e.services.Spots.AddComment(user, id, &service.Comment{UserName: req.UserName, Text: req.Comment}) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + + mobURL, err := e.getMobURL(id) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + videoURL, err := e.getVideoURL(id) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + + spotInfo := &Info{ + Name: updatedSpot.Name, + Duration: updatedSpot.Duration, + Comments: updatedSpot.Comments, + CreatedAt: updatedSpot.CreatedAt, + MobURL: mobURL, + VideoURL: videoURL, + } + e.ResponseWithJSON(r.Context(), w, &GetSpotResponse{Spot: spotInfo}, startTime, r.URL.Path, bodySize) +} + +func (e *Router) uploadedSpot(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + id, err := getSpotID(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + user := r.Context().Value("userData").(*auth.User) + spot, err := e.services.Spots.GetByID(user, id) // check if spot exists + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + e.log.Info(r.Context(), "uploaded spot %+v, from user: %+v", spot, user) + if err := e.services.Transcoder.Process(spot); err != nil { + e.log.Error(r.Context(), "can't add transcoding task: %s", err) + } + + e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize) +} + +func (e *Router) getSpotVideo(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + id, err := getSpotID(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + key := fmt.Sprintf("%d/video.webm", id) + videoURL, err := e.services.ObjStorage.GetPreSignedDownloadUrl(key) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + + resp := map[string]interface{}{ + "url": videoURL, + } + e.ResponseWithJSON(r.Context(), w, resp, startTime, r.URL.Path, bodySize) +} + +func (e *Router) getSpotStream(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + id, err := getSpotID(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + // Example data to serve as the file content + streamPlaylist, err := e.services.Transcoder.GetSpotStreamPlaylist(id) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + + // Create a buffer with the file content + buffer := bytes.NewBuffer(streamPlaylist) + + // Set the headers for the response + w.Header().Set("Content-Disposition", "attachment; filename=index.m3u8") + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") //"application/octet-stream") + w.Header().Set("Content-Length", string(len(streamPlaylist))) + + // Write the content of the buffer to the response writer + if _, err := buffer.WriteTo(w); err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } +} + +func (e *Router) getPublicKey(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + id, err := getSpotID(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + user := r.Context().Value("userData").(*auth.User) + key, err := e.services.Keys.Get(id, user) + if err != nil { + if strings.Contains(err.Error(), "not found") { + e.ResponseWithError(r.Context(), w, http.StatusNotFound, err, startTime, r.URL.Path, bodySize) + } else { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + } + return + } + resp := map[string]interface{}{ + "key": key, + } + e.ResponseWithJSON(r.Context(), w, resp, startTime, r.URL.Path, bodySize) +} + +func (e *Router) updatePublicKey(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + id, err := getSpotID(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize) + return + } + bodySize = len(bodyBytes) + + req := &UpdateSpotPublicKeyRequest{} + if err := json.Unmarshal(bodyBytes, req); err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + user := r.Context().Value("userData").(*auth.User) + key, err := e.services.Keys.Set(id, req.Expiration, user) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + resp := map[string]interface{}{ + "key": key, + } + e.ResponseWithJSON(r.Context(), w, resp, startTime, r.URL.Path, bodySize) +} + +func (e *Router) spotStatus(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + + id, err := getSpotID(r) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) + return + } + + user := r.Context().Value("userData").(*auth.User) + status, err := e.services.Spots.GetStatus(user, id) + if err != nil { + e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) + return + } + resp := map[string]interface{}{ + "status": status, + } + e.ResponseWithJSON(r.Context(), w, resp, startTime, r.URL.Path, bodySize) +} + +func recordMetrics(requestStart time.Time, url string, code, bodySize int) { + if bodySize > 0 { + metrics.RecordRequestSize(float64(bodySize), url, code) + } + metrics.IncreaseTotalRequests() + metrics.RecordRequestDuration(float64(time.Now().Sub(requestStart).Milliseconds()), url, code) +} + +func (e *Router) readBody(w http.ResponseWriter, r *http.Request, limit int64) ([]byte, error) { + body := http.MaxBytesReader(w, r.Body, limit) + bodyBytes, err := io.ReadAll(body) + + // Close body + if closeErr := body.Close(); closeErr != nil { + e.log.Warn(r.Context(), "error while closing request body: %s", closeErr) + } + if err != nil { + return nil, err + } + return bodyBytes, nil +} + +func (e *Router) ResponseOK(ctx context.Context, w http.ResponseWriter, requestStart time.Time, url string, bodySize int) { + w.WriteHeader(http.StatusOK) + e.log.Info(ctx, "response ok") + recordMetrics(requestStart, url, http.StatusOK, bodySize) +} + +func (e *Router) ResponseWithJSON(ctx context.Context, w http.ResponseWriter, res interface{}, requestStart time.Time, url string, bodySize int) { + e.log.Info(ctx, "response ok") + body, err := json.Marshal(res) + if err != nil { + e.log.Error(ctx, "can't marshal response: %s", err) + } + w.Header().Set("Content-Type", "application/json") + w.Write(body) + recordMetrics(requestStart, url, http.StatusOK, bodySize) +} + +type response struct { + Error string `json:"error"` +} + +func (e *Router) ResponseWithError(ctx context.Context, w http.ResponseWriter, code int, err error, requestStart time.Time, url string, bodySize int) { + e.log.Error(ctx, "response error, code: %d, error: %s", code, err) + body, err := json.Marshal(&response{err.Error()}) + if err != nil { + e.log.Error(ctx, "can't marshal response: %s", err) + } + w.WriteHeader(code) + w.Write(body) + recordMetrics(requestStart, url, code, bodySize) +} diff --git a/backend/pkg/spot/api/limiter.go b/backend/pkg/spot/api/limiter.go new file mode 100644 index 000000000..004f8be86 --- /dev/null +++ b/backend/pkg/spot/api/limiter.go @@ -0,0 +1,88 @@ +package api + +import ( + "sync" + "time" +) + +type RateLimiter struct { + rate int + burst int + tokens int + lastToken time.Time + lastUsed time.Time + mu sync.Mutex +} + +func NewRateLimiter(rate int, burst int) *RateLimiter { + return &RateLimiter{ + rate: rate, + burst: burst, + tokens: burst, + lastToken: time.Now(), + lastUsed: time.Now(), + } +} + +func (rl *RateLimiter) Allow() bool { + rl.mu.Lock() + defer rl.mu.Unlock() + + now := time.Now() + elapsed := now.Sub(rl.lastToken) + + rl.tokens += int(elapsed.Seconds()) * rl.rate + if rl.tokens > rl.burst { + rl.tokens = rl.burst + } + + rl.lastToken = now + rl.lastUsed = now + + if rl.tokens > 0 { + rl.tokens-- + return true + } + return false +} + +type UserRateLimiter struct { + rateLimiters sync.Map + rate int + burst int + cleanupInterval time.Duration + maxIdleTime time.Duration +} + +func NewUserRateLimiter(rate int, burst int, cleanupInterval time.Duration, maxIdleTime time.Duration) *UserRateLimiter { + url := &UserRateLimiter{ + rate: rate, + burst: burst, + cleanupInterval: cleanupInterval, + maxIdleTime: maxIdleTime, + } + go url.cleanup() + return url +} + +func (url *UserRateLimiter) GetRateLimiter(user uint64) *RateLimiter { + value, _ := url.rateLimiters.LoadOrStore(user, NewRateLimiter(url.rate, url.burst)) + return value.(*RateLimiter) +} + +func (url *UserRateLimiter) cleanup() { + for { + time.Sleep(url.cleanupInterval) + now := time.Now() + + url.rateLimiters.Range(func(key, value interface{}) bool { + rl := value.(*RateLimiter) + rl.mu.Lock() + if now.Sub(rl.lastUsed) > url.maxIdleTime { + url.rateLimiters.Delete(key) + } + rl.mu.Unlock() + return true + }) + } +} diff --git a/backend/pkg/spot/api/model.go b/backend/pkg/spot/api/model.go new file mode 100644 index 000000000..70da02bf7 --- /dev/null +++ b/backend/pkg/spot/api/model.go @@ -0,0 +1,75 @@ +package api + +import ( + "openreplay/backend/pkg/spot/service" + "time" +) + +type CreateSpotRequest struct { + Name string `json:"name"` + Comment string `json:"comment"` + Duration int `json:"duration"` + Crop []int `json:"crop"` + Preview string `json:"preview"` +} + +type CreateSpotResponse struct { + ID string `json:"id"` + MobURL string `json:"mobURL"` + VideoURL string `json:"videoURL"` +} + +type Info struct { + Name string `json:"name"` + UserEmail string `json:"userEmail"` + Duration int `json:"duration"` + Comments []service.Comment `json:"comments"` + CreatedAt time.Time `json:"createdAt"` + MobURL string `json:"mobURL"` + PreviewURL string `json:"previewURL"` + VideoURL string `json:"videoURL"` + StreamFile string `json:"streamFile"` +} + +type GetSpotResponse struct { + Spot *Info `json:"spot"` +} + +type GetSpotsRequest struct { + Query string `json:"query"` // for search by name (optional) + FilterBy string `json:"filterBy"` // "own", "all", "shared" + Order string `json:"order"` + Page uint64 `json:"page"` + Limit uint64 `json:"limit"` +} + +type ShortInfo struct { + ID string `json:"id"` + Name string `json:"name"` + UserEmail string `json:"userEmail"` + Duration int `json:"duration"` + CreatedAt time.Time `json:"createdAt"` + PreviewURL string `json:"previewURL"` +} + +type GetSpotsResponse struct { + Spots []ShortInfo `json:"spots"` + Total uint64 `json:"total"` +} + +type UpdateSpotRequest struct { + Name string `json:"name"` +} + +type AddCommentRequest struct { + UserName string `json:"userName"` + Comment string `json:"comment"` +} + +type DeleteSpotRequest struct { + SpotIDs []string `json:"spotIDs"` +} + +type UpdateSpotPublicKeyRequest struct { + Expiration uint64 `json:"expiration"` // in seconds +} diff --git a/backend/pkg/spot/api/permissions.go b/backend/pkg/spot/api/permissions.go new file mode 100644 index 000000000..f8392bb70 --- /dev/null +++ b/backend/pkg/spot/api/permissions.go @@ -0,0 +1,5 @@ +package api + +func getPermissions(urlPath string) []string { + return nil +} diff --git a/backend/pkg/spot/api/router.go b/backend/pkg/spot/api/router.go new file mode 100644 index 000000000..b28c8f54f --- /dev/null +++ b/backend/pkg/spot/api/router.go @@ -0,0 +1,205 @@ +package api + +import ( + "bytes" + "fmt" + "io" + "net/http" + "openreplay/backend/pkg/spot" + "openreplay/backend/pkg/spot/auth" + "sync" + "time" + + "github.com/docker/distribution/context" + "github.com/gorilla/mux" + + spotConfig "openreplay/backend/internal/config/spot" + "openreplay/backend/internal/http/util" + "openreplay/backend/pkg/logger" +) + +type Router struct { + log logger.Logger + cfg *spotConfig.Config + router *mux.Router + mutex *sync.RWMutex + services *spot.ServicesBuilder + limiter *UserRateLimiter +} + +func NewRouter(cfg *spotConfig.Config, log logger.Logger, services *spot.ServicesBuilder) (*Router, error) { + switch { + case cfg == nil: + return nil, fmt.Errorf("config is empty") + case services == nil: + return nil, fmt.Errorf("services is empty") + case log == nil: + return nil, fmt.Errorf("logger is empty") + } + e := &Router{ + log: log, + cfg: cfg, + mutex: &sync.RWMutex{}, + services: services, + limiter: NewUserRateLimiter(10, 30, 1*time.Minute, 5*time.Minute), + } + e.init() + return e, nil +} + +func (e *Router) init() { + e.router = mux.NewRouter() + + // Root route + e.router.HandleFunc("/", e.root) + + // Spot routes + e.router.HandleFunc("/v1/spots", e.createSpot).Methods("POST", "OPTIONS") + e.router.HandleFunc("/v1/spots/{id}", e.getSpot).Methods("GET", "OPTIONS") + e.router.HandleFunc("/v1/spots/{id}", e.updateSpot).Methods("PATCH", "OPTIONS") + e.router.HandleFunc("/v1/spots", e.getSpots).Methods("GET", "OPTIONS") + e.router.HandleFunc("/v1/spots", e.deleteSpots).Methods("DELETE", "OPTIONS") + e.router.HandleFunc("/v1/spots/{id}/comment", e.addComment).Methods("POST", "OPTIONS") + e.router.HandleFunc("/v1/spots/{id}/uploaded", e.uploadedSpot).Methods("POST", "OPTIONS") + e.router.HandleFunc("/v1/spots/{id}/video", e.getSpotVideo).Methods("GET", "OPTIONS") + e.router.HandleFunc("/v1/spots/{id}/public-key", e.getPublicKey).Methods("GET", "OPTIONS") + e.router.HandleFunc("/v1/spots/{id}/public-key", e.updatePublicKey).Methods("PATCH", "OPTIONS") + e.router.HandleFunc("/v1/spots/{id}/status", e.spotStatus).Methods("GET", "OPTIONS") + e.router.HandleFunc("/v1/ping", e.ping).Methods("GET", "OPTIONS") + + // CORS middleware + e.router.Use(e.corsMiddleware) + e.router.Use(e.authMiddleware) + e.router.Use(e.rateLimitMiddleware) + e.router.Use(e.actionMiddleware) +} + +func (e *Router) root(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) +} + +func (e *Router) ping(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) +} + +func (e *Router) corsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if e.cfg.UseAccessControlHeaders { + // Prepare headers for preflight requests + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST,GET,PATCH,DELETE") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization,Content-Encoding") + } + if r.Method == http.MethodOptions { + w.Header().Set("Cache-Control", "max-age=86400") + w.WriteHeader(http.StatusOK) + return + } + r = r.WithContext(context.WithValues(r.Context(), map[string]interface{}{"httpMethod": r.Method, "url": util.SafeString(r.URL.Path)})) + + next.ServeHTTP(w, r) + }) +} + +func (e *Router) authMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + next.ServeHTTP(w, r) + } + isExtension := false + pathTemplate, err := mux.CurrentRoute(r).GetPathTemplate() + if err != nil { + e.log.Error(r.Context(), "failed to get path template: %s", err) + } else { + if pathTemplate == "/v1/ping" || + (pathTemplate == "/v1/spots" && r.Method == "POST") || + (pathTemplate == "/v1/spots/{id}/uploaded" && r.Method == "POST") { + isExtension = true + } + } + + // Check if the request is authorized + user, err := e.services.Auth.IsAuthorized(r.Header.Get("Authorization"), getPermissions(r.URL.Path), isExtension) + if err != nil { + e.log.Warn(r.Context(), "Unauthorized request: %s", err) + if !isSpotWithKeyRequest(r) { + w.WriteHeader(http.StatusUnauthorized) + return + } + + user, err = e.services.Keys.IsValid(r.URL.Query().Get("key")) + if err != nil { + e.log.Warn(r.Context(), "Wrong public key: %s", err) + w.WriteHeader(http.StatusUnauthorized) + return + } + } + + r = r.WithContext(context.WithValues(r.Context(), map[string]interface{}{"userData": user})) + next.ServeHTTP(w, r) + }) +} + +func isSpotWithKeyRequest(r *http.Request) bool { + pathTemplate, err := mux.CurrentRoute(r).GetPathTemplate() + if err != nil { + return false + } + getSpotPrefix := "/v1/spots/{id}" // GET + addCommentPrefix := "/v1/spots/{id}/comment" // POST + if (pathTemplate == getSpotPrefix && r.Method == "GET") || (pathTemplate == addCommentPrefix && r.Method == "POST") { + return true + } + return false +} + +func (e *Router) rateLimitMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + user := r.Context().Value("userData").(*auth.User) + rl := e.limiter.GetRateLimiter(user.ID) + + if !rl.Allow() { + http.Error(w, "Too Many Requests", http.StatusTooManyRequests) + return + } + next.ServeHTTP(w, r) + }) +} + +type statusWriter struct { + http.ResponseWriter + statusCode int +} + +func (w *statusWriter) WriteHeader(statusCode int) { + w.statusCode = statusCode + w.ResponseWriter.WriteHeader(statusCode) +} + +func (w *statusWriter) Write(b []byte) (int, error) { + if w.statusCode == 0 { + w.statusCode = http.StatusOK // Default status code is 200 + } + return w.ResponseWriter.Write(b) +} + +func (e *Router) actionMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Read body and restore the io.ReadCloser to its original state + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "can't read body", http.StatusBadRequest) + return + } + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + // Use custom response writer to get the status code + sw := &statusWriter{ResponseWriter: w} + // Serve the request + next.ServeHTTP(sw, r) + e.logRequest(r, bodyBytes, sw.statusCode) + }) +} + +func (e *Router) GetHandler() http.Handler { + return e.router +} diff --git a/backend/pkg/spot/api/tracer.go b/backend/pkg/spot/api/tracer.go new file mode 100644 index 000000000..14e006fe0 --- /dev/null +++ b/backend/pkg/spot/api/tracer.go @@ -0,0 +1,7 @@ +package api + +import ( + "net/http" +) + +func (e *Router) logRequest(r *http.Request, bodyBytes []byte, statusCode int) {} diff --git a/backend/pkg/spot/auth/auth.go b/backend/pkg/spot/auth/auth.go new file mode 100644 index 000000000..498e16e0a --- /dev/null +++ b/backend/pkg/spot/auth/auth.go @@ -0,0 +1,53 @@ +package auth + +import ( + "fmt" + "strings" + + "github.com/golang-jwt/jwt/v5" + + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/logger" +) + +type Auth interface { + IsAuthorized(authHeader string, permissions []string, isExtension bool) (*User, error) +} + +type authImpl struct { + log logger.Logger + secret string + spotSecret string + pgconn pool.Pool +} + +func NewAuth(log logger.Logger, jwtSecret, jwtSpotSecret string, conn pool.Pool) Auth { + return &authImpl{ + log: log, + secret: jwtSecret, + spotSecret: jwtSpotSecret, + pgconn: conn, + } +} + +func parseJWT(authHeader, secret string) (*JWTClaims, error) { + if authHeader == "" { + return nil, fmt.Errorf("authorization header missing") + } + tokenParts := strings.Split(authHeader, "Bearer ") + if len(tokenParts) != 2 { + return nil, fmt.Errorf("invalid authorization header") + } + tokenString := tokenParts[1] + + claims := &JWTClaims{} + token, err := jwt.ParseWithClaims(tokenString, claims, + func(token *jwt.Token) (interface{}, error) { + return []byte(secret), nil + }) + if err != nil || !token.Valid { + fmt.Printf("token err: %v\n", err) + return nil, fmt.Errorf("invalid token") + } + return claims, nil +} diff --git a/backend/pkg/spot/auth/authorizer.go b/backend/pkg/spot/auth/authorizer.go new file mode 100644 index 000000000..e60b5acc1 --- /dev/null +++ b/backend/pkg/spot/auth/authorizer.go @@ -0,0 +1,13 @@ +package auth + +func (a *authImpl) IsAuthorized(authHeader string, permissions []string, isExtension bool) (*User, error) { + secret := a.secret + if isExtension { + secret = a.spotSecret + } + jwtInfo, err := parseJWT(authHeader, secret) + if err != nil { + return nil, err + } + return authUser(a.pgconn, jwtInfo.UserId, jwtInfo.TenantID, int(jwtInfo.IssuedAt.Unix())) +} diff --git a/backend/pkg/spot/auth/model.go b/backend/pkg/spot/auth/model.go new file mode 100644 index 000000000..ef2f09d75 --- /dev/null +++ b/backend/pkg/spot/auth/model.go @@ -0,0 +1,34 @@ +package auth + +import "github.com/golang-jwt/jwt/v5" + +type JWTClaims struct { + UserId int `json:"userId"` + TenantID int `json:"tenantId"` + jwt.RegisteredClaims +} + +type User struct { + ID uint64 `json:"id"` + Name string `json:"name"` + Email string `json:"email"` + TenantID uint64 `json:"tenantId"` + JwtIat int `json:"jwtIat"` + Permissions map[string]bool `json:"permissions"` + AuthMethod string +} + +func (u *User) HasPermission(perm string) bool { + if u.Permissions == nil { + return true // no permissions + } + _, ok := u.Permissions[perm] + return ok +} + +func abs(x int) int { + if x < 0 { + return -x + } + return x +} diff --git a/backend/pkg/spot/auth/storage.go b/backend/pkg/spot/auth/storage.go new file mode 100644 index 000000000..b0f63e494 --- /dev/null +++ b/backend/pkg/spot/auth/storage.go @@ -0,0 +1,23 @@ +package auth + +import ( + "fmt" + "openreplay/backend/pkg/db/postgres/pool" +) + +func authUser(conn pool.Pool, userID, tenantID, jwtIAT int) (*User, error) { + sql := ` + SELECT user_id, name, email, EXTRACT(epoch FROM spot_jwt_iat)::BIGINT AS spot_jwt_iat + FROM public.users + WHERE user_id = $1 AND deleted_at IS NULL + LIMIT 1;` + + user := &User{TenantID: 1, AuthMethod: "jwt"} + if err := conn.QueryRow(sql, userID).Scan(&user.ID, &user.Name, &user.Email, &user.JwtIat); err != nil { + return nil, fmt.Errorf("user not found") + } + if user.JwtIat == 0 || abs(jwtIAT-user.JwtIat) > 1 { + return nil, fmt.Errorf("token expired") + } + return user, nil +} diff --git a/backend/pkg/spot/builder.go b/backend/pkg/spot/builder.go new file mode 100644 index 000000000..047318844 --- /dev/null +++ b/backend/pkg/spot/builder.go @@ -0,0 +1,39 @@ +package spot + +import ( + "openreplay/backend/internal/config/spot" + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/flakeid" + "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/objectstorage" + "openreplay/backend/pkg/objectstorage/store" + "openreplay/backend/pkg/spot/auth" + "openreplay/backend/pkg/spot/service" + "openreplay/backend/pkg/spot/transcoder" +) + +type ServicesBuilder struct { + Flaker *flakeid.Flaker + ObjStorage objectstorage.ObjectStorage + Auth auth.Auth + Spots service.Spots + Keys service.Keys + Transcoder transcoder.Transcoder +} + +func NewServiceBuilder(log logger.Logger, cfg *spot.Config, pgconn pool.Pool) (*ServicesBuilder, error) { + objStore, err := store.NewStore(&cfg.ObjectsConfig) + if err != nil { + return nil, err + } + flaker := flakeid.NewFlaker(cfg.WorkerID) + spots := service.NewSpots(log, pgconn, flaker) + return &ServicesBuilder{ + Flaker: flaker, + ObjStorage: objStore, + Auth: auth.NewAuth(log, cfg.JWTSecret, cfg.JWTSpotSecret, pgconn), + Spots: spots, + Keys: service.NewKeys(log, pgconn), + Transcoder: transcoder.NewTranscoder(cfg, log, objStore, pgconn, spots), + }, nil +} diff --git a/backend/pkg/spot/service/public_key.go b/backend/pkg/spot/service/public_key.go new file mode 100644 index 000000000..255787296 --- /dev/null +++ b/backend/pkg/spot/service/public_key.go @@ -0,0 +1,146 @@ +package service + +import ( + "context" + "fmt" + "github.com/rs/xid" + "openreplay/backend/pkg/spot/auth" + "time" + + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/logger" +) + +type Key struct { + SpotID uint64 `json:"-"` + UserID uint64 `json:"-"` // to track who generated the key + TenantID uint64 `json:"-"` // to check availability + Value string `json:"value"` + Expiration uint64 `json:"expiration"` // in seconds + ExpiredAt time.Time `json:"-"` +} + +type Keys interface { + Set(spotID, expiration uint64, user *auth.User) (*Key, error) + Get(spotID uint64, user *auth.User) (*Key, error) + IsValid(key string) (*auth.User, error) +} + +type keysImpl struct { + log logger.Logger + conn pool.Pool +} + +func (k *keysImpl) Set(spotID, expiration uint64, user *auth.User) (*Key, error) { + switch { + case spotID == 0: + return nil, fmt.Errorf("spotID is required") + case expiration > 604800: + return nil, fmt.Errorf("expiration should be less than 7 days") + case user == nil: + return nil, fmt.Errorf("user is required") + } + now := time.Now() + if expiration == 0 { + sql := `UPDATE spots_keys SET expired_at = $1, expiration = 0 WHERE spot_id = $2` + if err := k.conn.Exec(sql, now, spotID); err != nil { + k.log.Error(context.Background(), "failed to set key: %v", err) + return nil, fmt.Errorf("key not updated") + } + return nil, nil + } + newKey := xid.New().String() + expiredAt := now.Add(time.Duration(expiration) * time.Second) + sql := ` + WITH updated AS ( + UPDATE spots_keys + SET + spot_key = CASE + WHEN expired_at < $1 THEN $2 + ELSE spot_key + END, + user_id = $3, + expiration = $4, + expired_at = $5, + updated_at = $1 + WHERE spot_id = $6 + RETURNING spot_key, expiration, expired_at + ), + + inserted AS ( + INSERT INTO spots_keys (spot_key, spot_id, user_id, tenant_id, expiration, created_at, expired_at) + SELECT $2, $6, $3, $7, $4, $1, $5 + WHERE NOT EXISTS (SELECT 1 FROM updated) + RETURNING spot_key, expiration, expired_at + ) + + SELECT spot_key, expiration, expired_at FROM updated + UNION ALL + SELECT spot_key, expiration, expired_at FROM inserted; + ` + key := &Key{} + if err := k.conn.QueryRow(sql, now, newKey, user.ID, expiration, expiredAt, spotID, user.TenantID). + Scan(&key.Value, &key.Expiration, &key.ExpiredAt); err != nil { + k.log.Error(context.Background(), "failed to set key: %v", err) + return nil, fmt.Errorf("key not updated") + } + return key, nil +} + +func (k *keysImpl) Get(spotID uint64, user *auth.User) (*Key, error) { + switch { + case spotID == 0: + return nil, fmt.Errorf("spotID is required") + case user == nil: + return nil, fmt.Errorf("user is required") + } + // + key := &Key{} + sql := `SELECT spot_key, expiration, expired_at FROM spots_keys WHERE spot_id = $1 AND tenant_id = $2` + if err := k.conn.QueryRow(sql, spotID, user.TenantID).Scan(&key.Value, &key.Expiration, &key.ExpiredAt); err != nil { + k.log.Error(context.Background(), "failed to get key: %v", err) + return nil, fmt.Errorf("key not found") + } + now := time.Now() + if key.ExpiredAt.Before(now) { + return nil, fmt.Errorf("key is expired") + } + key.Expiration = uint64(key.ExpiredAt.Sub(now).Seconds()) + return key, nil +} + +func (k *keysImpl) IsValid(key string) (*auth.User, error) { + if key == "" { + return nil, fmt.Errorf("key is required") + } + var ( + userID uint64 + expiredAt time.Time + ) + // Get userID if key is valid + sql := `SELECT user_id, expired_at FROM spots_keys WHERE spot_key = $1` + if err := k.conn.QueryRow(sql, key).Scan(&userID, &expiredAt); err != nil { + k.log.Error(context.Background(), "failed to get key: %v", err) + return nil, fmt.Errorf("key not found") + } + now := time.Now() + if expiredAt.Before(now) { + return nil, fmt.Errorf("key is expired") + } + // Get user info by userID + user := &auth.User{ID: userID, AuthMethod: "public-key"} + // We don't need tenantID here + sql = `SELECT 1, name, email FROM public.users WHERE user_id = $1 AND deleted_at IS NULL LIMIT 1` + if err := k.conn.QueryRow(sql, userID).Scan(&user.TenantID, &user.Name, &user.Email); err != nil { + k.log.Error(context.Background(), "failed to get user: %v", err) + return nil, fmt.Errorf("user not found") + } + return user, nil +} + +func NewKeys(log logger.Logger, conn pool.Pool) Keys { + return &keysImpl{ + log: log, + conn: conn, + } +} diff --git a/backend/pkg/spot/service/spot.go b/backend/pkg/spot/service/spot.go new file mode 100644 index 000000000..c9ee133cb --- /dev/null +++ b/backend/pkg/spot/service/spot.go @@ -0,0 +1,366 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/flakeid" + "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/spot/auth" + "time" +) + +const MaxCommentLength = 120 +const MaxNumberOfComments = 20 + +type Spot struct { + ID uint64 `json:"id"` + Name string `json:"name"` + UserID uint64 `json:"userID"` + UserEmail string `json:"userEmail"` + TenantID uint64 `json:"tenantID"` + Duration int `json:"duration"` + Crop []int `json:"crop"` + Comments []Comment `json:"comments"` + CreatedAt time.Time `json:"createdAt"` +} + +type Comment struct { + UserName string `json:"user"` + Text string `json:"text"` + CreatedAt time.Time `json:"createdAt"` +} + +type GetOpts struct { + SpotID uint64 // grab particular spot by ID + UserID uint64 // for filtering by user + TenantID uint64 // for filtering by all users in tenant + NameFilter string // for filtering by name (substring) + Order string // sorting ("asc" or "desc") + Limit uint64 // pagination (limit for page) + Offset uint64 // pagination (offset for page) + Page uint64 +} + +type spotsImpl struct { + log logger.Logger + pgconn pool.Pool + flaker *flakeid.Flaker +} + +type Update struct { + ID uint64 `json:"id"` + NewName string `json:"newName"` + NewComment *Comment `json:"newComment"` +} + +type Spots interface { + Add(user *auth.User, name, comment string, duration int, crop []int) (*Spot, error) + GetByID(user *auth.User, spotID uint64) (*Spot, error) + Get(user *auth.User, opts *GetOpts) ([]*Spot, uint64, error) + UpdateName(user *auth.User, spotID uint64, newName string) (*Spot, error) + AddComment(user *auth.User, spotID uint64, comment *Comment) (*Spot, error) + Delete(user *auth.User, spotIds []uint64) error + SetStatus(spotID uint64, status string) error + GetStatus(user *auth.User, spotID uint64) (string, error) +} + +func NewSpots(log logger.Logger, pgconn pool.Pool, flaker *flakeid.Flaker) Spots { + return &spotsImpl{ + log: log, + pgconn: pgconn, + flaker: flaker, + } +} + +func (s *spotsImpl) Add(user *auth.User, name, comment string, duration int, crop []int) (*Spot, error) { + switch { + case user == nil: + return nil, fmt.Errorf("user is required") + case name == "": + return nil, fmt.Errorf("name is required") + case duration <= 0: + return nil, fmt.Errorf("duration should be greater than 0") + } + + createdAt := time.Now() + spotID, err := s.flaker.Compose(uint64(createdAt.UnixMilli())) + if err != nil { + return nil, err + } + newSpot := &Spot{ + ID: spotID, + Name: name, + UserID: user.ID, + UserEmail: user.Email, + TenantID: user.TenantID, + Duration: duration, + Crop: crop, + CreatedAt: createdAt, + } + if comment != "" { + newSpot.Comments = append(newSpot.Comments, Comment{ + UserName: user.Name, + Text: comment, + CreatedAt: createdAt, + }) + } + if err = s.add(newSpot); err != nil { + return nil, err + } + return newSpot, nil +} + +func (s *spotsImpl) encodeComment(comment *Comment) string { + encodedComment, err := json.Marshal(comment) + if err != nil { + s.log.Warn(context.Background(), "failed to encode comment: %v, err: %s", comment, err) + return "" + } + return string(encodedComment) +} + +func (s *spotsImpl) add(spot *Spot) error { + sql := `INSERT INTO spots (spot_id, name, user_id, user_email, tenant_id, duration, crop, comments, status, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)` + var comments []string + for _, comment := range spot.Comments { + if encodedComment := s.encodeComment(&comment); encodedComment != "" { + comments = append(comments, encodedComment) + } + } + err := s.pgconn.Exec(sql, spot.ID, spot.Name, spot.UserID, spot.UserEmail, spot.TenantID, spot.Duration, spot.Crop, + comments, "pending", spot.CreatedAt) + if err != nil { + return err + } + return nil +} + +func (s *spotsImpl) GetByID(user *auth.User, spotID uint64) (*Spot, error) { + switch { + case user == nil: + return nil, fmt.Errorf("user is required") + case spotID == 0: + return nil, fmt.Errorf("spot id is required") + } + return s.getByID(spotID, user) +} + +func (s *spotsImpl) getByID(spotID uint64, user *auth.User) (*Spot, error) { + sql := `SELECT name, user_email, duration, crop, comments, created_at FROM spots + WHERE spot_id = $1 AND tenant_id = $2 AND deleted_at IS NULL` + spot := &Spot{ID: spotID} + var comments []string + err := s.pgconn.QueryRow(sql, spotID, user.TenantID).Scan(&spot.Name, &spot.UserEmail, &spot.Duration, &spot.Crop, + &comments, &spot.CreatedAt) + if err != nil { + return nil, err + } + for _, comment := range comments { + var decodedComment Comment + if err = json.Unmarshal([]byte(comment), &decodedComment); err != nil { + s.log.Warn(context.Background(), "failed to decode comment: %s", err) + continue + } + spot.Comments = append(spot.Comments, decodedComment) + + } + return spot, nil +} + +func (s *spotsImpl) Get(user *auth.User, opts *GetOpts) ([]*Spot, uint64, error) { + switch { + case user == nil: + return nil, 0, fmt.Errorf("user is required") + case opts == nil: + return nil, 0, fmt.Errorf("get options are required") + case user.TenantID == 0: // Tenant ID is required even for public get functions + return nil, 0, fmt.Errorf("tenant id is required") + } + + // Show the latest spots first by default + if opts.Order != "asc" && opts.Order != "desc" { + opts.Order = "desc" + } + if opts.Limit <= 0 || opts.Limit > 10 { + opts.Limit = 9 + } + if opts.Page < 1 { + opts.Page = 1 + } + opts.Offset = (opts.Page - 1) * opts.Limit + return s.getAll(user, opts) +} + +func (s *spotsImpl) getAll(user *auth.User, opts *GetOpts) ([]*Spot, uint64, error) { + sql := `SELECT COUNT(1) OVER () AS total, spot_id, name, user_email, duration, created_at FROM spots + WHERE tenant_id = $1 AND deleted_at IS NULL` + args := []interface{}{user.TenantID} + if opts.UserID != 0 { + sql += ` AND user_id = ` + fmt.Sprintf("$%d", len(args)+1) + args = append(args, opts.UserID) + } + if opts.NameFilter != "" { + sql += ` AND name ILIKE ` + fmt.Sprintf("$%d", len(args)+1) + args = append(args, "%"+opts.NameFilter+"%") + } + if opts.Order != "" { + sql += ` ORDER BY created_at ` + opts.Order + } + if opts.Limit != 0 { + sql += ` LIMIT ` + fmt.Sprintf("$%d", len(args)+1) + args = append(args, opts.Limit) + } + if opts.Offset != 0 { + sql += ` OFFSET ` + fmt.Sprintf("$%d", len(args)+1) + args = append(args, opts.Offset) + } + //s.log.Info(context.Background(), "sql: %s, args: %v", sql, args) + rows, err := s.pgconn.Query(sql, args...) + if err != nil { + return nil, 0, err + } + defer rows.Close() + + var total uint64 + var spots []*Spot + for rows.Next() { + spot := &Spot{} + if err = rows.Scan(&total, &spot.ID, &spot.Name, &spot.UserEmail, &spot.Duration, &spot.CreatedAt); err != nil { + return nil, 0, err + } + spots = append(spots, spot) + } + return spots, total, nil +} + +func (s *spotsImpl) UpdateName(user *auth.User, spotID uint64, newName string) (*Spot, error) { + switch { + case user == nil: + return nil, fmt.Errorf("user is required") + case spotID == 0: + return nil, fmt.Errorf("spot id is required") + case newName == "": + return nil, fmt.Errorf("new name is required") + } + return s.updateName(spotID, newName, user) +} + +func (s *spotsImpl) updateName(spotID uint64, newName string, user *auth.User) (*Spot, error) { + sql := `WITH updated AS ( + UPDATE spots SET name = $1, updated_at = $2 + WHERE spot_id = $3 AND tenant_id = $4 AND deleted_at IS NULL RETURNING *) + SELECT COUNT(*) FROM updated` + updated := 0 + if err := s.pgconn.QueryRow(sql, newName, time.Now(), spotID, user.TenantID).Scan(&updated); err != nil { + return nil, err + } + if updated == 0 { + return nil, fmt.Errorf("not allowed to update name") + } + return &Spot{ID: spotID, Name: newName}, nil +} + +func (s *spotsImpl) AddComment(user *auth.User, spotID uint64, comment *Comment) (*Spot, error) { + switch { + case user == nil: + return nil, fmt.Errorf("user is required") + case spotID == 0: + return nil, fmt.Errorf("spot id is required") + case comment == nil: + return nil, fmt.Errorf("comment is required") + case comment.UserName == "": + return nil, fmt.Errorf("user name is required") + case comment.Text == "": + return nil, fmt.Errorf("comment text is required") + } + if len(comment.Text) > MaxCommentLength { + comment.Text = comment.Text[:MaxCommentLength] + } + comment.CreatedAt = time.Now() + return s.addComment(spotID, comment, user) +} + +func (s *spotsImpl) addComment(spotID uint64, newComment *Comment, user *auth.User) (*Spot, error) { + sql := `WITH updated AS ( + UPDATE spots + SET comments = array_append(comments, $1), updated_at = $2 + WHERE spot_id = $3 AND tenant_id = $4 AND deleted_at IS NULL AND array_length(comments, 1) < $5 + RETURNING *) + SELECT COUNT(*) FROM updated` + encodedComment := s.encodeComment(newComment) + if encodedComment == "" { + return nil, fmt.Errorf("failed to encode comment") + } + updated := 0 + if err := s.pgconn.QueryRow(sql, encodedComment, time.Now(), spotID, user.TenantID, MaxNumberOfComments).Scan(&updated); err != nil { + return nil, err + } + if updated == 0 { + return nil, fmt.Errorf("not allowed to add comment") + } + return &Spot{ID: spotID}, nil +} + +func (s *spotsImpl) Delete(user *auth.User, spotIds []uint64) error { + switch { + case user == nil: + return fmt.Errorf("user is required") + case len(spotIds) == 0: + return fmt.Errorf("spot ids are required") + } + return s.deleteSpots(spotIds, user) +} + +func (s *spotsImpl) deleteSpots(spotIds []uint64, user *auth.User) error { + sql := `WITH updated AS (UPDATE spots SET deleted_at = NOW() WHERE tenant_id = $1 AND spot_id IN (` + args := []interface{}{user.TenantID} + for i, spotID := range spotIds { + sql += fmt.Sprintf("$%d,", i+2) + args = append(args, spotID) + } + sql = sql[:len(sql)-1] + `) RETURNING *) SELECT COUNT(*) FROM updated` + count := 0 + if err := s.pgconn.QueryRow(sql, args...).Scan(&count); err != nil { + return err + } + if count == 0 { + return fmt.Errorf("not allowed to delete spots") + } + if count != len(spotIds) { + s.log.Warn(context.Background(), "deleted %d spots, but expected to delete %d", count, len(spotIds)) + return fmt.Errorf("failed to delete all requested spots") + } + return nil +} + +func (s *spotsImpl) SetStatus(spotID uint64, status string) error { + switch { + case spotID == 0: + return fmt.Errorf("spot id is required") + case status == "": + return fmt.Errorf("status is required") + } + sql := `UPDATE spots SET status = $1, updated_at = $2 WHERE spot_id = $3 AND deleted_at IS NULL` + if err := s.pgconn.Exec(sql, status, time.Now(), spotID); err != nil { + return err + } + return nil +} + +func (s *spotsImpl) GetStatus(user *auth.User, spotID uint64) (string, error) { + switch { + case user == nil: + return "", fmt.Errorf("user is required") + case spotID == 0: + return "", fmt.Errorf("spot id is required") + } + sql := `SELECT status FROM spots WHERE spot_id = $1 AND tenant_id = $2 AND deleted_at IS NULL` + var status string + if err := s.pgconn.QueryRow(sql, spotID, user.TenantID).Scan(&status); err != nil { + return "", err + } + return status, nil +} diff --git a/backend/pkg/spot/transcoder/streams.go b/backend/pkg/spot/transcoder/streams.go new file mode 100644 index 000000000..f220cf0e8 --- /dev/null +++ b/backend/pkg/spot/transcoder/streams.go @@ -0,0 +1,106 @@ +package transcoder + +import ( + "context" + "fmt" + "strings" + "time" + + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/objectstorage" +) + +type Streams interface { + Add(spotID uint64, originalStream string) error + Get(spotID uint64) ([]byte, error) +} + +type streamsImpl struct { + log logger.Logger + conn pool.Pool + storage objectstorage.ObjectStorage +} + +func (s *streamsImpl) Add(spotID uint64, originalStream string) error { + lines := strings.Split(originalStream, "\n") + + // Replace indexN.ts with pre-signed URLs + for i, line := range lines { + if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") { + key := fmt.Sprintf("%d/%s", spotID, line) + presignedURL, err := s.storage.GetPreSignedDownloadUrl(key) + if err != nil { + fmt.Println("Error generating pre-signed URL:", err) + return err + } + lines[i] = presignedURL + } + } + + modifiedContent := strings.Join(lines, "\n") + now := time.Now() + // Insert playlist to DB + sql := `INSERT INTO spots_streams (spot_id, original_playlist, modified_playlist, created_at, expired_at) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (spot_id) DO UPDATE SET original_playlist = $2, modified_playlist = $3, + created_at = $4, expired_at = $5` + if err := s.conn.Exec(sql, spotID, originalStream, modifiedContent, now, now.Add(10*time.Minute)); err != nil { + fmt.Println("Error inserting playlist to DB:", err) + return err + } + return nil +} + +func (s *streamsImpl) Get(spotID uint64) ([]byte, error) { + // Get modified playlist from DB + sql := ` + SELECT + CASE + WHEN expired_at > $2 THEN modified_playlist + ELSE original_playlist + END AS playlist, + CASE + WHEN expired_at > $2 THEN 'modified' + ELSE 'original' + END AS playlist_type + FROM spots_streams + WHERE spot_id = $1` + var playlist, flag string + if err := s.conn.QueryRow(sql, spotID, time.Now()).Scan(&playlist, &flag); err != nil { + s.log.Error(context.Background(), "Error getting spot stream playlist: %v", err) + return []byte(""), err + } + if flag == "modified" { + return []byte(playlist), nil + } + + // Have to generate a new modified playlist with updated pre-signed URLs for chunks + lines := strings.Split(playlist, "\n") + for i, line := range lines { + if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") { + key := fmt.Sprintf("%d/%s", spotID, line) + presignedURL, err := s.storage.GetPreSignedDownloadUrl(key) + if err != nil { + s.log.Error(context.Background(), "Error generating pre-signed URL: %v", err) + return []byte(""), err + } + lines[i] = presignedURL + } + } + modifiedPlaylist := strings.Join(lines, "\n") + + // Save modified playlist to DB + sql = `UPDATE spots_streams SET modified_playlist = $1, expired_at = $2 WHERE spot_id = $3` + if err := s.conn.Exec(sql, modifiedPlaylist, time.Now().Add(10*time.Minute), spotID); err != nil { + s.log.Warn(context.Background(), "Error updating modified playlist: %v", err) + } + return []byte(modifiedPlaylist), nil +} + +func NewStreams(log logger.Logger, conn pool.Pool, storage objectstorage.ObjectStorage) Streams { + return &streamsImpl{ + log: log, + conn: conn, + storage: storage, + } +} diff --git a/backend/pkg/spot/transcoder/tasks.go b/backend/pkg/spot/transcoder/tasks.go new file mode 100644 index 000000000..4756fc66a --- /dev/null +++ b/backend/pkg/spot/transcoder/tasks.go @@ -0,0 +1,100 @@ +package transcoder + +import ( + "errors" + "time" + + "github.com/jackc/pgx/v4" + "openreplay/backend/pkg/db/postgres/pool" +) + +type Tasks interface { + Add(spotID uint64, crop []int, duration int) error + Get() (*Task, error) + Done(task *Task) error + Failed(task *Task, taskErr error) error +} + +type tasksImpl struct { + conn pool.Pool +} + +func NewTasks(conn pool.Pool) Tasks { + return &tasksImpl{conn: conn} +} + +type Task struct { + SpotID uint64 + Crop []int + Duration int + Status string + Path string + tx pool.Tx +} + +func (t *Task) HasToTrim() bool { + return t.Crop != nil && len(t.Crop) == 2 +} + +func (t *Task) HasToTranscode() bool { + return t.Duration > 15000 +} + +func (t *tasksImpl) Add(spotID uint64, crop []int, duration int) error { + sql := `INSERT INTO spot_tasks (id, crop, duration, status, added_time) VALUES ($1, $2, $3, $4, $5)` + if err := t.conn.Exec(sql, spotID, crop, duration, "pending", time.Now()); err != nil { + return err + } + return nil +} + +type NoTasksError struct{} + +func (NoTasksError) Error() string { + return "no tasks" +} + +func (t *tasksImpl) Get() (task *Task, err error) { + tx, err := t.conn.Begin() + if err != nil { + return nil, err + } + defer func() { + if err != nil { + tx.TxRollback() + } + }() + + task = &Task{tx: pool.Tx{Tx: tx}} + sql := `SELECT id, crop, duration FROM spots_tasks WHERE status = 'pending' ORDER BY added_time FOR UPDATE SKIP LOCKED LIMIT 1` + err = tx.TxQueryRow(sql).Scan(&task.SpotID, &task.Crop, &task.Duration) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, NoTasksError{} + } + return nil, err + } + return task, nil +} + +func (t *tasksImpl) Done(task *Task) error { + sql := `DELETE FROM spots_tasks WHERE id = $1` + err := task.tx.TxExec(sql, task.SpotID) + if err != nil { + task.tx.TxRollback() + return err + } + + return task.tx.TxCommit() +} + +func (t *tasksImpl) Failed(task *Task, taskErr error) error { + sql := `UPDATE spots_tasks SET status = 'failed', error = $2 WHERE id = $1` + err := task.tx.TxExec(sql, task.SpotID, taskErr.Error()) + if err != nil { + task.tx.TxRollback() + return err + } + + return task.tx.TxCommit() +} diff --git a/backend/pkg/spot/transcoder/transcoder.go b/backend/pkg/spot/transcoder/transcoder.go new file mode 100644 index 000000000..f1008b8c5 --- /dev/null +++ b/backend/pkg/spot/transcoder/transcoder.go @@ -0,0 +1,349 @@ +package transcoder + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "strconv" + "strings" + "time" + + "openreplay/backend/internal/config/spot" + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/logger" + metrics "openreplay/backend/pkg/metrics/spot" + "openreplay/backend/pkg/objectstorage" + workers "openreplay/backend/pkg/pool" + "openreplay/backend/pkg/spot/service" +) + +type Transcoder interface { + Process(spot *service.Spot) error + GetSpotStreamPlaylist(spotID uint64) ([]byte, error) + Close() +} + +type transcoderImpl struct { + cfg *spot.Config + log logger.Logger + close chan interface{} + objStorage objectstorage.ObjectStorage + conn pool.Pool + tasks Tasks + streams Streams + spots service.Spots + prepareWorkers workers.WorkerPool + transcodeWorkers workers.WorkerPool +} + +func NewTranscoder(cfg *spot.Config, log logger.Logger, objStorage objectstorage.ObjectStorage, conn pool.Pool, spots service.Spots) Transcoder { + tnsc := &transcoderImpl{ + cfg: cfg, + log: log, + close: make(chan interface{}, 1), + objStorage: objStorage, + conn: conn, + tasks: NewTasks(conn), + streams: NewStreams(log, conn, objStorage), + spots: spots, + } + tnsc.prepareWorkers = workers.NewPool(2, 4, tnsc.prepare) + tnsc.transcodeWorkers = workers.NewPool(2, 4, tnsc.transcode) + go tnsc.mainLoop() + return tnsc +} + +func (t *transcoderImpl) Process(spot *service.Spot) error { + if spot.Crop == nil && spot.Duration < t.cfg.MinimumStreamDuration { + // Skip this spot and set processed status + t.log.Info(context.Background(), "Spot video %+v is too short for transcoding and without crop values", spot) + if err := t.spots.SetStatus(spot.ID, "processed"); err != nil { + t.log.Error(context.Background(), "Error updating spot status: %v", err) + } + return nil + } + return t.tasks.Add(spot.ID, spot.Crop, spot.Duration) +} + +func (t *transcoderImpl) mainLoop() { + for { + select { + case closeEvent := <-t.close: + t.log.Info(context.Background(), "Transcoder is closing: %v", closeEvent) + return + default: + task, err := t.tasks.Get() + if err != nil { + if errors.Is(err, NoTasksError{}) { + time.Sleep(1 * time.Second) + } else { + t.log.Error(context.Background(), "Error getting task: %v", err) + } + continue + } + t.process(task) + } + } +} + +func (t *transcoderImpl) failedTask(task *Task, err error) { + t.log.Error(context.Background(), "Task failed: %v", err) + if err := t.tasks.Failed(task, err); err != nil { + t.log.Error(context.Background(), "Error marking task as failed: %v", err) + } + if err := os.RemoveAll(task.Path); err != nil { + t.log.Error(context.Background(), "Error removing directory: %v", err) + } +} + +func (t *transcoderImpl) doneTask(task *Task) { + if err := t.tasks.Done(task); err != nil { + t.log.Error(context.Background(), "Error marking task as done: %v", err) + } + if err := os.RemoveAll(task.Path); err != nil { + t.log.Error(context.Background(), "Error removing directory: %v", err) + } +} + +func (t *transcoderImpl) process(task *Task) { + metrics.IncreaseVideosTotal() + //spotID := task.SpotID + t.log.Info(context.Background(), "Processing spot %s", task.SpotID) + + // Prepare path for spot video + path := t.cfg.FSDir + "/" + if t.cfg.SpotsDir != "" { + path += t.cfg.SpotsDir + "/" + } + task.Path = path + strconv.FormatUint(task.SpotID, 10) + "/" + + t.prepareWorkers.Submit(task) +} + +// Download original video, crop if needed (and upload cropped). +func (t *transcoderImpl) prepare(payload interface{}) { + task := payload.(*Task) + + // Download video from S3 + if err := t.downloadSpotVideo(task.SpotID, task.Path); err != nil { + t.failedTask(task, fmt.Errorf("can't download video, spot: %d, err: %s", task.SpotID, err.Error())) + return + } + + if task.HasToTrim() { + if err := t.cropSpotVideo(task.SpotID, task.Crop, task.Path); err != nil { + t.failedTask(task, fmt.Errorf("can't crop video, spot: %d, err: %s", task.SpotID, err.Error())) + return + } + } + + if !task.HasToTranscode() { + t.log.Info(context.Background(), "Spot video %d is too short for transcoding", task.SpotID) + t.doneTask(task) + } else { + t.transcodeWorkers.Submit(task) + } +} + +// Transcode video, upload to S3, save playlist to DB, delete local files. +func (t *transcoderImpl) transcode(payload interface{}) { + task := payload.(*Task) + + // Transcode spot video to HLS format + streamPlaylist, err := t.transcodeSpotVideo(task.SpotID, task.Path) + if err != nil { + t.failedTask(task, fmt.Errorf("can't transcode video, spot: %d, err: %s", task.SpotID, err.Error())) + return + } + + // Save stream playlist to DB + if err := t.streams.Add(task.SpotID, streamPlaylist); err != nil { + t.failedTask(task, fmt.Errorf("can't insert playlist to DB, spot: %d, err: %s", task.SpotID, err.Error())) + return + } + + if err := t.spots.SetStatus(task.SpotID, "processed"); err != nil { + t.log.Error(context.Background(), "Error updating spot status: %v", err) + } + t.doneTask(task) + + t.log.Info(context.Background(), "Transcoded spot %d, have to upload chunks to S3", task.SpotID) +} + +func (t *transcoderImpl) downloadSpotVideo(spotID uint64, path string) error { + start := time.Now() + + // Ensure the directory exists + if err := os.MkdirAll(path, 0755); err != nil { + t.log.Fatal(context.Background(), "Error creating directories: %v", err) + } + + video, err := t.objStorage.Get(fmt.Sprintf("%d/video.webm", spotID)) + if err != nil { + return err + } + defer video.Close() + + // Save file to disk + originVideo, err := os.Create(path + "origin.webm") + if err != nil { + return fmt.Errorf("can't create file: %s", err.Error()) + } + if _, err := io.Copy(originVideo, video); err != nil { + return fmt.Errorf("can't copy file: %s", err.Error()) + } + if fileInfo, err := originVideo.Stat(); err != nil { + t.log.Error(context.Background(), "Failed to get file info: %v", err) + } else { + metrics.RecordOriginalVideoSize(float64(fileInfo.Size())) + } + originVideo.Close() + + metrics.RecordOriginalVideoDownloadDuration(time.Since(start).Seconds()) + + t.log.Info(context.Background(), "Saved origin video to disk, spot: %d in %v sec", spotID, time.Since(start).Seconds()) + return nil +} + +func (t *transcoderImpl) cropSpotVideo(spotID uint64, crop []int, path string) error { + // Crop video + // ffmpeg -i input.webm -ss 5 -to 20 -c copy output.webm + + start := time.Now() + cmd := exec.Command("ffmpeg", "-i", path+"origin.webm", + "-ss", fmt.Sprintf("%.2f", float64(crop[0])/1000.0), + "-to", fmt.Sprintf("%.2f", float64(crop[1])/1000.0), + "-c", "copy", path+"cropped.mp4") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := cmd.Run() + if err != nil { + return fmt.Errorf("failed to execute command: %v, stderr: %v", err, stderr.String()) + } + metrics.IncreaseVideosCropped() + metrics.RecordCroppingDuration(time.Since(start).Seconds()) + + t.log.Info(context.Background(), "Cropped spot %d in %v", spotID, time.Since(start).Seconds()) + + // mv cropped.webm origin.webm + err = os.Rename(path+"cropped.mp4", path+"origin.webm") + + // upload cropped video back to s3 + start = time.Now() + video, err := os.Open(path + "origin.webm") + if err != nil { + return fmt.Errorf("failed to open cropped video: %v", err) + } + defer video.Close() + + if fileInfo, err := video.Stat(); err != nil { + t.log.Error(context.Background(), "Failed to get file info: %v", err) + } else { + metrics.RecordCroppedVideoSize(float64(fileInfo.Size())) + } + + err = t.objStorage.Upload(video, fmt.Sprintf("%d/video.webm", spotID), "video/webm", objectstorage.NoCompression) + if err != nil { + return fmt.Errorf("failed to upload cropped video: %v", err) + } + + metrics.RecordCroppedVideoUploadDuration(time.Since(start).Seconds()) + + t.log.Info(context.Background(), "Uploaded cropped spot %d in %v", spotID, time.Since(start).Seconds()) + return nil +} + +func (t *transcoderImpl) transcodeSpotVideo(spotID uint64, path string) (string, error) { + // Transcode video tp HLS format + // ffmpeg -i origin.webm -c:v copy -c:a aac -b:a 128k -start_number 0 -hls_time 10 -hls_list_size 0 -f hls index.m3u8 + + start := time.Now() + videoPath := path + "origin.webm" + playlistPath := path + "index.m3u8" + cmd := exec.Command("ffmpeg", "-i", videoPath, "-c:v", "copy", "-c:a", "aac", "-b:a", "96k", + "-start_number", "0", "-hls_time", "10", "-hls_list_size", "0", "-f", "hls", playlistPath) + + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := cmd.Run() + if err != nil { + t.log.Error(context.Background(), "Failed to execute command: %v, stderr: %v", err, stderr.String()) + return "", err + } + metrics.IncreaseVideosTranscoded() + metrics.RecordTranscodingDuration(time.Since(start).Seconds()) + t.log.Info(context.Background(), "Transcoded spot %d in %v", spotID, time.Since(start).Seconds()) + + start = time.Now() + // Read the M3U8 file + file, err := os.Open(playlistPath) + if err != nil { + fmt.Println("Error opening file:", err) + return "", err + } + defer file.Close() + + var originalLines []string + var lines []string + var chunks []string + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + lines = append(lines, line) + if strings.HasPrefix(line, "index") && strings.HasSuffix(line, ".ts") { + chunks = append(chunks, line) + } + originalLines = append(originalLines, line) + } + if err := scanner.Err(); err != nil { + fmt.Println("Error reading file:", err) + return "", err + } + + // Insert stream chunks to s3 + for _, chunk := range chunks { + chunkPath := path + chunk + chunkFile, err := os.Open(chunkPath) + if err != nil { + fmt.Println("Error opening file:", err) + return "", err + } + defer chunkFile.Close() + + key := fmt.Sprintf("%d/%s", spotID, chunk) + err = t.objStorage.Upload(chunkFile, key, "video/mp2t", objectstorage.NoCompression) + if err != nil { + fmt.Println("Error uploading file:", err) + return "", err + } + } + metrics.RecordTranscodedVideoUploadDuration(time.Since(start).Seconds()) + + t.log.Info(context.Background(), "Uploaded chunks for spot %d in %v", spotID, time.Since(start).Seconds()) + return strings.Join(lines, "\n"), nil +} + +func (t *transcoderImpl) GetSpotStreamPlaylist(spotID uint64) ([]byte, error) { + return t.streams.Get(spotID) +} + +func (t *transcoderImpl) Wait() { + t.prepareWorkers.Pause() + t.transcodeWorkers.Pause() +} + +func (t *transcoderImpl) Close() { + t.close <- nil + t.prepareWorkers.Stop() + t.transcodeWorkers.Stop() +} diff --git a/ee/backend/pkg/objectstorage/azure/azure.go b/ee/backend/pkg/objectstorage/azure/azure.go index 642df1c97..0dab51388 100644 --- a/ee/backend/pkg/objectstorage/azure/azure.go +++ b/ee/backend/pkg/objectstorage/azure/azure.go @@ -140,6 +140,10 @@ func (s *storageImpl) GetPreSignedUploadUrl(key string) (string, error) { return sasURL, nil } +func (s *storageImpl) GetPreSignedDownloadUrl(key string) (string, error) { + return "", errors.New("not implemented") +} + func loadFileTag() map[string]string { // Load file tag from env key := "retention" diff --git a/ee/backend/pkg/spot/api/permissions.go b/ee/backend/pkg/spot/api/permissions.go new file mode 100644 index 000000000..1da671bf5 --- /dev/null +++ b/ee/backend/pkg/spot/api/permissions.go @@ -0,0 +1,11 @@ +package api + +import "strings" + +func getPermissions(urlPath string) []string { + res := []string{"SPOT"} + if strings.Contains(urlPath, "public-key") { + res = append(res, "SPOT_PUBLIC") + } + return res +} diff --git a/ee/backend/pkg/spot/api/tracer.go b/ee/backend/pkg/spot/api/tracer.go new file mode 100644 index 000000000..899327b78 --- /dev/null +++ b/ee/backend/pkg/spot/api/tracer.go @@ -0,0 +1,57 @@ +package api + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/mux" + + "openreplay/backend/pkg/spot/auth" + "openreplay/backend/pkg/spot/service" +) + +var routeMatch = map[string]string{ + "POST" + "/v1/spots": "createSpot", + "GET" + "/v1/spots/{id}": "getSpot", + "PATCH" + "/v1/spots/{id}": "updateSpot", + "GET" + "/v1/spots": "getSpots", + "DELETE" + "/v1/spots": "deleteSpots", + "POST" + "/v1/spots/{id}/comment": "addComment", + "GET" + "/v1/spots/{id}/video": "getSpotVideo", + "PATCH" + "/v1/spots/{id}/public-key": "updatePublicKey", +} + +func (e *Router) logRequest(r *http.Request, bodyBytes []byte, statusCode int) { + pathTemplate, err := mux.CurrentRoute(r).GetPathTemplate() + if err != nil { + e.log.Error(r.Context(), "failed to get path template: %s", err) + } + e.log.Info(r.Context(), "path template: %s", pathTemplate) + // Convert the parameters to json + query := r.URL.Query() + params := make(map[string]interface{}) + for key, values := range query { + if len(values) > 1 { + params[key] = values + } else { + params[key] = values[0] + } + } + jsonData, err := json.Marshal(params) + if err != nil { + e.log.Error(r.Context(), "failed to marshal query parameters: %s", err) + } + requestData := &service.RequestData{ + Action: routeMatch[r.Method+pathTemplate], + Method: r.Method, + PathFormat: pathTemplate, + Endpoint: r.URL.Path, + Payload: bodyBytes, + Parameters: jsonData, + Status: statusCode, + } + userData := r.Context().Value("userData").(*auth.User) + e.services.Tracer.Trace(userData, requestData) + // DEBUG + e.log.Info(r.Context(), "request data: %v", requestData) +} diff --git a/ee/backend/pkg/spot/auth/authorizer.go b/ee/backend/pkg/spot/auth/authorizer.go new file mode 100644 index 000000000..2117ef311 --- /dev/null +++ b/ee/backend/pkg/spot/auth/authorizer.go @@ -0,0 +1,25 @@ +package auth + +import "fmt" + +func (a *authImpl) IsAuthorized(authHeader string, permissions []string, isExtension bool) (*User, error) { + secret := a.secret + if isExtension { + secret = a.spotSecret + } + jwtInfo, err := parseJWT(authHeader, secret) + if err != nil { + return nil, err + } + + user, err := authUser(a.pgconn, jwtInfo.UserId, jwtInfo.TenantID, int(jwtInfo.IssuedAt.Unix())) + if err != nil { + return nil, err + } + for _, perm := range permissions { + if !user.HasPermission(perm) { + return nil, fmt.Errorf("user has no permission") + } + } + return user, nil +} diff --git a/ee/backend/pkg/spot/auth/storage.go b/ee/backend/pkg/spot/auth/storage.go new file mode 100644 index 000000000..7df89bb1d --- /dev/null +++ b/ee/backend/pkg/spot/auth/storage.go @@ -0,0 +1,32 @@ +package auth + +import ( + "fmt" + "openreplay/backend/pkg/db/postgres/pool" +) + +func authUser(conn pool.Pool, userID, tenantID, jwtIAT int) (*User, error) { + sql := `SELECT user_id, users.tenant_id, users.name, email, EXTRACT(epoch FROM spot_jwt_iat)::BIGINT AS spot_jwt_iat, roles.permissions + FROM users + JOIN tenants on users.tenant_id = tenants.tenant_id + JOIN roles on users.role_id = roles.role_id + WHERE users.user_id = $1 AND users.tenant_id = $2 AND users.deleted_at IS NULL ;` + + user := &User{} + var permissions []string + if err := conn.QueryRow(sql, userID, tenantID). + Scan(&user.ID, &user.TenantID, &user.Name, &user.Email, &user.JwtIat, &permissions); err != nil { + return nil, fmt.Errorf("user not found") + } + if user.JwtIat == 0 || abs(jwtIAT-user.JwtIat) > 1 { + return nil, fmt.Errorf("token expired") + } + user.Permissions = make(map[string]bool) + for _, perm := range permissions { + user.Permissions[perm] = true + } + if _, ok := user.Permissions["SPOT"]; !ok { + return nil, fmt.Errorf("user has no permissions") + } + return user, nil +} diff --git a/ee/backend/pkg/spot/builder.go b/ee/backend/pkg/spot/builder.go new file mode 100644 index 000000000..b1827897d --- /dev/null +++ b/ee/backend/pkg/spot/builder.go @@ -0,0 +1,45 @@ +package spot + +import ( + "openreplay/backend/internal/config/spot" + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/flakeid" + "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/objectstorage" + "openreplay/backend/pkg/objectstorage/store" + "openreplay/backend/pkg/spot/auth" + "openreplay/backend/pkg/spot/service" + "openreplay/backend/pkg/spot/transcoder" +) + +type ServicesBuilder struct { + Flaker *flakeid.Flaker + ObjStorage objectstorage.ObjectStorage + Auth auth.Auth + Spots service.Spots + Keys service.Keys + Transcoder transcoder.Transcoder + Tracer service.Tracer +} + +func NewServiceBuilder(log logger.Logger, cfg *spot.Config, pgconn pool.Pool) (*ServicesBuilder, error) { + objStore, err := store.NewStore(&cfg.ObjectsConfig) + if err != nil { + return nil, err + } + flaker := flakeid.NewFlaker(cfg.WorkerID) + tracer, err := service.NewTracer(log, pgconn) + if err != nil { + return nil, err + } + spots := service.NewSpots(log, pgconn, flaker) + return &ServicesBuilder{ + Flaker: flaker, + ObjStorage: objStore, + Auth: auth.NewAuth(log, cfg.JWTSecret, cfg.JWTSpotSecret, pgconn), + Spots: spots, + Keys: service.NewKeys(log, pgconn), + Transcoder: transcoder.NewTranscoder(cfg, log, objStore, pgconn, spots), + Tracer: tracer, + }, nil +} diff --git a/ee/backend/pkg/spot/service/tracer.go b/ee/backend/pkg/spot/service/tracer.go new file mode 100644 index 000000000..8c3342470 --- /dev/null +++ b/ee/backend/pkg/spot/service/tracer.go @@ -0,0 +1,104 @@ +package service + +import ( + "context" + "errors" + "openreplay/backend/pkg/db/postgres" + db "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/logger" + "openreplay/backend/pkg/pool" + "openreplay/backend/pkg/spot/auth" +) + +type Tracer interface { + Trace(user *auth.User, data *RequestData) error + Close() error +} + +type tracerImpl struct { + log logger.Logger + conn db.Pool + traces postgres.Bulk + saver pool.WorkerPool +} + +func NewTracer(log logger.Logger, conn db.Pool) (Tracer, error) { + switch { + case log == nil: + return nil, errors.New("logger is required") + case conn == nil: + return nil, errors.New("connection is required") + } + tracer := &tracerImpl{ + log: log, + conn: conn, + } + if err := tracer.initBulk(); err != nil { + return nil, err + } + tracer.saver = pool.NewPool(1, 200, tracer.sendTraces) + return tracer, nil +} + +func (t *tracerImpl) initBulk() (err error) { + t.traces, err = postgres.NewBulk(t.conn, + "traces", + "(user_id, tenant_id, auth, action, method, path_format, endpoint, payload, parameters, status)", + "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", + 10, 50) + if err != nil { + return err + } + return nil +} + +type Task struct { + UserID *uint64 + TenantID uint64 + Auth *string + Data *RequestData +} + +func (t *tracerImpl) sendTraces(payload interface{}) { + rec := payload.(*Task) + t.log.Info(context.Background(), "Sending traces, %v", rec) + if err := t.traces.Append(rec.UserID, rec.TenantID, rec.Auth, rec.Data.Action, rec.Data.Method, rec.Data.PathFormat, + rec.Data.Endpoint, rec.Data.Payload, rec.Data.Parameters, rec.Data.Status); err != nil { + t.log.Error(context.Background(), "can't append trace: %s", err) + } +} + +type RequestData struct { + Action string + Method string + PathFormat string + Endpoint string + Payload []byte + Parameters []byte + Status int +} + +func (t *tracerImpl) Trace(user *auth.User, data *RequestData) error { + switch { + case user == nil: + return errors.New("user is required") + case data == nil: + return errors.New("request is required") + } + trace := &Task{ + UserID: &user.ID, + TenantID: user.TenantID, + Auth: &user.AuthMethod, + Data: data, + } + t.saver.Submit(trace) + return nil +} + +func (t *tracerImpl) Close() error { + t.saver.Stop() + if err := t.traces.Send(); err != nil { + return err + } + return nil +} diff --git a/scripts/helmcharts/openreplay/charts/http/templates/ingress.yaml b/scripts/helmcharts/openreplay/charts/http/templates/ingress.yaml index 07585480b..af2f23ecf 100644 --- a/scripts/helmcharts/openreplay/charts/http/templates/ingress.yaml +++ b/scripts/helmcharts/openreplay/charts/http/templates/ingress.yaml @@ -57,7 +57,7 @@ spec: name: minio port: number: 9000 - path: /(minio|mobs|sessions-assets|frontend|static|sourcemaps|ios-images|records|uxtesting-records)/ + path: /(minio|mobs|sessions-assets|frontend|static|sourcemaps|ios-images|records|uxtesting-records|spots)/ tls: - hosts: - {{ .Values.global.domainName }} diff --git a/scripts/helmcharts/openreplay/charts/spot/.helmignore b/scripts/helmcharts/openreplay/charts/spot/.helmignore new file mode 100644 index 000000000..0e8a0eb36 --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/scripts/helmcharts/openreplay/charts/spot/Chart.yaml b/scripts/helmcharts/openreplay/charts/spot/Chart.yaml new file mode 100644 index 000000000..5b489d2f0 --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/Chart.yaml @@ -0,0 +1,24 @@ +apiVersion: v2 +name: spot +description: A Helm chart for Kubernetes + +# A chart can be either an 'application' or a 'library' chart. +# +# Application charts are a collection of templates that can be packaged into versioned archives +# to be deployed. +# +# Library charts provide useful utilities or functions for the chart developer. They're included as +# a dependency of application charts to inject those utilities and functions into the rendering +# pipeline. Library charts do not define any templates and therefore cannot be deployed. +type: application + +# This is the chart version. This version number should be incremented each time you make changes +# to the chart and its templates, including the app version. +# Versions are expected to follow Semantic Versioning (https://semver.org/) +version: 0.1.1 + +# This is the version number of the application being deployed. This version number should be +# incremented each time you make changes to the application. Versions are not expected to +# follow Semantic Versioning. They should reflect the version the application is using. +# It is recommended to use it with quotes. +AppVersion: "v1.18.0" diff --git a/scripts/helmcharts/openreplay/charts/spot/templates/NOTES.txt b/scripts/helmcharts/openreplay/charts/spot/templates/NOTES.txt new file mode 100644 index 000000000..84342f218 --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/templates/NOTES.txt @@ -0,0 +1,22 @@ +1. Get the application URL by running these commands: +{{- if .Values.ingress.enabled }} +{{- range $host := .Values.ingress.hosts }} + {{- range .paths }} + http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }} + {{- end }} +{{- end }} +{{- else if contains "NodePort" .Values.service.type }} + export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "http.fullname" . }}) + export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}") + echo http://$NODE_IP:$NODE_PORT +{{- else if contains "LoadBalancer" .Values.service.type }} + NOTE: It may take a few minutes for the LoadBalancer IP to be available. + You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "spot.fullname" . }}' + export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "spot.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}") + echo http://$SERVICE_IP:{{ .Values.service.port }} +{{- else if contains "ClusterIP" .Values.service.type }} + export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "spot.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}") + export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}") + echo "Visit http://127.0.0.1:8080 to use your application" + kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT +{{- end }} diff --git a/scripts/helmcharts/openreplay/charts/spot/templates/_helpers.tpl b/scripts/helmcharts/openreplay/charts/spot/templates/_helpers.tpl new file mode 100644 index 000000000..7253664ea --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/templates/_helpers.tpl @@ -0,0 +1,62 @@ +{{/* +Expand the name of the chart. +*/}} +{{- define "spot.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +If release name contains chart name it will be used as a full name. +*/}} +{{- define "spot.fullname" -}} +{{- if .Values.fullnameOverride }} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- $name := default .Chart.Name .Values.nameOverride }} +{{- if contains $name .Release.Name }} +{{- .Release.Name | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} +{{- end }} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "spot.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Common labels +*/}} +{{- define "spot.labels" -}} +helm.sh/chart: {{ include "spot.chart" . }} +{{ include "spot.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{/* +Selector labels +*/}} +{{- define "spot.selectorLabels" -}} +app.kubernetes.io/name: {{ include "spot.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{/* +Create the name of the service account to use +*/}} +{{- define "spot.serviceAccountName" -}} +{{- if .Values.serviceAccount.create }} +{{- default (include "spot.fullname" .) .Values.serviceAccount.name }} +{{- else }} +{{- default "default" .Values.serviceAccount.name }} +{{- end }} +{{- end }} diff --git a/scripts/helmcharts/openreplay/charts/spot/templates/deployment.yaml b/scripts/helmcharts/openreplay/charts/spot/templates/deployment.yaml new file mode 100644 index 000000000..a6bf93583 --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/templates/deployment.yaml @@ -0,0 +1,140 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "spot.fullname" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "spot.labels" . | nindent 4 }} +spec: + {{- if not .Values.autoscaling.enabled }} + replicas: {{ .Values.replicaCount }} + {{- end }} + selector: + matchLabels: + {{- include "spot.selectorLabels" . | nindent 6 }} + template: + metadata: + {{- with .Values.podAnnotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + labels: + {{- include "spot.selectorLabels" . | nindent 8 }} + spec: + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + serviceAccountName: {{ include "spot.serviceAccountName" . }} + securityContext: + {{- toYaml .Values.podSecurityContext | nindent 8 }} + shareProcessNamespace: true + containers: + - name: {{ .Chart.Name }} + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + {{- if .Values.global.enterpriseEditionLicense }} + image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}-ee" + {{- else }} + image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + {{- end }} + imagePullPolicy: {{ .Values.image.pullPolicy }} + {{- if .Values.healthCheck}} + {{- .Values.healthCheck | toYaml | nindent 10}} + {{- end}} + env: + {{- range $key, $val := .Values.env }} + - name: {{ $key }} + value: '{{ $val }}' + {{- end}} + {{- range $key, $val := .Values.global.env }} + - name: {{ $key }} + value: '{{ $val }}' + {{- end }} + - name: AWS_ACCESS_KEY_ID + {{- if .Values.global.s3.existingSecret }} + valueFrom: + secretKeyRef: + name: {{ .Values.global.s3.existingSecret }} + key: access-key + {{- else }} + value: {{ .Values.global.s3.accessKey }} + {{- end }} + - name: AWS_SECRET_ACCESS_KEY + {{- if .Values.global.s3.existingSecret }} + valueFrom: + secretKeyRef: + name: {{ .Values.global.s3.existingSecret }} + key: secret-key + {{- else }} + value: {{ .Values.global.s3.secretKey }} + {{- end }} + - name: AWS_REGION + value: '{{ .Values.global.s3.region }}' + - name: AWS_ENDPOINT + value: '{{- include "openreplay.s3Endpoint" . }}' + - name: LICENSE_KEY + value: '{{ .Values.global.enterpriseEditionLicense }}' + - name: KAFKA_SERVERS + value: '{{ .Values.global.kafka.kafkaHost }}:{{ .Values.global.kafka.kafkaPort }}' + - name: KAFKA_USE_SSL + value: '{{ .Values.global.kafka.kafkaUseSsl }}' + - name: pg_password + {{- if .Values.global.postgresql.existingSecret }} + valueFrom: + secretKeyRef: + name: {{ .Values.global.postgresql.existingSecret }} + key: postgresql-postgres-password + {{- else }} + value: '{{ .Values.global.postgresql.postgresqlPassword }}' + {{- end}} + - name: POSTGRES_STRING + value: 'postgres://{{ .Values.global.postgresql.postgresqlUser }}:$(pg_password)@{{ .Values.global.postgresql.postgresqlHost }}:{{ .Values.global.postgresql.postgresqlPort }}/{{ .Values.global.postgresql.postgresqlDatabase }}' + {{- include "openreplay.env.redis_string" .Values.global.redis | nindent 12 }} + ports: + {{- range $key, $val := .Values.service.ports }} + - name: {{ $key }} + containerPort: {{ $val }} + protocol: TCP + {{- end }} + volumeMounts: + - name: datadir + mountPath: /mnt/efs + {{- include "openreplay.volume.redis_ca_certificate.mount" .Values.global.redis | nindent 12 }} + {{- with .Values.persistence.mounts }} + {{- toYaml . | nindent 12 }} + {{- end }} + resources: + {{- toYaml .Values.resources | nindent 12 }} + {{- if eq (tpl .Values.pvc.name . ) "hostPath" }} + volumes: + {{- with .Values.persistence.volumes }} + {{- toYaml . | nindent 6 }} + {{- end }} + - name: datadir + hostPath: + # Ensure the file directory is created. + path: {{ tpl .Values.pvc.hostMountPath . }} + type: DirectoryOrCreate + {{- else }} + volumes: + {{- with .Values.persistence.volumes }} + {{- toYaml . | nindent 8 }} + {{- end }} + - name: datadir + persistentVolumeClaim: + claimName: "{{ tpl .Values.pvc.name . }}" + {{- end }} + {{- include "openreplay.volume.redis_ca_certificate" .Values.global.redis | nindent 6 }} + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/scripts/helmcharts/openreplay/charts/spot/templates/hpa.yaml b/scripts/helmcharts/openreplay/charts/spot/templates/hpa.yaml new file mode 100644 index 000000000..c39bbe733 --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/templates/hpa.yaml @@ -0,0 +1,29 @@ +{{- if .Values.autoscaling.enabled }} +apiVersion: autoscaling/v2beta1 +kind: HorizontalPodAutoscaler +metadata: + name: {{ include "spot.fullname" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "spot.labels" . | nindent 4 }} +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: {{ include "spot.fullname" . }} + minReplicas: {{ .Values.autoscaling.minReplicas }} + maxReplicas: {{ .Values.autoscaling.maxReplicas }} + metrics: + {{- if .Values.autoscaling.targetCPUUtilizationPercentage }} + - type: Resource + resource: + name: cpu + targetAverageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }} + {{- end }} + {{- if .Values.autoscaling.targetMemoryUtilizationPercentage }} + - type: Resource + resource: + name: memory + targetAverageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }} + {{- end }} +{{- end }} diff --git a/scripts/helmcharts/openreplay/charts/spot/templates/ingress.yaml b/scripts/helmcharts/openreplay/charts/spot/templates/ingress.yaml new file mode 100644 index 000000000..778cf548c --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/templates/ingress.yaml @@ -0,0 +1,36 @@ +{{- if .Values.ingress.enabled }} +{{- $fullName := include "spot.fullname" . -}} +{{- $svcPort := .Values.service.ports.http -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ $fullName }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "spot.labels" . | nindent 4 }} + annotations: + {{- with .Values.ingress.annotations }} + {{- toYaml . | nindent 4 }} + {{- end }} + nginx.ingress.kubernetes.io/rewrite-target: /$1 + nginx.ingress.kubernetes.io/upstream-hash-by: $http_x_forwarded_for +spec: + ingressClassName: "{{ tpl .Values.ingress.className . }}" + tls: + - hosts: + - {{ .Values.global.domainName }} + {{- if .Values.ingress.tls.secretName}} + secretName: {{ .Values.ingress.tls.secretName }} + {{- end}} + rules: + - host: {{ .Values.global.domainName }} + http: + paths: + - pathType: Prefix + backend: + service: + name: {{ $fullName }} + port: + number: {{ $svcPort }} + path: /spot/(.*) +{{- end }} diff --git a/scripts/helmcharts/openreplay/charts/spot/templates/service.yaml b/scripts/helmcharts/openreplay/charts/spot/templates/service.yaml new file mode 100644 index 000000000..973c77f75 --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/templates/service.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ include "spot.fullname" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "spot.labels" . | nindent 4 }} +spec: + type: {{ .Values.service.type }} + ports: + {{- range $key, $val := .Values.service.ports }} + - port: {{ $val }} + targetPort: {{ $key }} + protocol: TCP + name: {{ $key }} + {{- end}} + selector: + {{- include "spot.selectorLabels" . | nindent 4 }} diff --git a/scripts/helmcharts/openreplay/charts/spot/templates/serviceMonitor.yaml b/scripts/helmcharts/openreplay/charts/spot/templates/serviceMonitor.yaml new file mode 100644 index 000000000..bacbc61eb --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/templates/serviceMonitor.yaml @@ -0,0 +1,18 @@ +{{- if and ( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" ) ( .Values.serviceMonitor.enabled ) }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ include "spot.fullname" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "spot.labels" . | nindent 4 }} + {{- if .Values.serviceMonitor.additionalLabels }} + {{- toYaml .Values.serviceMonitor.additionalLabels | nindent 4 }} + {{- end }} +spec: + endpoints: + {{- .Values.serviceMonitor.scrapeConfigs | toYaml | nindent 4 }} + selector: + matchLabels: + {{- include "spot.selectorLabels" . | nindent 6 }} +{{- end }} diff --git a/scripts/helmcharts/openreplay/charts/spot/templates/serviceaccount.yaml b/scripts/helmcharts/openreplay/charts/spot/templates/serviceaccount.yaml new file mode 100644 index 000000000..363621dab --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/templates/serviceaccount.yaml @@ -0,0 +1,13 @@ +{{- if .Values.serviceAccount.create -}} +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ include "spot.serviceAccountName" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "spot.labels" . | nindent 4 }} + {{- with .Values.serviceAccount.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +{{- end }} diff --git a/scripts/helmcharts/openreplay/charts/spot/templates/tests/test-connection.yaml b/scripts/helmcharts/openreplay/charts/spot/templates/tests/test-connection.yaml new file mode 100644 index 000000000..c01196213 --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/templates/tests/test-connection.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Pod +metadata: + name: "{{ include "spot.fullname" . }}-test-connection" + labels: + {{- include "spot.labels" . | nindent 4 }} + annotations: + "helm.sh/hook": test +spec: + containers: + - name: wget + image: busybox + command: ['wget'] + args: ['{{ include "spot.fullname" . }}:{{ .Values.service.port }}'] + restartPolicy: Never diff --git a/scripts/helmcharts/openreplay/charts/spot/values.yaml b/scripts/helmcharts/openreplay/charts/spot/values.yaml new file mode 100644 index 000000000..36e058136 --- /dev/null +++ b/scripts/helmcharts/openreplay/charts/spot/values.yaml @@ -0,0 +1,133 @@ +# Default values for openreplay. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +replicaCount: 1 + +image: + repository: "{{ .Values.global.openReplayContainerRegistry }}/spot" + pullPolicy: IfNotPresent + # Overrides the image tag whose default is the chart appVersion. + tag: "" + +imagePullSecrets: [] +nameOverride: "spot" +fullnameOverride: "spot-openreplay" + +serviceAccount: + # Specifies whether a service account should be created + create: true + # Annotations to add to the service account + annotations: {} + # The name of the service account to use. + # If not set and create is true, a name is generated using the fullname template + name: "" + +podAnnotations: {} + +securityContext: + runAsUser: 1001 + runAsGroup: 1001 +podSecurityContext: + runAsUser: 1001 + runAsGroup: 1001 + fsGroup: 1001 + fsGroupChangePolicy: "OnRootMismatch" +# podSecurityContext: {} + # fsGroup: 2000 + +# securityContext: {} + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # runAsUser: 1000 + +service: + type: ClusterIP + ports: + http: 8080 + metrics: 8888 + +serviceMonitor: + enabled: true + additionalLabels: + release: observability + scrapeConfigs: + - port: metrics + honorLabels: true + interval: 15s + path: /metrics + scheme: http + scrapeTimeout: 10s + +ingress: + enabled: true + className: "{{ .Values.global.ingress.controller.ingressClassResource.name }}" + annotations: + cert-manager.io/cluster-issuer: "letsencrypt-prod" + nginx.ingress.kubernetes.io/proxy-connect-timeout: "120" + nginx.ingress.kubernetes.io/proxy-send-timeout: "300" + nginx.ingress.kubernetes.io/proxy-read-timeout: "300" + nginx.ingress.kubernetes.io/cors-allow-methods: POST,PATCH,DELETE + nginx.ingress.kubernetes.io/cors-allow-headers: Content-Type,Authorization,Content-Encoding,X-Openreplay-Batch + nginx.ingress.kubernetes.io/cors-allow-origin: '*' + nginx.ingress.kubernetes.io/enable-cors: "true" + nginx.ingress.kubernetes.io/cors-expose-headers: "Content-Length" + # kubernetes.io/ingress.class: nginx + # kubernetes.io/tls-acme: "true" + tls: + secretName: openreplay-ssl + +resources: {} + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + +autoscaling: + enabled: false + minReplicas: 1 + maxReplicas: 5 + targetCPUUtilizationPercentage: 80 + # targetMemoryUtilizationPercentage: 80 + +env: + TOKEN_SECRET: secret_token_string # TODO: generate on build + CACHE_ASSETS: true + FS_CLEAN_HRS: 24 + + +pvc: + # This can be either persistentVolumeClaim or hostPath. + # In case of pvc, you'll have to provide the pvc name. + # For example + # name: openreplay-efs + name: "{{ .Values.global.pvcRWXName }}" + hostMountPath: "{{ .Values.global.orTmpDir }}" + + +nodeSelector: {} + +tolerations: [] + +affinity: {} + + +persistence: {} + # # Spec of spec.template.spec.containers[*].volumeMounts + # mounts: + # - name: kafka-ssl + # mountPath: /opt/kafka/ssl + # # Spec of spec.template.spec.volumes + # volumes: + # - name: kafka-ssl + # secret: + # secretName: kafka-ssl