int rc = 0;
ENTRY;
+ if (mdd_is_dead_obj(src_obj))
+ RETURN(-ESTALE);
+
/* Local ops, no lookup before link, check filename length here. */
if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
RETURN(-ENAMETOOLONG);
mdo_ref_del(env, obj, handle);
}
-/* insert named index, add reference if isdir */
-static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
- const struct lu_fid *lf, const char *name, int is_dir,
- struct thandle *handle, struct lustre_capa *capa)
+static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
+ const char *name, struct thandle *handle,
+ struct lustre_capa *capa)
+{
+ struct dt_object *next = mdd_object_child(pobj);
+ int rc;
+ ENTRY;
+
+ if (dt_try_as_dir(env, next)) {
+ rc = next->do_index_ops->dio_delete(env, next,
+ (struct dt_key *)name,
+ handle, capa);
+ } else
+ rc = -ENOTDIR;
+
+ RETURN(rc);
+}
+
+static int __mdd_index_insert_only(const struct lu_env *env,
+ struct mdd_object *pobj,
+ const struct lu_fid *lf, const char *name,
+ struct thandle *handle,
+ struct lustre_capa *capa)
{
struct dt_object *next = mdd_object_child(pobj);
int rc;
} else {
rc = -ENOTDIR;
}
-
- if (rc == 0) {
- if (is_dir) {
- mdd_write_lock(env, pobj, MOR_TGT_PARENT);
- __mdd_ref_add(env, pobj, handle);
- mdd_write_unlock(env, pobj);
- }
- }
RETURN(rc);
}
-/* delete named index, drop reference if isdir */
-static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
- const char *name, int is_dir, struct thandle *handle,
- struct lustre_capa *capa)
+/* insert named index, add reference if isdir */
+static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
+ const struct lu_fid *lf, const char *name, int is_dir,
+ struct thandle *handle, struct lustre_capa *capa)
{
- struct dt_object *next = mdd_object_child(pobj);
int rc;
ENTRY;
- if (dt_try_as_dir(env, next)) {
- rc = next->do_index_ops->dio_delete(env, next,
- (struct dt_key *)name,
- handle, capa);
- if (rc == 0 && is_dir) {
- int is_dot = 0;
-
- if (name != NULL && name[0] == '.' && name[1] == 0)
- is_dot = 1;
- mdd_write_lock(env, pobj, MOR_TGT_PARENT);
- __mdd_ref_del(env, pobj, handle, is_dot);
- mdd_write_unlock(env, pobj);
- }
- } else
- rc = -ENOTDIR;
-
+ rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
+ if (rc == 0 && is_dir) {
+ mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+ __mdd_ref_add(env, pobj, handle);
+ mdd_write_unlock(env, pobj);
+ }
RETURN(rc);
}
-static int
-__mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
- const struct lu_fid *lf, const char *name,
- struct thandle *handle, struct lustre_capa *capa)
+/* delete named index, drop reference if isdir */
+static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
+ const char *name, int is_dir, struct thandle *handle,
+ struct lustre_capa *capa)
{
- struct dt_object *next = mdd_object_child(pobj);
int rc;
ENTRY;
- if (dt_try_as_dir(env, next)) {
- struct md_ucred *uc = md_ucred(env);
+ rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
+ if (rc == 0 && is_dir) {
+ int is_dot = 0;
- rc = next->do_index_ops->dio_insert(env, next,
- __mdd_fid_rec(env, lf),
- (const struct dt_key *)name,
- handle, capa, uc->mu_cap &
- CFS_CAP_SYS_RESOURCE_MASK);
- } else {
- rc = -ENOTDIR;
+ if (name != NULL && name[0] == '.' && name[1] == 0)
+ is_dot = 1;
+ mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+ __mdd_ref_del(env, pobj, handle, is_dot);
+ mdd_write_unlock(env, pobj);
}
+
RETURN(rc);
}
+
/** Store a namespace change changelog record
* If this fails, we must fail the whole transaction; we don't
* want the change to commit without the log entry.
struct obd_device *obd = mdd->mdd_obd_dev;
struct mds_obd *mds = &obd->u.mds;
unsigned int qids[MAXQUOTAS] = { 0, 0 };
- int quota_opc = 0, rec_pending = 0;
+ int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 };
#endif
int rc;
ENTRY;
rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
if (!rc) {
+ void *data = NULL;
+ mdd_data_get(env, mdd_tobj, &data);
quota_opc = FSFILT_OP_LINK;
mdd_quota_wrapper(la_tmp, qids);
/* get block quota for parent */
lquota_chkquota(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA], 1,
- &rec_pending, NULL, LQUOTA_FLAGS_BLK,
- NULL, 0);
+ qids, rec_pending, 1, NULL,
+ LQUOTA_FLAGS_BLK, data, 1);
}
}
#endif
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc) {
- if (rec_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- rec_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref, obd,
+ qids, rec_pending, 1);
/* Trigger dqacq for the parent owner. If failed,
* the next call for lquota_chkquota will process it. */
lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
* will be deleted during mdd_close() */
if (obj->mod_count) {
rc = __mdd_orphan_add(env, obj, th);
- if (rc == 0)
+ if (rc == 0) {
obj->mod_flags |= ORPHAN_OBJ;
+ CDEBUG(D_HA, "Object "DFID" is going to be "
+ "an orphan, open count = %d\n",
+ PFID(mdd_object_fid(obj)),
+ obj->mod_count);
+ }
}
obj->mod_flags |= DEAD_OBJ;
int rc;
ENTRY;
+ if (mdd_is_dead_obj(cobj))
+ RETURN(-ESTALE);
+
rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
RETURN(rc);
struct obd_device *obd = mdd->mdd_obd_dev;
struct mds_obd *mds = &obd->u.mds;
unsigned int qids[MAXQUOTAS] = { 0, 0 };
- int quota_opc = 0, rec_pending = 0;
+ int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 };
cfs_cap_t save = uc->mu_cap;
#endif
int rc;
rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
if (!rc) {
+ void *data = NULL;
+ mdd_data_get(env, mdd_obj, &data);
quota_opc = FSFILT_OP_LINK;
mdd_quota_wrapper(la_tmp, qids);
/* get block quota for parent */
lquota_chkquota(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- 1, &rec_pending, NULL,
- LQUOTA_FLAGS_BLK, NULL, 0);
+ qids, rec_pending, 1, NULL,
+ LQUOTA_FLAGS_BLK, data, 1);
}
} else {
uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota) {
if (quota_opc) {
- if (rec_pending)
- lquota_pending_commit(mds_quota_interface_ref,
- obd, qids[USRQUOTA],
- qids[GRPQUOTA],
- rec_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref,
+ obd, qids, rec_pending, 1);
/* Trigger dqacq for the parent owner. If failed,
* the next call for lquota_chkquota will process it*/
lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
* processed in cmr_rename_tgt before mdd_rename_tgt and enable
* MDS_PERM_BYPASS.
* So check may_delete, but not check nlink of tgt_pobj. */
- LASSERT(tobj);
+
rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
RETURN(rc);
struct mds_obd *mds = &obd->u.mds;
unsigned int qcids[MAXQUOTAS] = { 0, 0 };
unsigned int qpids[MAXQUOTAS] = { 0, 0 };
- int quota_opc = 0, rec_pending = 0;
+ int quota_copc = 0, quota_popc = 0;
+ int rec_pending[MAXQUOTAS] = { 0, 0 };
#endif
int rc;
ENTRY;
rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
if (!rc) {
- quota_opc = FSFILT_OP_LINK;
+ void *data = NULL;
+ mdd_data_get(env, mdd_tpobj, &data);
+ quota_popc = FSFILT_OP_LINK;
mdd_quota_wrapper(la_tmp, qpids);
/* get block quota for target parent */
lquota_chkquota(mds_quota_interface_ref, obd,
- qpids[USRQUOTA], qpids[GRPQUOTA], 1,
- &rec_pending, NULL, LQUOTA_FLAGS_BLK,
- NULL, 0);
+ qpids, rec_pending, 1, NULL,
+ LQUOTA_FLAGS_BLK, data, 1);
}
}
#endif
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota && ma->ma_valid & MA_INODE &&
ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
- quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+ quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
mdd_quota_wrapper(&ma->ma_attr, qcids);
}
#endif
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota) {
- if (rec_pending)
+ if (quota_popc)
lquota_pending_commit(mds_quota_interface_ref, obd,
- qpids[USRQUOTA],
- qpids[GRPQUOTA],
- rec_pending, 1);
- if (quota_opc)
- /* Trigger dqrel/dqacq on the target owner of child and
- * parent. If failed, the next call for lquota_chkquota
+ qpids, rec_pending, 1);
+
+ if (quota_copc)
+ /* Trigger dqrel on the target owner of child.
+ * If failed, the next call for lquota_chkquota
* will process it. */
- lquota_adjust(mds_quota_interface_ref, obd, qcids,
- qpids, rc, quota_opc);
+ lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids,
+ rc, quota_copc);
}
#endif
return rc;
if (!md_should_create(spec->sp_cr_flags))
RETURN(0);
-
+ lmm_size = ma->ma_lmm_size;
rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
spec, attr);
if (rc)
unsigned int qcids[MAXQUOTAS] = { 0, 0 };
unsigned int qpids[MAXQUOTAS] = { 0, 0 };
int quota_opc = 0, block_count = 0;
- int inode_pending = 0, block_pending = 0, parent_pending = 0;
+ int inode_pending[MAXQUOTAS] = { 0, 0 };
+ int block_pending[MAXQUOTAS] = { 0, 0 };
+ int parent_pending[MAXQUOTAS] = { 0, 0 };
#endif
ENTRY;
mdd_quota_wrapper(&ma->ma_attr, qcids);
mdd_quota_wrapper(la_tmp, qpids);
/* get file quota for child */
- lquota_chkquota(mds_quota_interface_ref, obd,
- qcids[USRQUOTA], qcids[GRPQUOTA], 1,
- &inode_pending, NULL, 0, NULL, 0);
+ lquota_chkquota(mds_quota_interface_ref, obd, qcids,
+ inode_pending, 1, NULL, 0, NULL, 0);
switch (ma->ma_attr.la_mode & S_IFMT) {
case S_IFLNK:
case S_IFDIR:
/* get block quota for child and parent */
if (block_count)
lquota_chkquota(mds_quota_interface_ref, obd,
- qcids[USRQUOTA], qcids[GRPQUOTA],
- block_count,
- &block_pending, NULL,
+ qcids, block_pending,
+ block_count, NULL,
LQUOTA_FLAGS_BLK, NULL, 0);
if (!same)
lquota_chkquota(mds_quota_interface_ref, obd,
- qpids[USRQUOTA], qpids[GRPQUOTA], 1,
- &parent_pending, NULL,
+ qpids, parent_pending, 1, NULL,
LQUOTA_FLAGS_BLK, NULL, 0);
}
}
* first.
*/
if (S_ISREG(attr->la_mode)) {
+ lmm_size = ma->ma_lmm_size;
rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
spec, attr);
if (rc)
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc) {
- if (inode_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qcids[USRQUOTA], qcids[GRPQUOTA],
- inode_pending, 0);
- if (block_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qcids[USRQUOTA], qcids[GRPQUOTA],
- block_pending, 1);
- if (parent_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qpids[USRQUOTA], qpids[GRPQUOTA],
- parent_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qcids,
+ inode_pending, 0);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qcids,
+ block_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qpids,
+ parent_pending, 1);
/* Trigger dqacq on the owner of child and parent. If failed,
* the next call for lquota_chkquota will process it. */
lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
* the other case has been processed in cml_rename
* before mdd_rename and enable MDS_PERM_BYPASS. */
LASSERT(sobj);
+
+ if (mdd_is_dead_obj(sobj))
+ RETURN(-ESTALE);
+
rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
if (rc)
RETURN(rc);
struct dynlock_handle *sdlh, *tdlh;
struct thandle *handle;
const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
+ const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
int is_dir;
- int rc;
+ int rc, rc2;
#ifdef HAVE_QUOTA_SUPPORT
struct obd_device *obd = mdd->mdd_obd_dev;
unsigned int qspids[MAXQUOTAS] = { 0, 0 };
unsigned int qtcids[MAXQUOTAS] = { 0, 0 };
unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
- int quota_opc = 0, rec_pending = 0;
+ int quota_copc = 0, quota_popc = 0;
+ int rec_pending[MAXQUOTAS] = { 0, 0 };
#endif
ENTRY;
rc = mdd_la_get(env, mdd_tpobj, la_tmp,
BYPASS_CAPA);
if (!rc) {
- quota_opc = FSFILT_OP_LINK;
+ void *data = NULL;
+ mdd_data_get(env, mdd_tpobj, &data);
+ quota_popc = FSFILT_OP_LINK;
mdd_quota_wrapper(la_tmp, qtpids);
/* get block quota for target parent */
lquota_chkquota(mds_quota_interface_ref,
- obd, qtpids[USRQUOTA],
- qtpids[GRPQUOTA], 1,
- &rec_pending, NULL,
+ obd, qtpids,
+ rec_pending, 1, NULL,
LQUOTA_FLAGS_BLK,
- NULL, 0);
+ data, 1);
}
}
}
/* "mv dir1 dir2" needs "dir1/.." link update */
if (is_dir && mdd_sobj) {
- rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle,
+ rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
mdd_object_capa(env, mdd_spobj));
if (rc)
- GOTO(cleanup, rc);
+ GOTO(fixup_spobj2, rc);
- rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot,
- is_dir, handle,
- mdd_object_capa(env, mdd_tpobj));
- if (rc)
- GOTO(cleanup, rc);
+ rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
+ handle, mdd_object_capa(env, mdd_tpobj));
+ if (rc) {
+ GOTO(fixup_spobj, rc);
+ }
}
/* Remove target name from target directory
*/
rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
mdd_object_capa(env, mdd_tpobj));
- if (rc != 0 && rc != -ENOENT)
- GOTO(cleanup, rc);
+ if (rc != 0) {
+ if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
+ /* tname might been renamed to something else */
+ GOTO(fixup_spobj, rc);
+ }
+ if (rc != -ENOENT)
+ GOTO(fixup_spobj, rc);
+ }
/* Insert new fid with target name into target dir */
rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
mdd_object_capa(env, mdd_tpobj));
if (rc)
- GOTO(cleanup, rc);
+ GOTO(fixup_spobj, rc);
LASSERT(ma->ma_attr.la_valid & LA_CTIME);
la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
handle, 0);
if (rc)
- GOTO(cleanup, rc);
+ GOTO(fixup_tpobj, rc);
}
/* Remove old target object
*/
if (tobj && mdd_object_exists(mdd_tobj)) {
mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
+ if (mdd_is_dead_obj(mdd_tobj)) {
+ mdd_write_unlock(env, mdd_tobj);
+ /* shld not be dead, something is wrong */
+ CERROR("tobj is dead, something is wrong\n");
+ rc = -EINVAL;
+ goto cleanup;
+ }
__mdd_ref_del(env, mdd_tobj, handle, 0);
/* Remove dot reference. */
la->la_valid = LA_CTIME;
rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
if (rc)
- GOTO(cleanup, rc);
+ GOTO(fixup_tpobj, rc);
rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
mdd_write_unlock(env, mdd_tobj);
if (rc)
- GOTO(cleanup, rc);
+ GOTO(fixup_tpobj, rc);
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota && ma->ma_valid & MA_INODE &&
ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
- quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+ quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
mdd_quota_wrapper(&ma->ma_attr, qtcids);
}
#endif
la->la_valid = LA_CTIME | LA_MTIME;
rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
if (rc)
- GOTO(cleanup, rc);
+ GOTO(fixup_tpobj, rc);
if (mdd_spobj != mdd_tpobj) {
la->la_valid = LA_CTIME | LA_MTIME;
}
EXIT;
+
+fixup_tpobj:
+ if (rc) {
+ rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
+ mdd_object_capa(env, mdd_tpobj));
+ if (rc2)
+ CWARN("tp obj fix error %d\n",rc2);
+
+ }
+fixup_spobj:
+ if (rc && is_dir && mdd_sobj) {
+ rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
+ mdd_object_capa(env, mdd_spobj));
+
+ if (rc2)
+ CWARN("sp obj dotdot delete error %d\n",rc2);
+
+
+ rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
+ dotdot, handle,
+ mdd_object_capa(env, mdd_spobj));
+ if (rc2)
+ CWARN("sp obj dotdot insert error %d\n",rc2);
+ }
+
+fixup_spobj2:
+ if (rc) {
+ rc2 = __mdd_index_insert(env, mdd_spobj,
+ lf, sname, is_dir, handle,
+ mdd_object_capa(env, mdd_tpobj));
+ if (rc2)
+ CWARN("sp obj fix error %d\n",rc2);
+ }
cleanup:
if (likely(tdlh) && sdlh != tdlh)
mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota) {
- if (rec_pending)
+ if (quota_popc)
lquota_pending_commit(mds_quota_interface_ref, obd,
- qtpids[USRQUOTA],
- qtpids[GRPQUOTA],
- rec_pending, 1);
- /* Trigger dqrel on the source owner of parent.
- * If failed, the next call for lquota_chkquota will
- * process it. */
- lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
- FSFILT_OP_UNLINK_PARTIAL_PARENT);
- if (quota_opc)
- /* Trigger dqrel/dqacq on the target owner of child and
- * parent. If failed, the next call for lquota_chkquota
+ qtpids, rec_pending, 1);
+
+ if (quota_copc) {
+ /* Trigger dqrel on the source owner of parent.
+ * If failed, the next call for lquota_chkquota will
+ * process it. */
+ lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
+ FSFILT_OP_UNLINK_PARTIAL_PARENT);
+
+ /* Trigger dqrel on the target owner of child.
+ * If failed, the next call for lquota_chkquota
* will process it. */
lquota_adjust(mds_quota_interface_ref, obd, qtcids,
- qtpids, rc, quota_opc);
+ qtpids, rc, quota_copc);
+ }
}
#endif
return rc;