feat: support TruncateCollection api to clear collection data #46167
base: master
Signed-off-by: sijie-ni-0214 <sijie.ni@zilliz.com>
}
}

message DropSegmentsByTimeRequest {
Please don't introduce a new cross-coord RPC.
We can just add a new function to the mixcoord client;
we will remove those redundant RPCs in the future.
Refer to the implementation of NotifyDropPartition.
 *
 * @return Status
 */
rpc TruncateCollection(milvus.TruncateCollectionRequest) returns (common.Status) {}
It may be bad design for an API to use an enum rather than a message struct as its response.
If you want to return other things in the future, such as the number of deleted rows, you will not be able to extend the API.
rpc UpdateLoadConfig(UpdateLoadConfigRequest) returns (common.Status) {}
rpc RunAnalyzer(RunAnalyzerRequest) returns(milvus.RunAnalyzerResponse){}
rpc ValidateAnalyzer(ValidateAnalyzerRequest) returns(common.Status){}
rpc ManualUpdateCurrentTarget(ManualUpdateCurrentTargetRequest) returns(common.Status){}
Also remove this cross-coord RPC; use a function call directly.
clone := make(Str2Str)
for key, value := range m {
clone[key] = value
if m1 == nil || m2 == nil {
Is an empty map equal to a nil map under these semantics?
The semantics of the current implementation are not the same as those of the original implementation.
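To make the question concrete, here is a minimal sketch of the two possible semantics. `Str2Str` is the map type visible in the diff context; `equalLoose` and `equalStrict` are hypothetical names used only for illustration and are not the functions in this PR.

```go
package main

import "fmt"

// Str2Str mirrors the map type that appears in the diff context.
type Str2Str map[string]string

// equalLoose (hypothetical) treats a nil map and an empty map as equal,
// because len() of a nil map is 0 and ranging over it yields nothing.
func equalLoose(m1, m2 Str2Str) bool {
	if len(m1) != len(m2) {
		return false
	}
	for k, v1 := range m1 {
		if v2, ok := m2[k]; !ok || v1 != v2 {
			return false
		}
	}
	return true
}

// equalStrict (hypothetical) distinguishes nil from empty: a nil map is
// only equal to another nil map.
func equalStrict(m1, m2 Str2Str) bool {
	if (m1 == nil) != (m2 == nil) {
		return false
	}
	return equalLoose(m1, m2)
}

func main() {
	var nilMap Str2Str
	empty := Str2Str{}
	fmt.Println(equalLoose(nilMap, empty))  // true
	fmt.Println(equalStrict(nilMap, empty)) // false
}
```

Whichever variant the original helper implemented, a refactor should keep the same answer for the nil-versus-empty case, or call out the change explicitly.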
zap.Duration("duration", dropSegmentsDuration))

// Check if the collection is loaded in QueryCoord, if not, skip ManualUpdateCurrentTarget
resp, err := c.mixCoord.ShowLoadCollections(ctx, &querypb.ShowCollectionsRequest{
Please wrap the following operation on the querycoord side and return a collection-not-loaded error from ManualUpdateCurrentTarget.
Handle the error directly; ErrCollectionNotLoaded is just an ignorable error.
return err
}
dropSegmentsDuration := dropSegmentsTr.ElapseSpan()
log.Ctx(ctx).Info("mydebug: drop segments by time done",
Weird log message: "mydebug".
}

// ManualUpdateCurrentTarget is used to manually update the current target for TruncateCollection
func (s *Server) ManualUpdateCurrentTarget(ctx context.Context, req *querypb.ManualUpdateCurrentTargetRequest) (*commonpb.Status, error) {
This can be a plain function rather than a gRPC API.
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_DropCollection
Although this is a deprecated field, the message type here is wrong; please correct it.
log.Ctx(ctx).Info("receive DropSegmentsByTime request",
zap.Int64("collectionID", req.GetCollectionID()))

channels, err := s.getChannelsByCollectionID(ctx, req.GetCollectionID())
Why fetch the channel list here? It's already in the FlushTsList.
return merr.Status(err), nil
}
// get segments to drop
segments := s.meta.GetSegmentsByChannel(channelName)
We should introduce a new API such as TruncateChannel(timetick uint64).
Otherwise, the read and write operations are not protected by the meta mutex.
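The concern can be sketched as follows: if the caller first reads the segment list and later writes the drops, another writer can slip in between the two steps. A single method that holds the lock for the whole read-modify-write avoids that. This is only an illustration under assumptions: the `meta` and `segment` types, the `endTs` field, and the extra `channel` parameter are all hypothetical and do not reflect the actual datacoord meta.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// segment is a hypothetical, simplified segment record.
type segment struct {
	id      int64
	channel string
	endTs   uint64 // assumed: last timestamp covered by the segment
	dropped bool
}

// meta is a hypothetical in-memory meta store guarded by one mutex.
type meta struct {
	mu       sync.RWMutex
	segments map[int64]*segment
}

// TruncateChannel marks every live segment on the channel whose data ends
// at or before timetick as dropped. The select and the update happen under
// the same write lock, so no concurrent writer can interleave.
func (m *meta) TruncateChannel(channel string, timetick uint64) []int64 {
	m.mu.Lock()
	defer m.mu.Unlock()

	var dropped []int64
	for id, s := range m.segments {
		if s.channel == channel && !s.dropped && s.endTs <= timetick {
			s.dropped = true
			dropped = append(dropped, id)
		}
	}
	// Map iteration order is random; sort for a stable result.
	sort.Slice(dropped, func(i, j int) bool { return dropped[i] < dropped[j] })
	return dropped
}

func main() {
	m := &meta{segments: map[int64]*segment{
		1: {id: 1, channel: "ch-1", endTs: 50},
		2: {id: 2, channel: "ch-1", endTs: 150},
		3: {id: 3, channel: "ch-2", endTs: 50},
	}}
	fmt.Println(m.TruncateChannel("ch-1", 100)) // [1]
}
```

Only segment 1 is dropped: segment 2 ends after the time tick and segment 3 lives on another channel.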
issue: #46166