手把手教你用Python+Milvus搭建一个简易Look-alike系统(附Graph Embedding代码)

张开发
2026/4/19 10:49:29 15 分钟阅读

分享文章

手把手教你用Python+Milvus搭建一个简易Look-alike系统(附Graph Embedding代码)
从零构建用户相似度挖掘系统基于Graph Embedding与Milvus的实战指南在数字营销和个性化推荐领域寻找与种子用户相似的目标人群Look-alike一直是核心挑战。传统基于规则标签的方法不仅效率低下而且难以捕捉用户间复杂的非线性关系。本文将带你用Python构建一个完整的Look-alike系统涵盖用户关系图构建、EGES算法实现、Milvus向量数据库集成等关键环节。1. 系统架构设计与技术选型一个完整的Look-alike系统通常包含数据层、算法层和服务层三个核心模块。我们选择的技术栈组合兼顾了效果与工程落地性数据处理层Pandas NetworkX图嵌入算法EGES增强型Graph Embedding向量检索Milvus 2.0服务封装FastAPI技术对比分析方案类型代表算法优点缺点适用场景基于标签TGI扩散实现简单精度有限冷启动阶段基于协同过滤ItemCF无需特征工程稀疏性问题行为数据丰富场景基于图网络EGES关系捕捉能力强计算成本高社交/行为关联强场景基于深度学习GNN自动特征学习训练复杂超大规模数据提示EGES(Enhanced Graph Embedding with Side Information)是BGE算法的改进版通过融合节点属性信息提升嵌入质量特别适合用户行为稀疏的场景。2. 用户关系图构建实战用户关系图是Graph Embedding的基础其构建质量直接影响最终效果。我们从模拟数据生成开始演示完整的图构建流程。2.1 模拟数据生成import pandas as pd import numpy as np # 生成10000个模拟用户 users [fuser_{i} for i in range(10000)] # 用户属性年龄、性别、城市 attrs { age: np.random.randint(18, 60, size10000), gender: np.random.choice([M,F], size10000), city: np.random.choice([北京,上海,广州,深圳], size10000) } # 用户行为数据物品交互序列 items [fitem_{i} for i in range(500)] behavior_data [] for user in users: for _ in range(np.random.randint(5, 20)): behavior_data.append({ user_id: user, item_id: np.random.choice(items), timestamp: pd.Timestamp.now() - pd.Timedelta(minutesnp.random.randint(0, 10080)) }) df_behavior pd.DataFrame(behavior_data)2.2 图构建策略用户关系边的定义需要结合业务场景常见构建方式包括共现关系共同交互过相同物品的用户建立边时序关系连续交互行为的用户形成边属性相似 demographic特征相似的用户连接import networkx as nx from collections import defaultdict # 基于物品共现构建用户关系图 G nx.Graph() user_item_map defaultdict(set) # 建立用户-物品二分图 for _, row in df_behavior.iterrows(): user_item_map[row[user_id]].add(row[item_id]) # 计算用户间Jaccard相似度作为边权重 users list(user_item_map.keys()) for i in range(len(users)): for j in range(i1, len(users)): set_i user_item_map[users[i]] set_j user_item_map[users[j]] intersection len(set_i set_j) union len(set_i | set_j) if union 0 and intersection 2: # 至少两个共同物品 weight intersection / union G.add_edge(users[i], users[j], weightweight)注意实际业务中需要根据数据特点调整边权重计算方式例如引入时间衰减因子。3. EGES算法实现与优化EGES算法核心思想是在DeepWalk基础上融合节点属性信息其模型架构包含以下几个关键组件主嵌入层学习节点拓扑结构特征属性嵌入层处理节点属性特征注意力机制动态融合多特征源3.1 基础实现import torch import torch.nn as nn import torch.nn.functional as F class EGES(nn.Module): def __init__(self, num_nodes, num_attrs, embed_dim64): super(EGES, self).__init__() self.node_embed nn.Embedding(num_nodes, embed_dim) self.attr_embeds nn.ModuleList([ nn.Embedding(num_attrs[attr], embed_dim) for attr in num_attrs ]) self.attention nn.Linear(embed_dim, 1) def forward(self, node_ids, attr_ids): # 主嵌入 h_node self.node_embed(node_ids) # 属性嵌入 attr_embs [] for i, embed in enumerate(self.attr_embeds): attr_embs.append(embed(attr_ids[:,i])) # 注意力权重 all_embs torch.stack([h_node] attr_embs, dim1) attn_scores F.softmax(self.attention(all_embs), dim1) # 加权融合 final_embed torch.sum(attn_scores * all_embs, dim1) return final_embed3.2 训练技巧负采样优化采用动态负采样策略对高频节点增加采样概率多任务学习同时优化链接预测和属性预测任务渐进式训练先训练主嵌入层再联合训练属性嵌入def train_epoch(model, data_loader, optimizer): model.train() total_loss 0 for batch in data_loader: optimizer.zero_grad() # 正样本 pos_nodes, pos_attrs batch[pos] pos_emb model(pos_nodes, pos_attrs) # 负样本 neg_nodes, neg_attrs batch[neg] neg_emb model(neg_nodes, neg_attrs) # 损失计算 pos_scores torch.sum(pos_emb[::2] * pos_emb[1::2], dim1) neg_scores torch.sum(pos_emb[::2] * neg_emb[1::2], dim1) loss -torch.mean(torch.log(torch.sigmoid(pos_scores - neg_scores))) loss.backward() optimizer.step() total_loss loss.item() return total_loss / len(data_loader)4. Milvus向量数据库集成Milvus作为专为向量检索优化的数据库能够高效处理大规模相似度查询。我们重点介绍部署配置和性能优化技巧。4.1 集群部署方案对于千万级用户规模的系统推荐采用分布式集群部署version: 3.5 services: milvus: image: milvusdb/milvus:v2.0.0 ports: - 19530:19530 environment: - ETCD_ENABLEDtrue - MINIO_ENABLEDtrue volumes: - ./volumes/milvus:/var/lib/milvus deploy: resources: limits: cpus: 4 memory: 8G etcd: image: quay.io/coreos/etcd:v3.5.0 ports: - 2379:2379 environment: - ETCD_AUTO_COMPACTION_RETENTION1 volumes: - ./volumes/etcd:/etcd minio: image: minio/minio:RELEASE.2021-06-17T00-10-46Z ports: - 9000:9000 environment: - MINIO_ACCESS_KEYminioadmin - MINIO_SECRET_KEYminioadmin volumes: - ./volumes/minio:/data4.2 性能优化参数通过调整以下参数可以显著提升查询性能from pymilvus import Collection, connections, FieldSchema, CollectionSchema, DataType # 连接配置 connections.connect( default, hostlocalhost, port19530, secureFalse ) # 集合Schema定义 fields [ FieldSchema(nameuser_id, dtypeDataType.VARCHAR, is_primaryTrue, max_length64), FieldSchema(nameembedding, dtypeDataType.FLOAT_VECTOR, dim64) ] schema CollectionSchema( fieldsfields, descriptionUser embedding collection, enable_dynamic_fieldFalse ) # 创建集合 collection Collection( nameuser_embeddings, schemaschema, consistency_levelStrong ) # 索引配置 index_params { index_type: IVF_FLAT, metric_type: IP, # 内积相似度 params: {nlist: 1024} } collection.create_index( field_nameembedding, index_paramsindex_params ) # 加载优化 collection.load( replica_number2, _refreshFalse, _timeout30 )提示生产环境建议nlist设置为集群CPU核心数的4-8倍查询时nprobe设置为nlist的1/16到1/8。5. 效果评估与线上部署完整的Look-alike系统需要建立科学的评估体系我们设计了三层评估指标离线指标覆盖率Coverage扩展人群对种子人群的覆盖比例相似度Similarity向量空间余弦距离分布在线指标CTR提升率转化成本下降率业务指标ROI投资回报率用户留存率变化A/B测试结果示例策略人群规模CTRCVR单次转化成本规则扩展50万1.2%0.8%¥25.6EGESMilvus50万2.7%1.5%¥18.3EGESMilvus200万2.1%1.2%¥20.1在Docker化部署时建议采用以下服务架构├── app/ │ ├── api/ # FastAPI接口 │ ├── models/ # 算法模型 │ ├── services/ # 业务逻辑 │ └── config.py ├── docker-compose.yml ├── Dockerfile └── requirements.txt接口关键实现示例from fastapi import FastAPI from pymilvus import Collection import numpy as np app FastAPI() collection Collection(user_embeddings) app.post(/lookalike) async def find_similar_users( seed_users: list[str], top_k: int 1000, expand_ratio: float 10.0 ): # 获取种子用户向量 seed_embeddings get_embeddings(seed_users) # 相似度搜索 search_params { metric_type: IP, params: {nprobe: 128} } results collection.search( dataseed_embeddings, anns_fieldembedding, paramsearch_params, limitint(len(seed_users)*expand_ratio), output_fields[user_id] ) # 结果聚合与去重 similar_users set() for hits in results: for hit in hits: similar_users.add(hit.entity.get(user_id)) return { similar_users: list(similar_users)[:top_k], count: len(similar_users) }在实际项目中我们遇到了几个典型问题及解决方案冷启动问题新用户缺乏行为数据时采用属性相似度作为补充数据稀疏性引入跨域行为数据如浏览、搜索、购买丰富关系图在线性能实现多级缓存策略对高频查询结果缓存5-10分钟

更多文章