DMTX / DataClean / Identity Rule

DMTX 联系人组合主键能力深入开发方案

基于 upload、ETL/DataClean、多主键配置和最终联系人写入链路重梳理,明确当前系统已支持多个主键字段,但运行时是 OR 任一主键匹配;本方案新增兼容模式与组合 AND 规则模式。

v2深入开发评审版
OR→AND核心语义差异
35-50ETL/DataClean MVP 人天
75-110平台化完整版本人天

版本:v2 深入开发评审版

日期:2026-06-05

需求来源:客户场景需求(酒店集团会员身份识别)

适用系统:DMTX 联系人、联系人字段配置、ETL/SFTP/API 导入、DataClean、表单、事件、SCRM/企微、分群、旅程、评分、退订查询

目标用户:租户管理员配置身份识别规则;运营人员在导入、同步、分群、旅程中受益

重要修订:本版纠正“DMTX 不支持多个主键”的旧表述。当前系统已经支持多个 if_pk=true 字段,真正缺口是这些字段在 DataClean 中按 OR / 任一主键命中 使用,而不是按 AND / 一组字段同时命中 使用。


1. 结论先行

1.1 当前系统真实现状

经过重新阅读 upload、ETL、DataClean、多主键配置和最终联系人写入链路,结论如下:

结论说明
已支持多个主键字段联系人字段元数据 metadata_field.if_pk / ifPk 可以同时有多个字段为 true。AttributesUtils.userPks(cid) 返回所有 ifShow && ifPk 字段。
Email/Mobile 可取消主键通过 AttributesResource.updateAttributeMetadataService.updateField 修改 if_pk,Email/Mobile 作为特殊预置主键受权限、次数、问卷使用状态约束。
可新增自定义字段为主键非预置联系人字段可以通过同一字段更新接口设置 isPk=true,不走 Email/Mobile 的特殊限制。
当前不是组合主键语义DataClean 查询时把导入行里有值的多个主键拼成 pk1 = ? OR pk2 = ?,任一字段命中即可拉出联系人候选。
多命中会自动合并DataClean.handleUsers 规则是:0 个候选新增、1 个候选更新、多个候选调用 SourceCommon.merge 合并。
pkPriority 暂未生效DTO 中存在 pkPriority,但在已检查的 AttributesUtilsDataCleanPostgreSQLSourceSourceCommon 中不参与查询顺序、冲突处理或合并胜出规则。
存在两套导入身份逻辑旧联系人分组导入走 ContactGroupUploadService + ContacatImportDisposeService,直接按 Email/Mobile SQL 合并;ETL/SFTP/API 导入走 DataClean 和元数据主键。

1.2 客户需求与当前能力的差距

酒店集团诉求不是“可以勾选多个主键字段”,因为 DMTX 已经能做到这一点。客户真正要的是:

cardNo + email 这类规则,必须两个字段组成一组同时命中同一联系人,才认为是同一业务身份;只命中 email 不能把酒店 A 和酒店 B 的会员自动合并。

当前系统的实际行为更接近:

流程 / 代码片段
if (email 命中联系人 A) OR (cardNo 命中联系人 B) then 返回 A/B 候选
if 候选数 > 1 then 自动合并为一个联系人

客户期望行为是:

流程 / 代码片段
if (email AND cardNo) 同时命中同一联系人 then 更新
else if 完整组合没有命中 then 新增
else if 组合字段分别命中不同联系人 then 标记冲突,不静默合并

1.3 推荐方案

推荐采用“兼容模式 + 组合规则模式”的身份规则层:

模式语义适用范围
ANY_KEY_COMPATIBLE当前行为。多个 if_pk 字段是多个可替代识别信号,任一字段命中即可匹配;多候选继续走合并。所有存量租户默认保持。
GROUPED_COMPOSITE新行为。一组字段内部是 AND;只有完整组有值才可参与匹配;多个完整组之间可以 OR,但所有命中结果必须指向同一联系人,否则进入冲突。需要“同邮箱不同会员/不同门店关系不合并”的租户。

核心原则:

  1. 不改 contact.id 物理主键,不把 MySQL/PG 联合主键直接搬到 contact 表。
  2. 保留 customer_id / contact_id / user_ids_str 的现有内部身份模型。
  3. if_pk 继续作为“可参与身份识别的字段”标记,不破坏现有多主键配置。
  4. 新增一层规则计划,决定这些主键字段按 OR 使用,还是按组合组 AND 使用。
  5. 先覆盖 ETL/SFTP/API/DataClean 主链路;旧联系人分组导入必须单独说明或迁移,否则会出现同一张 contact 表由两套身份引擎维护。

2. 需求与业务场景

2.1 客户原始需求

以当前沟通描述作为客户需求记录:

酒店集团场景中,同一个人在酒店 A 和酒店 B 的卡号不同,但是邮箱相同,喜好也不同。按照目前 DMT 的设计思路,只能识别为同一个联系人,字段会被覆盖。希望支持联合主键,例如设置“卡号 + 邮箱”为联合主键,让属性不要被合并覆盖。

2.2 目标示例

输入数据期望身份结果说明
hotel=A, cardNo=A001, email=a@example.com, prefer=大房联系人 1酒店 A 会员关系。
hotel=B, cardNo=B889, email=a@example.com, prefer=高楼层联系人 2邮箱相同,但卡号不同,不合并。
hotel=A, cardNo=A001, email=a@example.com, prefer=安静房更新联系人 1完整组合一致,更新同一会员关系。
hotel=A, cardNo=A001, email=b@example.com,但 cardNo=A001email=b@example.com 分别命中不同联系人冲突,不自动合并避免错误合并和属性覆盖。

2.3 验收标准

验收项明确定义
保持存量兼容未启用组合规则的租户,Email/Mobile/自定义多主键继续按现有 OR 匹配和合并逻辑运行。
可配置组合组管理员可以选择 2 到 5 个联系人字段组成一个组合组,例如 cardNo + email
完整组校验组合模式下,导入行至少要提供一个完整组合组;只提供组内部分字段应被拒绝或进入错误记录。
AND 匹配组合组内部必须所有字段同时匹配同一联系人,才算更新。
冲突可见组合字段分别命中不同联系人时,不调用 SourceCommon.merge,而是记录冲突、导入失败或进入人工处理。
最终写入不重构CustomerService 仍写入单条 contact 记录,contact_id/customer_id/user_ids_str 下游契约保持。
旧导入路径可解释对旧 /api/contact/v1/group/import 路径明确是迁移、禁用,还是保留为 legacy 行为。
可灰度回退公司级 feature flag 可从 GROUPED_COMPOSITE 回退到 ANY_KEY_COMPATIBLE

3. 竞品和行业经验

产品身份模型可借鉴点
Salesforce Marketing CloudContact Key / Subscriber Key 是跨 Email、SMS、Push 的稳定身份。官方通常不建议把 email 当 Subscriber Key,因为同一 email 可以对应多个业务身份。DMTX 应把组合主键定位为“业务身份识别规则”,不是物理表主键。稳定外部 ID 优先,组合业务键次之。
HubSpot默认 Email 去重;API 支持 Record ID、Email 或自定义唯一属性 idProperty;导入冲突时有跳过/报错语义。导入前预检和冲突提示比静默合并更重要。
KlaviyoProfile 至少需要 email、phone、external_id 之一;external_id 用法不一致会产生重复 Profile,官方有明确风险提示。组合键字段必须稳定、规范化、一致随数据写入,否则重复/冲突不可避免。

行业共识:

  1. Email 不应被视为绝对唯一的人。
  2. 企业级营销系统应保留内部 surrogate ID,同时支持业务身份规则。
  3. 身份规则变化必须有预检、审计、冲突处理和回退路径。
  4. 错误合并的成本通常高于保守地产生冲突记录。

4. 当前调用链重新梳理

4.1 旧联系人分组导入链路,未走 DataClean

这条链路是传统联系人分组导入,直接写 contact 表:

Legacy import flow:旧联系人分组导入链路

预览链路只解析文件,不写联系人;正式导入链路通过 Redis 队列按公司串行执行,最后由 ContacatImportDisposeService 直接写 contact 表,不经过 DataClean。

预览链路

  1. ContactInfoResource.uploadPretreat/api/contact/v1/group/file 接收预览上传请求。
  2. ContactGroupUploadService.uploadPretreat校验文件、识别编码和表头,准备预览数据。
  3. readPretreatCsv返回预览行、字段映射和导入前信息。

正式导入链路

  1. ContactInfoResource.uploadContact/api/contact/v1/group/import 接收正式导入请求。
  2. ContactGroupUploadService.contactImport@Async异步处理导入任务。
  3. Redis 串行队列qdum:contactImport:{companyId} 确保同公司导入串行。
  4. contactSave / uploadCsv / getContactDBObject解析文件并组装待写入联系人对象。
  5. ContacatImportDisposeService.ContactImportSaveOrUpdate按 Email/Mobile 逻辑分桶并决定新增、更新、合并。
  6. 分桶处理
    contactOnlyEmailList

    仅 Email。

    contactOnlyMobileList

    仅 Mobile。

    contactEmailMobileList

    Email + Mobile。

  7. SqlConnectionAgent直接执行数据库写入和清理。
    • saveBatch
    • executeBatch
    • delete

关键点:

  • ContacatImportDisposeService.ContactImportSaveOrUpdateonlyEmailonlyMobileemailMobile 分桶。
  • 匹配逻辑硬编码 Email/Mobile,不读取 metadata_field.if_pk
  • Email 和 Mobile 分别命中不同联系人时,会合并字段/分组并删除其中一条联系人。
  • 新增时 contact_idcustomer_id 都设为新 UUID。

结论:如果产品说“用户 upload 上传开始就支持组合主键”,必须先确认当前前端使用的是这条 legacy 链路还是 ETL SFTP 链路。若仍使用 legacy 链路,酒店 MVP 不能只改 DataClean。

4.2 ETL/SFTP/API 导入链路,走 DataClean

这条链路是本方案优先改造的主链路:

ETL / DataClean flow:本方案优先改造主链路

ETL/SFTP/API 导入会进入 Kafka 和 DataClean,由元数据主键参与身份匹配,再通过 CustomerService 完成联系人新增或更新。

主处理流水线

  1. EtlSftpResource.uploadCsv / unpackCsv接收并解包 ETL 文件上传。
  2. AbstractFileImportService.rowData2ContactData将 CSV 行转换为联系人导入数据。
  3. 抽取匹配和去重信号
    • pkAndValue
    • pkMd5Summary
    • isRepeat
  4. KafkaFirstConsumer.consume消费导入消息并进入 DataClean。
  5. DataClean.batchClean / batchCleanDo执行批量清洗、身份解析和存储分发。
  6. searchUserByPks在无 uid 时根据当前主键配置查找联系人。
  7. AttributesUtils.userPks / setPkValue读取公司主键字段并从导入行提取有值主键。
  8. PostgreSQLSource.search查询 PostgreSQL 候选联系人;当前为 OR 任一主键匹配。
  9. handleUsers根据候选数量决定动作。
    add

    0 个候选,新增联系人。

    update

    1 个候选,更新联系人。

    merge

    多个候选,当前会合并。

  10. 构造存储 DTO
    • UserStorageDTO
    • DeleteStorageDTO
    • ChangeStorageDTO
  11. DataClean.storage将清洗结果转换为最终写入对象。
  12. CustomerService.singleSaveOrUpdate进入最终联系人保存/更新服务。
  13. saveContact / updateContact完成 contact 表写入。

API/SCRM 入口也会进入类似 DataClean 链路:

流程 / 代码片段
ELTApiService.userAnalysis / addOrDeleeCustomerToEtl
  -> EtlServiceApi.innerApiImport
  -> EtlApiResource.innerApiImport
  -> EtlApiService.innerApiImport
  -> ImportService.singleImport
  -> Redis etlImportServiceSingle:{cid}
  -> ImportRedisService.singleImportServiceByCid
  -> KafkaFirstConsumer.consume
  -> DataClean.batchCleanDo
  -> CustomerService.singleSaveOrUpdate

4.3 DataClean 当前多主键语义

关键代码事实:

文件/方法当前行为
AttributesUtils.userPks(Long cid)返回所有 metadataFieldDTO.ifShow && metadataFieldDTO.ifPk 的联系人字段。
AttributesUtils.setPkValue(...)从导入行中提取有值的主键字段;只要一个有值即可。
DataClean.searchUserByPks(...)公司没有配置主键时报 Lack of primary key of user;行里没有任何主键值时报 need one primary key at least;否则调用 source.search(cid, pkValues)
PostgreSQLSource.search(...)动态生成 field1 = ? or field2 = ?,按 create_date asc limit 1000 返回候选。
DataClean.handleUsers(...)0 个候选新增;1 个候选更新;多个候选调用 SourceCommon.merge
SourceCommon.merge(...)第一个候选保留 customer_id/contact_id;后续候选进入删除列表;非空字段、标签、user_ids_str 合并。
DataClean.storage(...)UserStorageDTODataForEsItemDTO 后调用 CustomerService.singleSaveOrUpdate
CustomerService.updateContact(...)where contact_id = ? 更新,且不允许直接更新 contact_id

当前语义可以概括为:

流程 / 代码片段
配置多个主键字段 = 多个可替代的身份信号
导入行至少提供一个信号即可
任一信号命中即可匹配
多个信号命中不同联系人时自动合并

这不是客户要的组合主键。

4.4 表单和事件路径也受影响

组合主键不能只改 type=user

  • type=eventDataClean.batchCleanDo 中当没有 uid 时也调用 searchUserByPks
  • type=form_user 会调用 searchUserByPkssearchFormUsersearchFormUser 会把 customer_id 追加到 PK 列表再调用 source.search(...)
  • 如果只改普通用户导入,表单提交和事件导入仍可能按 OR 逻辑错误合并或错误创建。

4.5 后台定时合并也是风险点

EsUser.mergeUsers(Long cid) 会遍历 attributesUtils.userPks(cid) 的每个主键字段,然后逐个调用 source.merge(cid, pk)。这意味着即使 DataClean 改成组合规则,只要这个定时任务仍按单字段主键合并,就会继续把“同邮箱不同会员卡”的联系人合并掉。

组合模式下必须:

  1. 禁用该公司单字段定时合并,或
  2. 改为按完整组合组执行合并,且冲突进入治理流程。

5. 目标架构

5.1 身份规则模型

推荐新增显式规则模型,而不是把多个 if_pk=true 直接解释成组合主键。

兼容层

没有配置新规则的公司,运行时合成一个兼容计划:

流程 / 代码片段
mode = ANY_KEY_COMPATIBLE
activeFields = all fields where ifShow && ifPk
runtimeGroups = 每个 PK 字段一个单字段组

组合层

启用组合规则的公司,运行时读取显式分组:

流程 / 代码片段
mode = GROUPED_COMPOSITE
groups:
  - groupCode: hotel_member
    fields: [cardNo, email]
    match: ALL_FIELDS
    conflictPolicy: REJECT_OR_PENDING

5.2 数据模型建议

平台化方案建议新增规则表;如果只做 MVP,也可以先在 metadata_field 增加分组字段,但不建议长期把规则塞进字段表。

推荐表结构

流程 / 代码片段
CREATE TABLE contact_identity_rule (
  id BIGSERIAL PRIMARY KEY,
  company_id BIGINT NOT NULL,
  object_type VARCHAR(32) NOT NULL DEFAULT 'contact',
  mode VARCHAR(32) NOT NULL DEFAULT 'ANY_KEY_COMPATIBLE',
  conflict_policy VARCHAR(32) NOT NULL DEFAULT 'REJECT',
  enabled BOOLEAN NOT NULL DEFAULT TRUE,
  version INT NOT NULL DEFAULT 1,
  status INT NOT NULL DEFAULT 0,
  create_by BIGINT,
  create_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  update_by BIGINT,
  update_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE contact_identity_rule_group (
  id BIGSERIAL PRIMARY KEY,
  rule_id BIGINT NOT NULL,
  group_code VARCHAR(64) NOT NULL,
  group_name VARCHAR(128),
  match_type VARCHAR(32) NOT NULL DEFAULT 'ALL_FIELDS',
  priority INT NOT NULL DEFAULT 0,
  status INT NOT NULL DEFAULT 0
);

CREATE TABLE contact_identity_rule_field (
  id BIGSERIAL PRIMARY KEY,
  group_id BIGINT NOT NULL,
  metadata_field_id BIGINT NOT NULL,
  field_name VARCHAR(128) NOT NULL,
  required BOOLEAN NOT NULL DEFAULT TRUE,
  order_no INT NOT NULL DEFAULT 0,
  normalize_type VARCHAR(32)
);

可选 MVP 字段

如果为了压缩开发范围,可以先在 metadata_field 增加:

字段含义
pk_rule_modeANY_KEY_COMPATIBLEGROUPED_COMPOSITE,也可放公司级配置表。
pk_group_code同组字段共享一个 group code。
pk_group_required组合组内是否必填。

但这种方案后续支持多组、版本、审计、回滚会吃力,建议只作为临时 MVP。

5.3 Java 运行时 DTO

建议新增不可变 DTO/record,便于测试和避免 DataClean 继续拿扁平 list 判断。

流程 / 代码片段
public enum ContactIdentityRuleMode {
    ANY_KEY_COMPATIBLE,
    GROUPED_COMPOSITE
}

public record ContactIdentityRulePlan(
    Long companyId,
    ContactIdentityRuleMode mode,
    List<ContactIdentityGroupPlan> groups
) {}

public record ContactIdentityGroupPlan(
    String groupCode,
    List<String> fieldNames,
    boolean complete,
    Map<String, String> normalizedValues
) {}

public record ContactIdentityMatchResult(
    boolean success,
    boolean conflict,
    String errorCode,
    String errorMessage,
    Map<String, List<LinkedHashMap<String, Object>>> matchesByGroup,
    List<LinkedHashMap<String, Object>> candidates
) {}

核心要求:GROUPED_COMPOSITE 不能再把查询结果压扁成 List<LinkedHashMap<...>> 后直接交给 handleUsers,否则不同组合组命中不同联系人时仍会被误判为“多个候选可合并”。


6. 从用户上传到最终写入的改造顺序

Step 1:配置层先显式建模,默认兼容

修改点文件/类
新增规则表或字段migration / MetadataField / 新规则 domain
DTO 暴露 rule mode / groupAttributesDTOAddOrUpdateFieldDTOMetadataFieldDTO
管理端接口AttributesResource.updateAttribute,建议新增 ContactIdentityRuleResource
字段更新后副作用MetadataService.updateFieldupdateFieldSuccessfulAfter
缓存失效ContactMetaDataService.affected_changed,补充 ContactCustomerFieldService Redis 缓存清理

注意事项:

  • 老 payload 不传新字段时必须默认 ANY_KEY_COMPATIBLE
  • Email/Mobile 取消主键的原有限制不能被绕过。
  • 需要检查 MetadataService.updateFieldSuccessfulAfter 中问卷 PKConvertUpdate 返回码判断,避免规则更新出现“元数据已变更但外部同步失败”的半成功状态。

Step 2:在 AttributesUtils 统一生成规则计划

新增类似方法:

流程 / 代码片段
public ContactIdentityRulePlan contactIdentityRulePlan(Long cid);
public ContactIdentityRowPlan buildRowPlan(Long cid, Map<String, Object> properties);

行为:

模式行级校验
ANY_KEY_COMPATIBLE至少一个 active PK 有值即可。
GROUPED_COMPOSITE至少一个完整 group 有值;组内任一必填字段缺失时该 group 不可参与匹配,并返回明确原因。

Step 3:上传前校验和文件内去重先改

修改点文件/方法
SFTP/ETL 文件行解析AbstractFileImportService.rowData2ContactDatarowData2ContactPkDatacontactData
API 导入校验EtlApiService.innerApiImport
SDK 校验EtlSdkService 主键校验块

现状:pkMd5Summary 基于导入行中有值的 PK map 计算,重复行 isRepeat=true 但仍发送 Kafka。

组合模式建议:

  1. GROUPED_COMPOSITE 下 hash 应基于完整组合组的规范化值。
  2. 组内字段不完整,应在上传阶段报错,不要等 DataClean 里变成模糊新增。
  3. 文件内完全重复行建议直接 skip 或记录为明确“重复跳过”,不要一边计数去重,一边仍下发 Kafka 更新数据。

Step 4:PostgreSQLSource 从 OR 查询升级为规则查询

当前查询:

流程 / 代码片段
select * from contact
where email = ? or mobile = ? or card_no = ?
order by create_date asc
limit 1000

组合模式应生成:

流程 / 代码片段
select contact_id, customer_id, create_date
from contact
where (email = ? and card_no = ?)
order by create_date asc
limit 1000

多个完整组时:

流程 / 代码片段
where (email = ? and card_no = ?)
   or (unionid = ? and appid = ?)

必须处理:

  • 字段名只能来自元数据白名单并转换为合法列名,禁止用户输入字段直接拼接 SQL。
  • 任一组合组必填字段规范化失败时 fail closed,不能降级成单字段查询。
  • 查询异常不能返回空列表让 DataClean 当作新增,否则会产生重复联系人。
  • 先查询窄字段,再按 contact_id 批量 hydrate 完整行,减少 select * 成本。

Step 5:DataClean 改为消费结构化匹配结果

涉及方法:

文件/方法改造要求
DataClean.searchUserByPks返回结构化结果,区分 no match、one match、merge candidates、conflict、lookup error。
DataClean.handleUsers保留兼容模式原行为;组合模式下只有合法重复才允许 merge。
DataClean.searchFormUser表单提交同样使用组合规则;不能继续追加 customer_id 后走扁平 OR。
type=event 分支事件无 uid 时也走组合规则;不完整组合组按事件导入错误处理。
DataCleanSearchUserDTO增加 conflict/error 类型或新 DTO,避免只靠 errMsg 字符串。

组合冲突建议策略:

场景处理
完整组合组无命中新增联系人。
完整组合组命中 1 个联系人更新该联系人。
同一完整组命中多个联系人这是历史重复数据,进入“重复治理/可合并”流程。
多个完整组命中同一联系人更新该联系人。
多个完整组命中不同联系人冲突,不自动 merge,写冲突记录。
查询异常/字段转换异常导入失败,不允许当作新增。

Step 6:SourceCommon.merge 只处理真正重复,不处理组合冲突

当前 SourceCommon.merge 会:

  • 第一个候选保留 uid/pid
  • 后续候选加入 uids/pids 删除列表。
  • 非空字段合并,后面的非空值可覆盖前面的非空值。
  • label_idsuser_ids_str 合并。

组合模式下,SourceCommon.merge 不应该承担“判断是否应该合并”的职责。判断必须在 DataClean/Resolver 层完成;只有确认这些候选是同一业务身份的重复联系人,才调用 merge。

Step 7:CustomerService 保持行模型,但修复写入结果契约

涉及方法:

  • CustomerService.singleSaveOrUpdate
  • CustomerService.saveCustomerToEs
  • CustomerService.saveContact
  • CustomerService.updateContact
  • DataClean.storage

建议:

  1. saveContact/updateContact 失败时返回明确失败项,不要只返回 0 后让上层继续记成功。
  2. singleSaveOrUpdate 返回 per item 写入结果。
  3. DataClean.storage 只有真实写成功才更新 SFTP 成功计数和 importRecordAsyncService.recordSuccess
  4. 组合冲突不进入写入队列。

这一步不是组合主键的核心功能,但关系到导入可信度。当前链路存在“DB 写失败但导入记录可能记成功”的静默失败风险,应纳入实施。

Step 8:定时合并任务组合化或禁用

涉及:

  • EsUser.mergeUsers(Long cid)
  • PostgreSQLSource.merge(cid, pk)

组合模式下不能继续按每个单字段主键做合并。建议:

公司模式定时合并策略
ANY_KEY_COMPATIBLE保持现有逐字段合并。
GROUPED_COMPOSITE按完整组合组执行重复治理;单字段合并禁用。

Step 9:Legacy 上传链路明确阶段策略

旧链路不走 DataClean:

流程 / 代码片段
ContactInfoResource.uploadContact
  -> ContactGroupUploadService.contactImport/contactSave/uploadCsv
  -> ContacatImportDisposeService.ContactImportSaveOrUpdate
  -> direct SQL contact writes

如果酒店客户的“用户 upload 上传”实际使用这条路径,则有三种选择:

方案说明推荐度
A. 迁移 legacy 导入到 ETL/DataCleanContactGroupUploadService 只负责解析和状态,行数据委托 ETL 导入。推荐,长期一致。
B. 在 ContacatImportDisposeService 复制组合逻辑短期快,但会形成两套规则引擎。不推荐。
C. 明确 phase 1 不支持 legacy 导入只允许客户使用 ETL/SFTP/API 路径。可作为内部试点,但产品口径必须清楚。

本方案建议:酒店 MVP 如果必须覆盖 UI 上传,优先做 A;否则只能称为“ETL 导入支持组合主键”,不能称为“全量上传支持”。

Step 10:下游以 customer_id 连续性做回归

组合主键不应把组合字段传递到所有下游表作为新关联键。下游仍用 customer_id/contact_id/user_ids_str

下游重点验证
分群 ContactSwarmService基于 customer_id 的 join 不丢人、不重复。
实时旅程 JourneyMsgServiceold/new customer_id 能通过 user_ids_str 找到当前状态。
评分 ScoreCollationService历史 event customer_id 能归并到当前联系人。
退订 ContactSubscriptionServicecustomer_id/email 查询状态一致,不产生重复退订记录。
SCRM/企微 ScrmImportApiServicecustomer_id -> contact_id 查找稳定,openid/subscribe 字段写到正确联系人。

7. 数据库、索引和并发设计

7.1 预审计

启用组合模式前必须做数据审计:

流程 / 代码片段
-- contact_id 重复检查
select contact_id, count(*)
from contact
where if_deleted = 0
group by contact_id
having count(*) > 1;

-- customer_id 重复检查
select customer_id, count(*)
from contact
where if_deleted = 0
group by customer_id
having count(*) > 1;

-- 酒店组合键潜在重复检查示例
select email, card_no, count(*)
from contact
where if_deleted = 0
  and email is not null
  and card_no is not null
group by email, card_no
having count(*) > 1;

7.2 索引策略

当前单字段索引不足以支撑 (email = ? AND card_no = ?)。组合组启用前应创建组合索引:

流程 / 代码片段
create index concurrently if not exists idx_contact_cid_email_cardno
on contact (email, card_no, create_date)
where if_deleted = 0;

建议:

  • 按组合组创建索引,不为所有排列组合建索引。
  • 字段顺序与规则组字段顺序一致。
  • create_date 可作为尾列支持现有 oldest wins 的查询排序。
  • 规则变更时需要索引生命周期:创建、验证、启用、废弃旧索引。

7.3 并发控制

当前 search → merge → update/delete 不是一个强事务闭环。并发导入同一组合键时可能重复新增或重叠合并。

建议在组合模式下增加锁:

流程 / 代码片段
lock key = companyId + ruleGroupCode + normalizedCompositeValuesHash

实现选择:

方式说明
PostgreSQL advisory lock简单,适合同库内串行化同一组合键。
Redis lock跨服务更通用,但要处理过期和重入。
候选行 FOR UPDATE对已存在候选有效,对新增竞争仍需唯一约束或 advisory lock。

MVP 推荐使用 advisory lock 或 Redis lock,至少保证同一公司同一组合键不会并发新增两条。

7.4 冲突持久化

组合冲突不能只写日志,应落库或进入可查询队列:

流程 / 代码片段
CREATE TABLE contact_identity_conflict (
  id BIGSERIAL PRIMARY KEY,
  company_id BIGINT NOT NULL,
  rule_mode VARCHAR(32) NOT NULL,
  group_code VARCHAR(64),
  normalized_key VARCHAR(512) NOT NULL,
  source_type VARCHAR(32),
  source_id BIGINT,
  file_id BIGINT,
  row_no INT,
  payload JSONB,
  matched_contact_ids JSONB,
  status VARCHAR(32) NOT NULL DEFAULT 'PENDING',
  reason VARCHAR(256),
  create_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  update_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

用途:

  • 导入结果页展示冲突数量。
  • 管理员可下载冲突明细。
  • 后续可做人工合并、忽略、重跑。

8. API 和用户体验

8.1 管理员配置体验

建议配置步骤不超过 3 步:

  1. 进入“联系人字段 / 身份识别规则”。
  2. 选择模式:兼容多主键 or 组合主键。
  3. 选择组合字段并运行预检,预检通过后启用。

默认值:

配置项默认值
新租户/存量租户ANY_KEY_COMPATIBLE
组合组字段数量2 到 5 个
冲突策略阻断导入并记录冲突
缺少组内字段行级失败
多组策略组内 AND,组间 OR,但命中必须同一联系人

8.2 配置 API 草案

流程 / 代码片段
GET /api/contact/v1/identity-rule

返回:

流程 / 代码片段
{
  "companyId": 1001,
  "mode": "GROUPED_COMPOSITE",
  "conflictPolicy": "REJECT",
  "groups": [
    {
      "groupCode": "hotel_member",
      "groupName": "酒店会员身份",
      "matchType": "ALL_FIELDS",
      "fields": ["card_no", "email"]
    }
  ]
}
流程 / 代码片段
PUT /api/contact/v1/identity-rule

请求:

流程 / 代码片段
{
  "mode": "GROUPED_COMPOSITE",
  "conflictPolicy": "REJECT",
  "groups": [
    {
      "groupCode": "hotel_member",
      "fieldNames": ["card_no", "email"]
    }
  ],
  "dryRun": true
}

dryRun=true 时返回预检结果,不落地启用。

8.3 导入预检展示

导入前建议展示:

指标含义
新增联系人完整组合无命中。
更新联系人完整组合命中唯一联系人。
冲突行组合组命中多个不同联系人,或多组命中不同联系人。
缺少组合字段组内字段不完整。
文件内重复完整组合键重复。

9. 测试方案

9.1 单元测试

测试类覆盖点
AttributesUtilsTestANY_KEY_COMPATIBLE 计划生成;GROUPED_COMPOSITE 完整组/不完整组;隐藏字段排除;字段规范化。
PostgreSQLSourceTestOR 查询保持兼容;AND 查询生成;多组 OR-of-AND;字段转换失败 fail closed。
DataCleanTestadd/update/merge/conflict;type=usertype=eventtype=form_user 都覆盖。
SourceCommonTest合法重复合并不回归;组合冲突不会进入 merge。
CustomerServiceTestinsert/update 返回 per-item 结果;失败不记录成功。
MetadataServiceTest规则更新、问卷同步、缓存失效、旧 payload 默认兼容。

9.2 集成测试

使用 JUnit 5 + Testcontainers PostgreSQL:

  1. 初始化 contact 表和代表性联系人。
  2. 配置 ANY_KEY_COMPATIBLE,验证 Email/Mobile OR 命中和合并保持现状。
  3. 配置 GROUPED_COMPOSITE(card_no + email),验证:
  • 同组合更新。
  • 同 email 不同 cardNo 新增。
  • cardNo/email 分别命中不同联系人时冲突。
  • 查询异常不新增。
  1. 验证组合索引存在时 EXPLAIN ANALYZE 使用合理计划。

9.3 E2E / 回归测试

链路用例
ETL SFTP 文件导入缺字段、重复、更新、新增、冲突、计数。
ETL API/SCRM 导入无 uid 时组合规则匹配;有 uid 时保持现有优先级。
表单提交form_user 组合规则匹配和冲突。
事件导入无 uid 的 event 根据组合组找联系人;不完整组合失败。
定时合并组合模式下不按单字段合并。
Legacy 导入如果未迁移,明确测试并记录与 ETL 不同;如果迁移,验证同一 CSV 两条路径结果一致。
分群/旅程/评分/退订/SCRMcustomer_id 连续性、alias 解析、无重复/遗漏。

10. 工作量评估

10.1 推荐拆分

阶段范围预估人天
P0 代码现状确认和方案评审已完成主要分析;补充产品确认、数据样本、环境差异。2-4
P1 规则模型和配置 API表结构/DTO/接口/缓存/审计/预检骨架。8-12
P2 AttributesUtils 规则计划兼容模式合成、组合组解析、行级校验、单元测试。5-8
P3 ETL 上传校验和去重AbstractFileImportServiceEtlApiServiceEtlSdkService6-9
P4 查询层和 DataClean 改造PostgreSQLSourceDataClean.searchUserByPkssearchFormUser、event/form/user 分支。10-16
P5 冲突持久化和导入结果conflict 表、错误记录、下载/查询、导入计数。6-10
P6 写入结果契约和静默失败修复CustomerService per-item result、DataClean.storage 计数修正。5-8
P7 定时合并组合化EsUser.mergeUsersPostgreSQLSource.merge4-7
P8 索引、审计、并发锁审计 SQL、组合索引、锁、EXPLAIN、灰度脚本。6-10
P9 下游回归分群、旅程、评分、退订、SCRM、企微。8-12
P10 管理端 UI规则配置、预检结果、冲突列表。8-12
P11 测试和发布自动化测试、回归、灰度、回滚预案。8-12

10.2 总体估算

范围说明预估
ETL/DataClean MVP只覆盖 ETL SFTP/API/DataClean,不迁移 legacy 上传;含规则配置、组合查询、冲突、核心测试。35-50 人天
酒店 MVP 且覆盖当前 UI 上传在 ETL MVP 基础上,迁移或桥接 legacy ContactGroupUploadService 上传链路。48-68 人天
平台化完整版本管理端体验、冲突治理池、索引生命周期、定时合并、下游完整回归、监控和回滚。75-110 人天
只做文档口径修正和回归测试不实现组合 AND,只把现状明确为“多主键 OR 匹配”。5-8 人天

推荐路径:先做 ETL/DataClean MVP + legacy 上传路径产品确认。如果酒店客户实际使用旧联系人分组导入,则把 legacy 迁移纳入 MVP,否则上线口径会不完整。


11. 风险和防护

风险级别防护
把当前多主键误认为组合主键文档和 UI 统一使用“兼容多主键 OR”和“组合主键 AND”术语。
查询异常被当作无命中新增PostgreSQLSource 异常必须返回错误/抛出,DataClean 不允许错误转新增。
组合冲突继续被自动 merge匹配结果保留 group provenance,DataClean 先判断 conflict,再决定是否 merge。
定时合并按单字段继续合并组合模式禁用或改造 EsUser.mergeUsers
legacy 上传绕过新规则明确范围或迁移到 ETL。
写入失败但导入成功CustomerService 返回 per-item 结果,DataClean 计数基于真实写入结果。
组合查询性能下降中高组合索引、窄查询、EXPLAIN、灰度压测。
字段名动态 SQL 注入中高字段名来自元数据白名单和列名映射,拒绝外部任意字段。Open API 的 ContactInfoService.getContactId 也要补白名单。
缓存不一致元数据缓存和 contact 字段 Redis 缓存同时失效。
下游 customer_id 语义变化不改下游关联键,只做匹配层变更,做 alias 回归。

12. 发布与回滚策略

12.1 灰度顺序

  1. 灰度公司只开启配置 UI,不改变运行时模式。
  2. 运行数据审计:重复 contact_id/customer_id、潜在组合冲突、单字段合并历史。
  3. 创建组合索引并验证 EXPLAIN。
  4. 开启 shadow mode:同时计算 OR 结果和组合结果,只记录差异,不改变写入。
  5. 开启 GROUPED_COMPOSITE,只覆盖 ETL/SFTP/API 导入。
  6. 观察新增率、更新率、冲突率、导入失败率、写入失败率。
  7. 决定是否迁移 legacy 上传路径。

12.2 回滚

回滚对象操作
运行时语义公司规则从 GROUPED_COMPOSITE 切回 ANY_KEY_COMPATIBLE
配置变更保留规则版本,禁用当前版本。
索引不急于删除组合索引,确认稳定后再下线。
冲突记录保留审计,不随回滚删除。
已错误写入数据通过 conflict/dead-letter/reconciliation 脚本按批次修复。

13. 最终建议

本需求不应再表述为“DMTX 要支持多个主键”,因为当前已经支持多个 if_pk 主键字段,并且 Email/Mobile 可取消、自定义字段可设为主键。

准确表述应为:

DMTX 需要在现有多主键 OR 匹配能力之上,新增管理员可配置的组合主键 AND 匹配模式;该模式要求组内字段完整且同时命中同一联系人,冲突时不静默合并,从而支持酒店集团“同邮箱不同会员卡/不同酒店关系”的联系人身份隔离。

推荐实施原则:

  1. 保留存量 ANY_KEY_COMPATIBLE 默认模式。
  2. 新增 GROUPED_COMPOSITE,只对显式开启公司生效。
  3. 先统一 ETL/DataClean 主链路,再处理 legacy 上传。
  4. 不改 contact 物理主键和下游关联模型。
  5. 强化查询错误、写入错误、冲突记录,避免静默失败。
  6. 上线前必须处理定时合并、表单、事件、索引和下游回归。

14. 质量自检

检查项得分说明
客户原始需求记录2/2保留酒店集团原始场景。
业务场景清晰2/2明确同邮箱不同会员卡、偏好不覆盖。
目标用户明确2/2管理员配置,运营使用。
竞品/行业参考2/2Salesforce、HubSpot、Klaviyo。
用户体验步骤2/2三步配置、预检、冲突展示。
数据模型字段级2/2给出规则表、冲突表、DTO。
API 出入参2/2给出配置 API 草案。
类/方法级改造点2/2覆盖 upload、DataClean、source、merge、CustomerService。
边界情况2/2缺字段、冲突、查询异常、legacy、定时合并。
工作量分解2/2按阶段和范围估算。
测试方案2/2单元、集成、E2E、下游回归。
风险和回滚2/2灰度、审计、索引、回滚。
可开发性2/2明确模式、DTO、SQL、顺序。

总分:26 / 26。可进入研发评审。