-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(worker): digest by key #7569
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -261,12 +261,14 @@ export class AddJob { | |
private async updateMetadata(response: ExecuteOutput, command: AddJobCommand) { | ||
let metadata = {} as IWorkflowStepMetadata; | ||
const outputs = response.outputs as DigestOutput; | ||
// digest value is pre-computed by framework and passed as digestKey | ||
const outputDigestValue = outputs?.digestKey; | ||
const digestType = getDigestType(outputs); | ||
|
||
if (isTimedDigestOutput(outputs)) { | ||
metadata = { | ||
type: DigestTypeEnum.TIMED, | ||
digestKey: outputs?.digestKey, | ||
digestValue: outputDigestValue, | ||
timed: { cronExpression: outputs?.cron }, | ||
} as IDigestTimedMetadata; | ||
|
||
|
@@ -278,7 +280,7 @@ export class AddJob { | |
{ | ||
$set: { | ||
'digest.type': metadata.type, | ||
'digest.digestKey': metadata.digestKey, | ||
'digest.digestValue': metadata.digestValue, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any backwards compatability concerns? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nvm, saw it later in the query builder There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah this part is about the framework digest key, which was not working before. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm shocked 🤯 |
||
'digest.amount': metadata.amount, | ||
'digest.unit': metadata.unit, | ||
'digest.timed.cronExpression': metadata.timed?.cronExpression, | ||
|
@@ -291,7 +293,7 @@ export class AddJob { | |
metadata = { | ||
type: digestType, | ||
amount: outputs?.amount, | ||
digestKey: outputs?.digestKey, | ||
digestValue: outputDigestValue, | ||
unit: outputs.unit ? castUnitToDigestUnitEnum(outputs?.unit) : undefined, | ||
backoff: digestType === DigestTypeEnum.BACKOFF, | ||
backoffAmount: outputs.lookBackWindow?.amount, | ||
|
@@ -306,7 +308,7 @@ export class AddJob { | |
{ | ||
$set: { | ||
'digest.type': metadata.type, | ||
'digest.digestKey': metadata.digestKey, | ||
'digest.digestValue': metadata.digestValue, | ||
'digest.amount': metadata.amount, | ||
'digest.unit': metadata.unit, | ||
'digest.backoff': metadata.backoff, | ||
|
@@ -321,7 +323,7 @@ export class AddJob { | |
metadata = { | ||
type: digestType, | ||
amount: outputs?.amount, | ||
digestKey: outputs?.digestKey, | ||
digestValue: outputDigestValue, | ||
unit: outputs.unit ? castUnitToDigestUnitEnum(outputs?.unit) : undefined, | ||
} as IDigestRegularMetadata; | ||
|
||
|
@@ -333,7 +335,7 @@ export class AddJob { | |
{ | ||
$set: { | ||
'digest.type': metadata.type, | ||
'digest.digestKey': metadata.digestKey, | ||
'digest.digestValue': metadata.digestValue, | ||
'digest.amount': metadata.amount, | ||
'digest.unit': metadata.unit, | ||
}, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,7 +47,7 @@ export class MergeOrCreateDigest { | |
|
||
const digestMeta = job.digest as IDigestBaseMetadata; | ||
const digestKey = digestMeta?.digestKey; | ||
const digestValue = getNestedValue(job.payload, digestKey); | ||
const digestValue = digestMeta?.digestValue ?? getNestedValue(job.payload, digestKey); | ||
|
||
const digestAction = command.filtered | ||
? { digestResult: DigestCreationResultEnum.SKIPPED } | ||
|
@@ -150,9 +150,13 @@ export class MergeOrCreateDigest { | |
} | ||
|
||
private getLockKey(job: JobEntity, digestKey: string | undefined, digestValue: string | number | undefined): string { | ||
let resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`; | ||
const resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`; | ||
if (digestKey && digestValue) { | ||
resource = `${resource}:digestKey:${digestKey}:digestValue:${digestValue}`; | ||
return `${resource}:digestKey:${digestKey}:digestValue:${digestValue}`; | ||
} | ||
|
||
if (digestValue) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the scenario where we have a key but not a digestValue? Is the key only the fixed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry i missed this one before merging. |
||
return `${resource}:digestValue:${digestValue}`; | ||
} | ||
|
||
return resource; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -189,9 +189,10 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce | |
job.digest?.type === DigestTypeEnum.BACKOFF || | ||
(job.digest as IDigestRegularMetadata)?.backoff || | ||
(digestMeta?.backoff && digestMeta?.backoff); | ||
const digestQuery = this.buildDigestQuery(digestKey, digestValue); | ||
|
||
if (isBackoff) { | ||
const trigger = await this.getTrigger(job, digestMeta, digestKey, digestValue); | ||
const trigger = await this.getTriggerJob(job, digestMeta, digestQuery); | ||
if (!trigger) { | ||
return { | ||
digestResult: DigestCreationResultEnum.SKIPPED, | ||
|
@@ -219,7 +220,7 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce | |
_templateId: job._templateId, | ||
_environmentId: this.convertStringToObjectId(job._environmentId), | ||
_subscriberId: this.convertStringToObjectId(job._subscriberId), | ||
...(digestKey && { [`payload.${digestKey}`]: digestValue }), | ||
...digestQuery, | ||
}, | ||
'_id _notificationId' | ||
); | ||
|
@@ -252,18 +253,22 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce | |
}; | ||
} | ||
|
||
private buildDigestQuery(digestKey: string | undefined, digestValue: string | number | undefined) { | ||
const digestQueryV1 = digestKey ? { [`payload.${digestKey}`]: digestValue } : null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In v1, we handled digest aggregation by:
|
||
// Digest key parsing is handled by the framework, leaving only the digest value available here | ||
const digestQueryV2 = !digestKey && digestValue ? { [`digest.digestValue`]: digestValue } : null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now we store the final digest value directly. We don't track how this value was generated - we simply check if this exact digest value already exists in our system. |
||
const digestQuery = digestQueryV1 || digestQueryV2; | ||
|
||
return digestQuery || {}; | ||
} | ||
|
||
private getBackoffDate(metadata: IDigestRegularMetadata | undefined) { | ||
return sub(new Date(), { | ||
[metadata?.backoffUnit as string]: metadata?.backoffAmount, | ||
}); | ||
} | ||
|
||
private getTrigger( | ||
job: JobEntity, | ||
metadata?: IDigestRegularMetadata, | ||
digestKey?: string, | ||
digestValue?: string | number | ||
) { | ||
private getTriggerJob(job: JobEntity, metadata?: IDigestRegularMetadata, digestQuery?: Record<string, unknown>) { | ||
const query = { | ||
updatedAt: { | ||
$gte: this.getBackoffDate(metadata), | ||
|
@@ -276,7 +281,7 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce | |
type: StepTypeEnum.TRIGGER, | ||
_environmentId: job._environmentId, | ||
_subscriberId: job._subscriberId, | ||
...(digestKey && { [`payload.${digestKey}`]: digestValue }), | ||
...digestQuery, | ||
}; | ||
|
||
return this.findOne(query); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert the removal of digest key in Dashboard.