diff --git a/api/auth_middleware.go b/api/auth_middleware.go index 05cc3bbe..2060f207 100644 --- a/api/auth_middleware.go +++ b/api/auth_middleware.go @@ -347,8 +347,16 @@ func (app *ApiServer) authMiddleware(c *fiber.Ctx) error { return fiber.NewError(fiber.StatusUnauthorized, "Invalid or expired access token") } - // Not authorized to act on behalf of myId - if myId != 0 && !pkceAuthed && !app.isAuthorizedRequest(c.Context(), myId, wallet) { + // Not authorized to act on behalf of myId. + // + // Exception: /users/:userId/feed/for-you accepts user_id as a viewer hint + // used only for response decoration (has_current_user_reposted etc.); the + // path :userId — not user_id — controls what gets personalized. Treat the + // query user_id as advisory rather than authoritative on this route so + // the endpoint can be called like the other public read endpoints. + allowUnauthenticatedViewerId := strings.HasSuffix(c.Path(), "/feed/for-you") + + if myId != 0 && !pkceAuthed && !allowUnauthenticatedViewerId && !app.isAuthorizedRequest(c.Context(), myId, wallet) { return fiber.NewError( fiber.StatusForbidden, fmt.Sprintf( diff --git a/api/server.go b/api/server.go index 15bab7f9..6796d8bd 100644 --- a/api/server.go +++ b/api/server.go @@ -454,6 +454,7 @@ func NewApiServer(config config.Config) *ApiServer { g.Get("/users/:userId/contests", app.v1UserContests) g.Get("/users/:userId/playlists", app.v1UserPlaylists) g.Get("/users/:userId/feed", app.v1UsersFeed) + g.Get("/users/:userId/feed/for-you", app.v1FeedForYou) g.Get("/users/:userId/connected_wallets", app.v1UsersConnectedWallets) g.Get("/users/:userId/transactions/audio", app.v1UsersTransactionsAudio) g.Get("/users/:userId/transactions/audio/count", app.v1UsersTransactionsAudioCount) diff --git a/api/swagger/swagger-v1.yaml b/api/swagger/swagger-v1.yaml index 7993a739..433ca891 100644 --- a/api/swagger/swagger-v1.yaml +++ b/api/swagger/swagger-v1.yaml @@ -9433,6 +9433,75 @@ paths: "500": description: Server error content: {} + /users/{id}/feed/for-you: + get: + tags: + - users + summary: Get For You feed for user + description: + Returns a personalized For You feed for the user identified in the + path. Twitter-style multi-source pipeline — candidate retrieval + (in-network, trending, underground) → linear ranking (recency + decay × engagement × social affinity, weighted by source) → + diversity (per-artist cap + consecutive-same-artist lookahead). + operationId: Get User For You Feed + security: + - {} + - OAuth2: + - read + parameters: + - name: id + in: path + description: A User ID + required: true + schema: + type: string + - name: limit + in: query + description: The number of items to fetch + schema: + type: integer + default: 25 + minimum: 1 + maximum: 100 + - name: offset + in: query + description: + The number of items to skip. Useful for pagination (page number + * limit) + schema: + type: integer + default: 0 + minimum: 0 + maximum: 200 + - name: max_per_artist + in: query + description: + Maximum number of tracks per artist on a single page. Used by the + diversity pass to cap consecutive same-artist results. + schema: + type: integer + default: 3 + minimum: 1 + maximum: 10 + - name: user_id + in: query + description: The user ID of the user making the request + schema: + type: string + responses: + "200": + description: Success + content: + application/json: + schema: + $ref: "#/components/schemas/tracks" + "400": + description: Bad request + content: {} + "500": + description: Server error + content: {} /users/{id}/library/albums: get: tags: diff --git a/api/v1_events_remix_contests.go b/api/v1_events_remix_contests.go index 3e75696c..859cede3 100644 --- a/api/v1_events_remix_contests.go +++ b/api/v1_events_remix_contests.go @@ -36,14 +36,17 @@ func (app *ApiServer) v1EventsRemixContests(c *fiber.Ctx) error { "u.is_deactivated = false", "u.is_available = true", "(e.entity_type != 'track' OR (t.track_id IS NOT NULL AND t.is_delete = false AND t.is_unlisted = false))", - // Shadow-ban filters — mirror what v1_event_comments.go applies to - // comment authors. Two parallel signals so the filter catches the - // full population: low-quality / impersonator / bot accounts via - // `aggregate_user.score < 0`, and community-flagged users via the - // karma-muted set (sum of muters' follower_count crosses the - // karmaCommentCountThreshold). Hosts in either bucket disappear - // from the discovery list. - "e.user_id NOT IN (SELECT user_id FROM low_abuse_score)", + // Shadow-ban filters — two parallel community signals lifted from + // v1_event_comments / v1_track_comments. A host disappears from the + // discovery list if either: + // 1. They authored a comment that crossed the high-karma-reporter + // threshold (sum of reporters' follower_count exceeds + // karmaCommentCountThreshold) — same threshold that hides the + // comment itself on comment endpoints, just lifted from + // comment_id to author user_id. + // 2. They are in the karma-muted set (sum of muters' + // follower_count crosses karmaCommentCountThreshold). + "e.user_id NOT IN (SELECT user_id FROM karma_reported_authors)", "e.user_id NOT IN (SELECT muted_user_id FROM muted_by_karma)", } @@ -67,8 +70,23 @@ func (app *ApiServer) v1EventsRemixContests(c *fiber.Ctx) error { GROUP BY muted_user_id HAVING SUM(aggregate_user.follower_count) >= @karmaCommentCountThreshold ), - low_abuse_score AS ( - SELECT user_id FROM aggregate_user WHERE score < 0 + -- Comments that crossed the high-karma reporter threshold — identical + -- shape to high_karma_reporters in v1_track_comments / v1_event_comments. + high_karma_reporters AS ( + SELECT comment_reports.comment_id + FROM comment_reports + JOIN aggregate_user ON comment_reports.user_id = aggregate_user.user_id + WHERE comment_reports.is_delete = false + GROUP BY comment_reports.comment_id + HAVING SUM(aggregate_user.follower_count) >= @karmaCommentCountThreshold + ), + -- Authors of any comment in high_karma_reporters. Lifts the per-comment + -- shadow-ban signal up to the user level so the contest list hides + -- hosts whose comments are already being hidden by the same threshold. + karma_reported_authors AS ( + SELECT DISTINCT comments.user_id + FROM comments + JOIN high_karma_reporters ON high_karma_reporters.comment_id = comments.comment_id ) SELECT e.event_id, diff --git a/api/v1_events_remix_contests_test.go b/api/v1_events_remix_contests_test.go index db72adb5..f0d1e448 100644 --- a/api/v1_events_remix_contests_test.go +++ b/api/v1_events_remix_contests_test.go @@ -464,23 +464,29 @@ func TestRemixContestsExcludesUnavailableContent(t *testing.T) { // TestRemixContestsExcludesShadowbannedHosts covers the two parallel // shadow-ban signals applied to the discovery list: -// 1. `aggregate_user.score < 0` (account-quality signal — bots, -// impersonators, fast-challenge runners). +// 1. `karma_reported_authors` — host authored a comment that crossed the +// high-karma-reporter threshold (sum of reporters' follower_count +// >= karmaCommentCountThreshold). Same threshold that hides the +// comment itself on comment endpoints, lifted to author user_id. // 2. The karma-muted set — host has been muted by users whose combined -// follower_count crosses karmaCommentCountThreshold (community-driven -// signal). Same shape used in v1_event_comments for comment authors. +// follower_count crosses karmaCommentCountThreshold. func TestRemixContestsExcludesShadowbannedHosts(t *testing.T) { app := emptyTestApp(t) cleanHostID := 9601 - lowScoreHostID := 9602 + karmaReportedHostID := 9602 karmaMutedHostID := 9603 - highKarmaMuterID := 9604 + highKarmaUserID := 9604 cleanTrackID := 8601 - lowScoreTrackID := 8602 + karmaReportedTrackID := 8602 karmaMutedTrackID := 8603 + // Comment authored by karmaReportedHost on its own track; the high-karma + // user reports it, which should cross the threshold and propagate the + // shadow-ban from comment_id up to the host's user_id. + reportedCommentID := 7701 + start := parseTime(t, "2024-01-02") end := parseTime(t, "2099-01-01") @@ -493,7 +499,7 @@ func TestRemixContestsExcludesShadowbannedHosts(t *testing.T) { }, { "event_id": 802, "event_type": "remix_contest", "entity_type": "track", - "entity_id": lowScoreTrackID, "user_id": lowScoreHostID, + "entity_id": karmaReportedTrackID, "user_id": karmaReportedHostID, "created_at": start, "end_date": end, }, { @@ -504,36 +510,39 @@ func TestRemixContestsExcludesShadowbannedHosts(t *testing.T) { }, "users": []map[string]any{ {"user_id": cleanHostID, "handle": "clean_host"}, - {"user_id": lowScoreHostID, "handle": "low_score_host"}, + {"user_id": karmaReportedHostID, "handle": "karma_reported_host"}, {"user_id": karmaMutedHostID, "handle": "karma_muted_host"}, - {"user_id": highKarmaMuterID, "handle": "high_karma_muter"}, + {"user_id": highKarmaUserID, "handle": "high_karma_user"}, }, "tracks": []map[string]any{ {"track_id": cleanTrackID, "owner_id": cleanHostID, "created_at": start}, - {"track_id": lowScoreTrackID, "owner_id": lowScoreHostID, "created_at": start}, + {"track_id": karmaReportedTrackID, "owner_id": karmaReportedHostID, "created_at": start}, {"track_id": karmaMutedTrackID, "owner_id": karmaMutedHostID, "created_at": start}, }, + "comments": []map[string]any{ + { + "comment_id": reportedCommentID, "user_id": karmaReportedHostID, + "entity_id": karmaReportedTrackID, "entity_type": "Track", "text": "reported comment", + }, + }, + "comment_reports": []map[string]any{ + {"comment_id": reportedCommentID, "user_id": highKarmaUserID}, + }, "muted_users": []map[string]any{ - // High-karma muter mutes the karma-muted host — combined with the + // High-karma user mutes the karma-muted host — combined with the // follower_count bump below, this should cross the threshold. - {"user_id": highKarmaMuterID, "muted_user_id": karmaMutedHostID}, + {"user_id": highKarmaUserID, "muted_user_id": karmaMutedHostID}, }, } database.Seed(app.pool.Replicas[0], fixtures) - // `aggregate_user` rows are created by the users trigger; tweak the two - // fields we care about: score on the low-score host, and the muter's - // follower_count so the karma-muted CTE actually trips. + // `aggregate_user` rows are created by the users trigger; bump the + // high-karma user's follower_count past the threshold so both + // karma_reported_authors (via comment_reports) and muted_by_karma + // (via muted_users) trip on their respective host. _, err := app.pool.Exec(context.Background(), - `UPDATE aggregate_user SET score = $1 WHERE user_id = $2`, - -1, lowScoreHostID, - ) - if err != nil { - t.Fatal(err) - } - _, err = app.pool.Exec(context.Background(), `UPDATE aggregate_user SET follower_count = $1 WHERE user_id = $2`, - karmaCommentCountThreshold+1, highKarmaMuterID, + karmaCommentCountThreshold+1, highKarmaUserID, ) if err != nil { t.Fatal(err) @@ -550,11 +559,11 @@ func TestRemixContestsExcludesShadowbannedHosts(t *testing.T) { }) }) - t.Run("host with score < 0 is excluded", func(t *testing.T) { + t.Run("host with karma-reported comment is excluded", func(t *testing.T) { _, body := testGet(t, app, "/v1/events/remix-contests") eventIds := pluckStrings(body, "data.#.event_id") assert.NotContains(t, eventIds, trashid.MustEncodeHashID(802), - "contest hosted by a user with aggregate_user.score < 0 must not be returned") + "contest hosted by a user who authored a comment in high_karma_reporters must not be returned") }) t.Run("karma-muted host is excluded", func(t *testing.T) { diff --git a/api/v1_users_feed_for_you.go b/api/v1_users_feed_for_you.go new file mode 100644 index 00000000..4a0208ed --- /dev/null +++ b/api/v1_users_feed_for_you.go @@ -0,0 +1,394 @@ +package api + +import ( + "api.audius.co/api/dbv1" + "github.com/gofiber/fiber/v2" + "github.com/jackc/pgx/v5" +) + +type GetUsersFeedForYouParams struct { + Limit int `query:"limit" default:"25" validate:"min=1,max=100"` + Offset int `query:"offset" default:"0" validate:"min=0,max=200"` + MaxPerArtist int `query:"max_per_artist" default:"3" validate:"min=1,max=10"` +} + +// v1FeedForYou returns a personalized "For You" track feed for the user +// identified in the path. Modeled on Twitter's open-sourced 2023 +// algorithm (the-algorithm-ml). The shape of the pipeline is +// candidate-retrieval → ranking → filtering+diversity, the same +// three-stage pattern Twitter uses on top of a learned "heavy ranker." +// Audius doesn't yet have a trained ranker, so the heavy ranker is +// approximated by a hand-tuned linear blend below; the candidate +// retrieval / diversity passes carry over directly so a learned model +// can drop in later. +// +// 1. CANDIDATE RETRIEVAL (UNION across three sources, each capped): +// - in_network — tracks uploaded in the last 14 days by users I follow. +// Strongest "this is for me" signal. Capped at 200. +// - trending — top week-trending from track_trending_scores. +// Mirrors GET /tracks/trending. Capped at 100. +// - underground — week-trending tracks whose owner has < 1500 +// follower & following count (mirrors GET /tracks/trending/underground). +// Capped at 50. +// +// The original v1 of this endpoint also had a 4th `similar_artists` +// source — a 1-hop saves-graph collaborative filter. It was removed +// because the saves self-join produced a 301M-row merge for power users +// on prod and the endpoint never reliably completed within Cloudflare's +// upstream limit. The three-source pipeline below is the lean +// replacement. +// +// 2. RANKING — three light signals combined linearly: +// +// recency_score = exp(-ln(2) * age_hours / 48) +// // 48h half-life: 48h-old → 0.5, 96h → 0.25 +// engagement_score = ln(1 + 3*saves + 2*reposts + 1*plays) / 12 +// // saves > reposts > plays, log-compressed +// social_boost = 1.0 + min(my_affinity_with_artist / 4, 1) +// // up to ~2x for artists I already engage with often +// source_weight = {in_network: 1.20, trending: 1.00, +// underground: 0.95} +// +// final_score = (0.55 * recency_score + 0.45 * engagement_score) +// * social_boost * source_weight +// +// 3. FILTERS — applied once after the union to keep the candidate set cheap: +// - is_delete / is_unlisted / is_available / stem_of (track liveness) +// - users.is_deactivated / is_available (owner liveness — same shape +// as v1_events_remix_contests.go) +// - access_authorities: caller's wallet must be on the list, else +// ungated only (matches the v1_users_feed authed-wallet pattern) +// - already-saved by the path-param user (don't resurface) +// - the path-param user's own uploads +// +// 4. DIVERSITY — author-cap of N tracks per owner via row_number() +// (configurable via max_per_artist; default 3), then a Go-side greedy +// pass that, when the next track shares an owner with the previously +// emitted one, prefers the next non-same-owner candidate within a +// 5-position lookahead. This is a "consecutive same-artist penalty" +// without paying for a second ranker. +// +// PERFORMANCE NOTES: follow_set, the affinity sub-selects, and the +// affinity join itself are all bounded — old engagement is weak signal +// of current taste and unbounded scans are what previously took the +// endpoint over the Cloudflare upstream limit for power users +// (see PRs #805 and #806). The trending source relies on a partial +// index on track_trending_scores for the TRACKS/pnagD/week/null-genre +// slice (see ddl/migrations/0198_*). +// +// PAGINATION is offset/limit applied on the diversity-ordered list, so +// pages are stable as long as the underlying scores haven't shifted. +// +// Path: +// - id (required): the user being personalized for. Resolved by +// requireUserIdMiddleware; the handler returns the 400 from that +// middleware on an invalid hash id. +// +// Query params: +// - limit (default 25, max 100) +// - offset (default 0, max 200) +// - max_per_artist (default 3, max 10) — author cap per page +// - user_id (optional): the caller's id. Independent of the +// path id — used to populate has_current_user_reposted and similar +// viewer-relative fields on the returned tracks. Same role it plays +// on every other /v1/users/{id}/... endpoint. The auth middleware +// treats this as advisory rather than authoritative on this route. +func (app *ApiServer) v1FeedForYou(c *fiber.Ctx) error { + var params = GetUsersFeedForYouParams{} + if err := app.ParseAndValidateQueryParams(c, ¶ms); err != nil { + return err + } + + userId := app.getUserId(c) + myId := app.getMyId(c) + authedWallet := app.tryGetAuthedWallet(c) + + // Pull a candidate pool larger than the page size so the diversity + // cap and the consecutive-same-artist pass have headroom to reorder. + const candidatePoolSize = 200 + + sql := ` + WITH + -- Cap to the 500 most-recently-followed users. A power user with + -- thousands of follows pulls a huge hash table here that then has to + -- join against every recent track upload to find in-network candidates, + -- so the planner can stall. Recent follows are a better signal of + -- current taste anyway. + follow_set AS ( + SELECT followee_user_id AS user_id + FROM follows + WHERE follower_user_id = @userId + AND is_current = true + AND is_delete = false + ORDER BY created_at DESC + LIMIT 500 + ), + my_saved_tracks AS ( + SELECT save_item_id AS track_id + FROM saves + WHERE user_id = @userId + AND save_type = 'track' + AND is_current = true + AND is_delete = false + ), + -- Per-artist engagement strength (saves + reposts + plays of any of + -- their tracks by me). Used for the social_boost multiplier. + -- + -- Each sub-select is bounded by recency: a heavy listener can have + -- hundreds of thousands of play rows, and the unbounded union forces + -- a full scan of those rows on every request. The inner join against + -- tracks is further restricted to tracks created in the last 90 days + -- so old uploads can't pull this CTE wide. Old engagement is a weak + -- signal of current taste anyway. + my_artist_affinity AS ( + SELECT t.owner_id AS artist_id, + LN(1 + COUNT(*)) AS affinity + FROM ( + (SELECT save_item_id AS track_id FROM saves + WHERE user_id = @userId AND save_type = 'track' + AND is_current = true AND is_delete = false + ORDER BY created_at DESC + LIMIT 200) + UNION ALL + (SELECT repost_item_id AS track_id FROM reposts + WHERE user_id = @userId AND repost_type = 'track' + AND is_current = true AND is_delete = false + ORDER BY created_at DESC + LIMIT 200) + UNION ALL + (SELECT play_item_id AS track_id FROM plays + WHERE user_id = @userId + ORDER BY created_at DESC + LIMIT 500) + ) eng + JOIN tracks t ON t.track_id = eng.track_id + AND t.created_at >= NOW() - INTERVAL '90 days' + GROUP BY t.owner_id + ), + -- Source 1: in-network (followed-creator) recent uploads. + -- Bounded so a power-user with thousands of follows doesn't pull a + -- multi-thousand-row pool we'd just throw away after ranking. + cand_in_network AS ( + SELECT t.track_id, 'in_network'::text AS source + FROM tracks t + JOIN follow_set f ON f.user_id = t.owner_id + WHERE t.is_current = true + AND t.is_delete = false + AND t.is_unlisted = false + AND t.is_available = true + AND t.stem_of IS NULL + AND t.created_at >= NOW() - INTERVAL '14 days' + ORDER BY t.created_at DESC + LIMIT 200 + ), + -- Source 2: weekly trending. + cand_trending AS ( + SELECT tts.track_id, 'trending'::text AS source + FROM track_trending_scores tts + JOIN tracks t ON t.track_id = tts.track_id + AND t.is_current = true + AND t.is_delete = false + AND t.is_unlisted = false + AND t.is_available = true + WHERE tts.type = 'TRACKS' + AND tts.version = 'pnagD' + AND tts.time_range = 'week' + AND (tts.genre IS NULL OR tts.genre = '') + ORDER BY tts.score DESC, tts.track_id DESC + LIMIT 100 + ), + -- Source 3: underground trending (sub-1500 follower & following artists). + cand_underground AS ( + SELECT tts.track_id, 'underground'::text AS source + FROM track_trending_scores tts + JOIN tracks t ON t.track_id = tts.track_id + AND t.is_current = true + AND t.is_delete = false + AND t.is_unlisted = false + AND t.is_available = true + JOIN aggregate_user au ON au.user_id = t.owner_id + WHERE tts.type = 'TRACKS' + AND tts.version = 'pnagD' + AND tts.time_range = 'week' + AND (tts.genre IS NULL OR tts.genre = '') + AND au.follower_count < 1500 + AND au.following_count < 1500 + ORDER BY tts.score DESC, tts.track_id DESC + LIMIT 50 + ), + -- One row per track_id. DISTINCT ON keeps the strongest (lowest-prio) + -- source so an in-network track that's also trending keeps the + -- in_network weight rather than the lower trending weight. + candidates AS ( + SELECT DISTINCT ON (track_id) track_id, source + FROM ( + SELECT track_id, source, 1 AS prio FROM cand_in_network + UNION ALL + SELECT track_id, source, 2 AS prio FROM cand_trending + UNION ALL + SELECT track_id, source, 3 AS prio FROM cand_underground + ) u + ORDER BY track_id, prio + ), + filtered AS ( + SELECT + c.track_id, + c.source, + t.owner_id, + t.created_at, + COALESCE(at.save_count, 0) AS save_count, + COALESCE(at.repost_count, 0) AS repost_count, + COALESCE(ap.count, 0) AS play_count, + COALESCE(maa.affinity, 0) AS affinity + FROM candidates c + JOIN tracks t ON t.track_id = c.track_id + JOIN users u ON u.user_id = t.owner_id + LEFT JOIN aggregate_track at ON at.track_id = c.track_id + LEFT JOIN aggregate_plays ap ON ap.play_item_id = c.track_id + LEFT JOIN my_artist_affinity maa ON maa.artist_id = t.owner_id + WHERE t.is_current = true + AND t.is_delete = false + AND t.is_unlisted = false + AND t.is_available = true + AND t.stem_of IS NULL + AND u.is_current = true + AND u.is_deactivated = false + AND u.is_available = true + AND (t.access_authorities IS NULL + OR (COALESCE(@authed_wallet, '') <> '' + AND EXISTS ( + SELECT 1 FROM unnest(t.access_authorities) aa + WHERE lower(aa) = lower(@authed_wallet) + ))) + AND NOT EXISTS ( + SELECT 1 FROM my_saved_tracks ms WHERE ms.track_id = c.track_id + ) + AND t.owner_id <> @userId + ), + scored AS ( + SELECT + f.track_id, + f.owner_id, + EXP(-LN(2) * GREATEST(EXTRACT(EPOCH FROM (NOW() - f.created_at)) / 3600.0, 0) / 48.0) + AS recency_score, + LN(1 + 3 * f.save_count + 2 * f.repost_count + f.play_count) / 12.0 + AS engagement_score, + 1.0 + LEAST(f.affinity / 4.0, 1.0) AS social_boost, + CASE f.source + WHEN 'in_network' THEN 1.20 + WHEN 'trending' THEN 1.00 + WHEN 'underground' THEN 0.95 + ELSE 1.00 + END AS source_weight + FROM filtered f + ), + final_scored AS ( + SELECT + track_id, + owner_id, + (0.55 * recency_score + 0.45 * engagement_score) + * social_boost * source_weight AS score + FROM scored + ), + capped AS ( + SELECT track_id, owner_id, score, + ROW_NUMBER() OVER (PARTITION BY owner_id ORDER BY score DESC, track_id DESC) + AS rn_artist + FROM final_scored + ) + SELECT track_id, owner_id + FROM capped + WHERE rn_artist <= @maxPerArtist + ORDER BY score DESC, track_id DESC + LIMIT @poolSize + ` + + rows, err := app.pool.Query(c.Context(), sql, pgx.NamedArgs{ + "userId": userId, + "poolSize": candidatePoolSize, + "maxPerArtist": params.MaxPerArtist, + "authed_wallet": authedWallet, + }) + if err != nil { + return err + } + + type ranked struct { + TrackID int32 + OwnerID int32 + } + pool, err := pgx.CollectRows(rows, pgx.RowToStructByPos[ranked]) + if err != nil { + return err + } + + // Greedy diversity pass: keep the global rank order, but if the next + // track shares an owner with the one just emitted, prefer the next + // non-same-owner candidate within a small lookahead. Soft penalty on + // consecutive-same-artist runs without computing a second ranker. + const lookahead = 5 + ordered := make([]ranked, 0, len(pool)) + used := make([]bool, len(pool)) + var lastOwner int32 = -1 + for n := 0; n < len(pool); n++ { + pickIdx := -1 + for i := 0; i < len(pool) && i < n+lookahead+1; i++ { + if used[i] { + continue + } + if pickIdx == -1 { + pickIdx = i + } + if pool[i].OwnerID != lastOwner { + pickIdx = i + break + } + } + if pickIdx == -1 { + break + } + used[pickIdx] = true + ordered = append(ordered, pool[pickIdx]) + lastOwner = pool[pickIdx].OwnerID + } + + start := params.Offset + if start > len(ordered) { + start = len(ordered) + } + end := start + params.Limit + if end > len(ordered) { + end = len(ordered) + } + page := ordered[start:end] + + trackIds := make([]int32, len(page)) + for i, r := range page { + trackIds[i] = r.TrackID + } + + tracks, err := app.queries.Tracks(c.Context(), dbv1.TracksParams{ + GetTracksParams: dbv1.GetTracksParams{ + Ids: trackIds, + MyID: myId, + AuthedWallet: authedWallet, + }, + }) + if err != nil { + return err + } + + // Tracks() returns rows in id order; re-emit in our ranked order. + byId := make(map[int32]dbv1.Track, len(tracks)) + for _, t := range tracks { + byId[t.TrackID] = t + } + sorted := make([]dbv1.Track, 0, len(trackIds)) + for _, id := range trackIds { + if t, ok := byId[id]; ok { + sorted = append(sorted, t) + } + } + + return v1TracksResponse(c, sorted) +} diff --git a/api/v1_users_feed_for_you_test.go b/api/v1_users_feed_for_you_test.go new file mode 100644 index 00000000..0811830f --- /dev/null +++ b/api/v1_users_feed_for_you_test.go @@ -0,0 +1,153 @@ +package api + +import ( + "fmt" + "testing" + "time" + + "api.audius.co/api/dbv1" + "api.audius.co/database" + "api.audius.co/trashid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// feedForYouFixtures builds a small graph that exercises the in-network +// and trending candidate sources plus the standard filters. +// +// user 1 = me (the viewer) +// user 2 = an artist I follow -> in-network candidates +// user 3 = an artist I do NOT follow -> trending candidate +// user 4 = an underground artist -> underground candidate +// user 7 = a deactivated artist -> filter test +// user 8 = saved artist -> already-saved filter +func feedForYouFixtures() database.FixtureMap { + now := time.Now() + hoursAgo := func(h int) time.Time { return now.Add(-time.Duration(h) * time.Hour) } + users := []map[string]any{ + {"user_id": 1, "handle": "me", "handle_lc": "me", "wallet": "0x0000000000000000000000000000000000000001"}, + {"user_id": 2, "handle": "followed", "handle_lc": "followed", "wallet": "0x0000000000000000000000000000000000000002"}, + {"user_id": 3, "handle": "trending_artist", "handle_lc": "trending_artist", "wallet": "0x0000000000000000000000000000000000000003"}, + {"user_id": 4, "handle": "underground", "handle_lc": "underground", "wallet": "0x0000000000000000000000000000000000000004"}, + {"user_id": 7, "handle": "deactivated", "handle_lc": "deactivated", "wallet": "0x0000000000000000000000000000000000000007", "is_deactivated": true}, + {"user_id": 8, "handle": "saved_artist", "handle_lc": "saved_artist", "wallet": "0x0000000000000000000000000000000000000008"}, + } + + aggregateUser := []map[string]any{ + {"user_id": 1, "follower_count": 0, "following_count": 1}, + {"user_id": 2, "follower_count": 100, "following_count": 10}, + {"user_id": 3, "follower_count": 5000, "following_count": 200}, + {"user_id": 4, "follower_count": 100, "following_count": 50}, + {"user_id": 7, "follower_count": 0, "following_count": 0}, + {"user_id": 8, "follower_count": 200, "following_count": 50}, + } + + tracks := []map[string]any{ + {"track_id": 101, "owner_id": 2, "title": "in-network 1", "created_at": hoursAgo(2)}, + {"track_id": 102, "owner_id": 2, "title": "in-network 2", "created_at": hoursAgo(10)}, + {"track_id": 201, "owner_id": 3, "title": "trending", "created_at": hoursAgo(72)}, + {"track_id": 301, "owner_id": 4, "title": "underground", "created_at": hoursAgo(50)}, + {"track_id": 801, "owner_id": 8, "title": "already saved", "created_at": hoursAgo(100)}, + {"track_id": 701, "owner_id": 7, "title": "deactivated artist track", "created_at": hoursAgo(2)}, + {"track_id": 901, "owner_id": 2, "title": "unlisted", "created_at": hoursAgo(2), "is_unlisted": true}, + {"track_id": 902, "owner_id": 2, "title": "deleted", "created_at": hoursAgo(2), "is_delete": true}, + } + + follows := []map[string]any{ + {"follower_user_id": 1, "followee_user_id": 2}, + } + + saves := []map[string]any{ + {"user_id": 1, "save_item_id": 801, "save_type": "track"}, + } + + trackTrendingScores := []map[string]any{ + {"track_id": 201, "score": 9_000_000_000, "time_range": "week"}, + {"track_id": 301, "score": 5_000_000_000, "time_range": "week"}, + } + + aggregateTrack := []map[string]any{ + {"track_id": 101, "save_count": 5, "repost_count": 2}, + {"track_id": 102, "save_count": 1, "repost_count": 0}, + {"track_id": 201, "save_count": 100, "repost_count": 50}, + {"track_id": 301, "save_count": 30, "repost_count": 10}, + } + + return database.FixtureMap{ + "users": users, + "aggregate_user": aggregateUser, + "tracks": tracks, + "follows": follows, + "saves": saves, + "track_trending_scores": trackTrendingScores, + "aggregate_track": aggregateTrack, + } +} + +// TestV1FeedForYou_RequiresValidUserId asserts the path :userId is +// validated by requireUserIdMiddleware — a junk hash id should be a 400, +// not silently treated as user 0. +func TestV1FeedForYou_RequiresValidUserId(t *testing.T) { + app := emptyTestApp(t) + status, _ := testGet(t, app, "/v1/users/not-a-real-id/feed/for-you") + assert.Equal(t, 400, status) +} + +// TestV1FeedForYou_EmptyFeedForNewUser asserts a brand-new user with no +// follows, no engagement, and no available candidates gets a 200 with an +// empty Data array — not a 500 or a crash. This is the cold-start case. +func TestV1FeedForYou_EmptyFeedForNewUser(t *testing.T) { + app := emptyTestApp(t) + app.skipAuthCheck = true + + // Only the viewer exists — no follows, no tracks, no trending rows, + // no engagement. There's nothing for the pipeline to surface. + database.Seed(app.pool.Replicas[0], database.FixtureMap{ + "users": []map[string]any{ + {"user_id": 42, "handle": "newbie", "handle_lc": "newbie", + "wallet": "0x0000000000000000000000000000000000000042"}, + }, + }) + + var response struct { + Data []dbv1.Track + } + path := "/v1/users/" + trashid.MustEncodeHashID(42) + "/feed/for-you" + status, body := testGet(t, app, path, &response) + require.Equal(t, 200, status, string(body)) + assert.Empty(t, response.Data, "new user with no graph should get an empty feed") +} + +// TestV1FeedForYou_PaginationDoesNotRepeat asserts that two consecutive +// pages of the diversity-ordered list don't overlap on track ids. +func TestV1FeedForYou_PaginationDoesNotRepeat(t *testing.T) { + app := emptyTestApp(t) + app.skipAuthCheck = true + database.Seed(app.pool.Replicas[0], feedForYouFixtures()) + + page := func(limit, offset int) []int32 { + var resp struct { + Data []dbv1.Track + } + path := fmt.Sprintf("/v1/users/%s/feed/for-you?limit=%d&offset=%d", + trashid.MustEncodeHashID(1), limit, offset) + status, body := testGet(t, app, path, &resp) + require.Equal(t, 200, status, string(body)) + ids := make([]int32, len(resp.Data)) + for i, tr := range resp.Data { + ids[i] = tr.TrackID + } + return ids + } + + first := page(2, 0) + second := page(2, 2) + + seen := map[int32]bool{} + for _, id := range first { + seen[id] = true + } + for _, id := range second { + assert.Falsef(t, seen[id], "track %d appeared on both pages", id) + } +} diff --git a/ddl/migrations/0198_track_trending_scores_for_you_idx.sql b/ddl/migrations/0198_track_trending_scores_for_you_idx.sql new file mode 100644 index 00000000..9e1135be --- /dev/null +++ b/ddl/migrations/0198_track_trending_scores_for_you_idx.sql @@ -0,0 +1,33 @@ +-- Partial index covering the slice of track_trending_scores that the +-- /v1/users/{id}/feed/for-you "trending" and "underground" candidate +-- sources hit on every request: +-- +-- WHERE type = 'TRACKS' +-- AND version = 'pnagD' +-- AND time_range = 'week' +-- AND (genre IS NULL OR genre = '') +-- ORDER BY score DESC, track_id +-- LIMIT 100/50 +-- +-- Without this index EXPLAIN shows a fixed ~12s scan of the full +-- track_trending_scores table for every request, regardless of caller. +-- This was the single biggest non-similar-artists cost in the original +-- For You endpoint (#807). +-- +-- Size budget: the matching slice is on the order of a few thousand +-- rows (one row per current week-trending track), so the index is small. +-- +-- NOTE: intentionally NOT wrapped in BEGIN/COMMIT so that +-- CREATE INDEX CONCURRENTLY can run without holding an ACCESS EXCLUSIVE +-- lock on track_trending_scores. IF NOT EXISTS makes the migration +-- idempotent. + +create index concurrently if not exists idx_track_trending_scores_for_you + on track_trending_scores (score desc, track_id) + where type = 'TRACKS' + and version = 'pnagD' + and time_range = 'week' + and (genre is null or genre = ''); + +comment on index idx_track_trending_scores_for_you is + 'Partial index for the For You feed trending/underground candidate sources; replaces a ~12s full-table scan with a small index seek.';