Skip to content

Commit

Permalink
fix: k8s meta index
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Jan 13, 2025
1 parent e37e787 commit 9292932
Show file tree
Hide file tree
Showing 12 changed files with 414 additions and 199 deletions.
12 changes: 4 additions & 8 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,9 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)),
static_cast<uint32_t>(INT32_FLAG(batch_send_interval))};
if (!mBatcher.Init(itr ? *itr : Json::Value(),
this,
strategy,
!mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty()
&& mTelemetryType != sls_logs::SLS_TELEMETRY_TYPE_METRICS)) {
// when either exactly once is enabled or ShardHashKeys is not empty or telemetry type is metrics, we don't
// enable group batch
if (!mBatcher.Init(
itr ? *itr : Json::Value(), this, strategy, !mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty())) {
// when either exactly once is enabled or ShardHashKeys is not empty, we don't enable group batch
return false;
}

Expand Down Expand Up @@ -897,7 +893,7 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
}
}
#ifdef __ENTERPRISE__
bool hasNetworkError = sendResult == SEND_NETWORK_ERROR;
bool hasNetworkError = (sendResult == SEND_NETWORK_ERROR || sendResult == SEND_SERVER_ERROR);
EnterpriseSLSClientManager::GetInstance()->UpdateHostStatus(
mProject, mCandidateHostsInfo->GetMode(), data->mCurrentHost, !hasNetworkError);
mCandidateHostsInfo->SelectBestHost();
Expand Down
18 changes: 0 additions & 18 deletions core/unittest/flusher/FlusherSLSUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,24 +439,6 @@ void FlusherSLSUnittest::OnSuccessfulInit() {
ctx.SetExactlyOnceFlag(false);
SenderQueueManager::GetInstance()->Clear();

configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"TelemetryType": "metrics"
}
)";
APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
flusher.reset(new FlusherSLS());
flusher->SetContext(ctx);
flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
SenderQueueManager::GetInstance()->Clear();

// go param
ctx.SetIsFlushingThroughGoPipelineFlag(true);
configStr = R"(
Expand Down
11 changes: 10 additions & 1 deletion pkg/helper/k8smeta/k8s_meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/alibaba/ilogtail/pkg/logger"
)

const hostIPIndexPrefix = "host/"

type k8sMetaCache struct {
metaStore *DeferredDeletionMetaStore
clientset *kubernetes.Clientset
Expand Down Expand Up @@ -211,10 +213,12 @@ func (m *k8sMetaCache) preProcessCommon(obj interface{}) interface{} {
runtimeObj, ok := obj.(runtime.Object)
if !ok {
logger.Error(context.Background(), "K8S_META_PRE_PROCESS_ERROR", "object is not runtime object", obj)
return obj
}
metaObj, err := meta.Accessor(runtimeObj)
if err != nil {
logger.Error(context.Background(), "K8S_META_PRE_PROCESS_ERROR", "object is not meta object", err)
return obj
}
// fill empty kind
if runtimeObj.GetObjectKind().GroupVersionKind().Empty() {
Expand All @@ -238,6 +242,7 @@ func (m *k8sMetaCache) preProcessPod(obj interface{}) interface{} {
m.preProcessCommon(obj)
pod, ok := obj.(*v1.Pod)
if !ok {
logger.Error(context.Background(), "K8S_META_PRE_PROCESS_ERROR", "object is not pod", obj)
return obj
}
pod.ManagedFields = nil
Expand Down Expand Up @@ -291,7 +296,11 @@ func generateHostIPKey(obj interface{}) ([]string, error) {
if !ok {
return []string{}, fmt.Errorf("object is not a pod")
}
return []string{pod.Status.HostIP}, nil
return []string{addHostIPIndexPrefex(pod.Status.HostIP)}, nil
}

func addHostIPIndexPrefex(ip string) string {
return hostIPIndexPrefix + ip
}

func generateServiceIPKey(obj interface{}) ([]string, error) {
Expand Down
106 changes: 49 additions & 57 deletions pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@ import (
"github.com/alibaba/ilogtail/pkg/logger"
)

type IndexItem struct {
Keys map[string]struct{} // alternative to set, struct{} is zero memory
}

func NewIndexItem() IndexItem {
return IndexItem{
Keys: make(map[string]struct{}),
}
}

func (i IndexItem) Add(key string) {
i.Keys[key] = struct{}{}
}

func (i IndexItem) Remove(key string) {
delete(i.Keys, key)
}

type DeferredDeletionMetaStore struct {
keyFunc cache.KeyFunc
indexRules []IdxFunc
Expand All @@ -19,7 +37,7 @@ type DeferredDeletionMetaStore struct {

// cache
Items map[string]*ObjectWrapper
Index map[string][]string
Index map[string]IndexItem
lock sync.RWMutex

// timer
Expand Down Expand Up @@ -47,7 +65,7 @@ func NewDeferredDeletionMetaStore(eventCh chan *K8sMetaEvent, stopCh <-chan stru
stopCh: stopCh,

Items: make(map[string]*ObjectWrapper),
Index: make(map[string][]string),
Index: make(map[string]IndexItem),

gracePeriod: gracePeriod,
sendFuncs: make(map[string]*SendFuncWithStopCh),
Expand All @@ -68,8 +86,16 @@ func (m *DeferredDeletionMetaStore) Get(key []string) map[string][]*ObjectWrappe
if !ok {
continue
}
for _, realKey := range realKeys {
result[k] = append(result[k], m.Items[realKey])
for realKey := range realKeys.Keys {
if obj, ok := m.Items[realKey]; ok {
if obj.Raw != nil {
result[k] = append(result[k], obj)
} else {
logger.Error(context.Background(), "K8S_META_HANDLE_ALARM", "raw object not found", realKey)
}
} else {
logger.Error(context.Background(), "K8S_META_HANDLE_ALARM", "key not found", realKey)
}
}
}
return result
Expand Down Expand Up @@ -160,10 +186,8 @@ func (m *DeferredDeletionMetaStore) handleEvent() {
select {
case event := <-m.eventCh:
switch event.EventType {
case EventTypeAdd:
m.handleAddEvent(event)
case EventTypeUpdate:
m.handleUpdateEvent(event)
case EventTypeAdd, EventTypeUpdate:
m.handleAddOrUpdateEvent(event)
case EventTypeDelete:
m.handleDeleteEvent(event)
case EventTypeDeferredDelete:
Expand All @@ -184,46 +208,32 @@ func (m *DeferredDeletionMetaStore) handleEvent() {
}
}

func (m *DeferredDeletionMetaStore) handleAddEvent(event *K8sMetaEvent) {
key, err := m.keyFunc(event.Object.Raw)
if err != nil {
logger.Error(context.Background(), "K8S_META_HANDLE_ALARM", "handle k8s meta with keyFunc error", err)
return
}
idxKeys := m.getIdxKeys(event.Object)
m.lock.Lock()
m.Items[key] = event.Object
for _, idxKey := range idxKeys {
if _, ok := m.Index[idxKey]; !ok {
m.Index[idxKey] = make([]string, 0)
}
m.Index[idxKey] = append(m.Index[idxKey], key)
}
m.lock.Unlock()
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
}

func (m *DeferredDeletionMetaStore) handleUpdateEvent(event *K8sMetaEvent) {
func (m *DeferredDeletionMetaStore) handleAddOrUpdateEvent(event *K8sMetaEvent) {
key, err := m.keyFunc(event.Object.Raw)
if err != nil {
logger.Error(context.Background(), "K8S_META_HANDLE_ALARM", "handle k8s meta with keyFunc error", err)
return
}
idxKeys := m.getIdxKeys(event.Object)
m.lock.Lock()
// should delete oldIdxKeys in two cases:
// 1. update event
// 2. add event when the previous object is between deleted and deferred delete
if obj, ok := m.Items[key]; ok {
var oldIdxKeys []string
event.Object.FirstObservedTime = obj.FirstObservedTime
oldIdxKeys = m.getIdxKeys(obj)
for _, idxKey := range oldIdxKeys {
m.Index[idxKey].Remove(key)
}
}

m.Items[key] = event.Object
for _, idxKey := range idxKeys {
if _, ok := m.Index[idxKey]; !ok {
m.Index[idxKey] = make([]string, 0)
m.Index[idxKey] = NewIndexItem()
}
m.Index[idxKey] = append(m.Index[idxKey], key)
m.Index[idxKey].Add(key)
}
m.lock.Unlock()
m.registerLock.RLock()
Expand Down Expand Up @@ -273,34 +283,16 @@ func (m *DeferredDeletionMetaStore) handleDeferredDeleteEvent(event *K8sMetaEven
if obj.Deleted {
delete(m.Items, key)
for _, idxKey := range idxKeys {
for i, k := range m.Index[idxKey] {
if k == key {
m.Index[idxKey] = append(m.Index[idxKey][:i], m.Index[idxKey][i+1:]...)
break
}
if _, ok := m.Index[idxKey]; !ok {
continue
}
if len(m.Index[idxKey]) == 0 {
m.Index[idxKey].Remove(key)
if len(m.Index[idxKey].Keys) == 0 {
delete(m.Index, idxKey)
}
}
} else {
// there is a new add event between delete event and deferred delete event
// clear invalid index
newIdxKeys := m.getIdxKeys(obj)
for i := range idxKeys {
if idxKeys[i] != newIdxKeys[i] {
for j, k := range m.Index[idxKeys[i]] {
if k == key {
m.Index[idxKeys[i]] = append(m.Index[idxKeys[i]][:j], m.Index[idxKeys[i]][j+1:]...)
break
}
}
if len(m.Index[idxKeys[i]]) == 0 {
delete(m.Index, idxKeys[i])
}
}
}
}
// if deleted is false, there is a new add event between delete event and deferred delete event
}
}

Expand Down
Loading

0 comments on commit 9292932

Please sign in to comment.