if (need & MA_INODE) {
ma->ma_need = MA_INODE;
+ if (need & MA_DIRENT_CNT)
+ ma->ma_attr.la_valid |= LA_DIRENT_CNT;
+ else
+ ma->ma_attr.la_valid &= ~LA_DIRENT_CNT;
rc = mo_attr_get(env, next, ma);
if (rc)
GOTO(out, rc);
if (parent == NULL)
RETURN(-ENOENT);
+ if (info->mti_mdt->mdt_enable_dir_auto_split)
+ ma_need |= MA_DIRENT_CNT;
+
if (info->mti_cross_ref) {
/* Only getattr on the child. Parent is on another node. */
mdt_set_disposition(info, ldlm_rep,
RETURN(-ENOENT);
}
- rc = mdt_getattr_internal(info, child, 0);
+ rc = mdt_getattr_internal(info, child, ma_need);
if (unlikely(rc != 0)) {
mdt_object_unlock(info, child, lhc, 1);
RETURN(rc);
result = 0;
}
}
- obj->oo_dirent_count = LU_DIRENT_COUNT_UNSET;
+ atomic_set(&obj->oo_dirent_count, LU_DIRENT_COUNT_UNSET);
LINVRNT(osd_invariant(obj));
return result;
LASSERT(S_ISDIR(obj->oo_inode->i_mode));
LASSERT(fid_is_namespace_visible(lu_object_fid(&obj->oo_dt.do_lu)));
- if (obj->oo_dirent_count != LU_DIRENT_COUNT_UNSET) {
- *count = obj->oo_dirent_count;
- RETURN(0);
- }
-
/* directory not initialized yet */
if (!dt->do_index_ops) {
*count = 0;
RETURN(0);
}
+ spin_lock(&obj->oo_guard);
+ *count = atomic_read(&obj->oo_dirent_count);
+ if (*count == LU_DIRENT_COUNT_UNSET)
+ atomic_set(&obj->oo_dirent_count, 0);
+ spin_unlock(&obj->oo_guard);
+ if (*count != LU_DIRENT_COUNT_UNSET)
+ RETURN(0);
+
+ *count = 0;
iops = &dt->do_index_ops->dio_it;
it = iops->init(env, dt, LUDA_64BITHASH);
if (IS_ERR(it))
- RETURN(PTR_ERR(it));
+ GOTO(out, rc = PTR_ERR(it));
rc = iops->load(env, it, 0);
if (rc < 0) {
- if (rc == -ENODATA) {
+ if (rc == -ENODATA)
rc = 0;
- *count = 0;
- }
- GOTO(out, rc);
+ GOTO(put, rc);
}
if (rc > 0)
rc = iops->next(env, it);
- for (*count = 0; rc == 0 || rc == -ESTALE; rc = iops->next(env, it)) {
+ for (; rc == 0 || rc == -ESTALE; rc = iops->next(env, it)) {
if (rc == -ESTALE)
continue;
(*count)++;
}
- if (rc == 1) {
- obj->oo_dirent_count = *count;
+ if (rc == 1 || rc == -ESTALE)
rc = 0;
- }
-out:
+put:
iops->put(env, it);
iops->fini(env, it);
-
+out:
+ /* If counting dirents failed, use the current count (if any).
+ *
+ * At worst this means the directory will not be split until the
+ * count can be completed successfully (remount or oo_dirent_count
+ * incremented by adding new entries). This avoids re-walking
+ * the whole directory on each access and hitting the same error.
+ */
+ if (rc && *count == 0)
+ *count = LU_DIRENT_COUNT_UNSET;
+ atomic_set(&obj->oo_dirent_count, *count);
RETURN(rc);
}
spin_unlock(&obj->oo_guard);
if (S_ISDIR(obj->oo_inode->i_mode) &&
+ (attr->la_valid & LA_DIRENT_CNT) &&
fid_is_namespace_visible(lu_object_fid(&dt->do_lu)))
rc = osd_dirent_count(env, dt, &attr->la_dirent_count);
+ else
+ attr->la_valid &= ~LA_DIRENT_CNT;
return rc;
}
oth = container_of(th, struct osd_thandle, ot_super);
LASSERT(oth->ot_handle->h_transaction != NULL);
if (fid_is_namespace_visible(lu_object_fid(&obj->oo_dt.do_lu)))
- obj->oo_dirent_count = 0;
+ atomic_set(&obj->oo_dirent_count, 0);
result = osd_mkfile(info, obj, mode, hint, th, attr);
return result;
rc = PTR_ERR(bh);
}
- if (!rc && fid_is_namespace_visible(lu_object_fid(&dt->do_lu)) &&
- obj->oo_dirent_count != LU_DIRENT_COUNT_UNSET) {
- /* NB, dirent count may not be accurate, because it's counted
- * without lock.
- */
- if (obj->oo_dirent_count)
- obj->oo_dirent_count--;
- else
- obj->oo_dirent_count = LU_DIRENT_COUNT_UNSET;
- }
+ if (!rc && fid_is_namespace_visible(lu_object_fid(&dt->do_lu)))
+ atomic_dec_if_positive(&obj->oo_dirent_count);
if (hlock != NULL)
ldiskfs_htree_unlock(hlock);
else
hlock, th);
}
}
- if (!rc && fid_is_namespace_visible(lu_object_fid(&pobj->oo_dt.do_lu))
- && pobj->oo_dirent_count != LU_DIRENT_COUNT_UNSET)
- pobj->oo_dirent_count++;
+ if (!rc && fid_is_namespace_visible(lu_object_fid(&pobj->oo_dt.do_lu))){
+ int dirent_count = atomic_read(&pobj->oo_dirent_count);
+
+ /* avoid extremely unlikely 2B-entry directory overflow case */
+ if (dirent_count != LU_DIRENT_COUNT_UNSET &&
+ likely(dirent_count < INT_MAX - NR_CPUS))
+ atomic_inc(&pobj->oo_dirent_count);
+ }
if (hlock != NULL)
ldiskfs_htree_unlock(hlock);