struct dt_object *dt);
void (*do_write_unlock)(const struct lu_env *env,
struct dt_object *dt);
+ int (*do_write_locked)(const struct lu_env *env,
+ struct dt_object *dt);
/**
* Note: following ->do_{x,}attr_{set,get}() operations are very
* similar to ->moo_{x,}attr_{set,get}() operations in struct
#define MAY_RGETFACL (1 << 14)
enum {
- MDS_CHECK_SPLIT = 1 << 0,
- MDS_CROSS_REF = 1 << 1,
- MDS_VTX_BYPASS = 1 << 2,
- MDS_PERM_BYPASS = 1 << 3,
- MDS_SOM = 1 << 4,
- MDS_QUOTA_IGNORE = 1 << 5
+ MDS_CHECK_SPLIT = 1 << 0,
+ MDS_CROSS_REF = 1 << 1,
+ MDS_VTX_BYPASS = 1 << 2,
+ MDS_PERM_BYPASS = 1 << 3,
+ MDS_SOM = 1 << 4,
+ MDS_QUOTA_IGNORE = 1 << 5,
+ MDS_CLOSE_CLEANUP = 1 << 6
};
struct mds_rec_create {
int reset = 1;
ENTRY;
+ LASSERT(mdd_write_locked(env, obj) != 0);
+
rc = mdd_iattr_get(env, obj, ma);
if (rc == 0 && ma->ma_attr.la_nlink == 0) {
+ obj->mod_flags |= DEAD_OBJ;
/* add new orphan and the object
* will be deleted during mdd_close() */
if (obj->mod_count) {
rc = __mdd_orphan_add(env, obj, th);
- if (rc == 0) {
- obj->mod_flags |= ORPHAN_OBJ;
- CDEBUG(D_HA, "Object "DFID" is going to be "
- "an orphan, open count = %d\n",
+ if (rc == 0)
+ CDEBUG(D_HA, "Object "DFID" is inserted into "
+ "orphan list, open count = %d\n",
PFID(mdd_object_fid(obj)),
obj->mod_count);
- }
- }
-
- obj->mod_flags |= DEAD_OBJ;
- if (!(obj->mod_flags & ORPHAN_OBJ)) {
+ else
+ CERROR("Object "DFID" fail to be an orphan, "
+ "open count = %d, maybe cause failed "
+ "open replay\n",
+ PFID(mdd_object_fid(obj)),
+ obj->mod_count);
+ } else {
rc = mdd_object_kill(env, obj, ma);
if (rc == 0)
reset = 0;
enum mdd_object_role role);
void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj);
void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj);
+int mdd_write_locked(const struct lu_env *env, struct mdd_object *obj);
void mdd_pdlock_init(struct mdd_object *obj);
unsigned long mdd_name2hash(const char *name);
next->do_ops->do_read_unlock(env, next);
}
+int mdd_write_locked(const struct lu_env *env, struct mdd_object *obj)
+{
+ struct dt_object *next = mdd_object_child(obj);
+
+ return next->do_ops->do_write_locked(env, next);
+}
/* Methods for parallel directory locking */
}
/*
- * called with obj not locked.
+ * called with obj locked.
*/
-
int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
struct mdd_object *obj, struct lu_attr *la)
{
int rc;
ENTRY;
+ LASSERT(mdd_write_locked(env, obj) != 0);
+
+ if (unlikely(!S_ISREG(mdd_object_type(obj))))
+ RETURN(0);
+
if (unlikely(la->la_nlink != 0)) {
CWARN("Attempt to destroy OSS object when nlink == %d\n",
la->la_nlink);
/* get lov ea */
- rc = mdd_get_md_locked(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
- XATTR_NAME_LOV);
+ rc = mdd_get_md(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
+ XATTR_NAME_LOV);
if (rc <= 0) {
CWARN("Get lov ea failed for "DFID" rc = %d\n",
struct md_attr *ma)
{
struct mdd_object *mdd_obj = md2mdd_obj(obj);
+ struct mdd_device *mdd = mdo2mdd(obj);
struct thandle *handle;
int rc;
int reset = 1;
/* release open count */
mdd_obj->mod_count --;
- if (mdd_obj->mod_count == 0) {
+ if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
/* remove link to object from orphan index */
- if (mdd_obj->mod_flags & ORPHAN_OBJ)
- __mdd_orphan_del(env, mdd_obj, handle);
+ rc = __mdd_orphan_del(env, mdd_obj, handle);
+ if (rc == 0) {
+ CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+ "list, OSS objects to be destroyed.\n",
+ PFID(mdd_object_fid(mdd_obj)));
+ } else {
+ CERROR("Object "DFID" can not be deleted from orphan "
+ "list, maybe cause OST objects can not be "
+ "destroyed (err: %d).\n",
+ PFID(mdd_object_fid(mdd_obj)), rc);
+ /* If object was not deleted from orphan list, do not
+ * destroy OSS objects, which will be done when next
+ * recovery. */
+ GOTO(out, rc);
+ }
}
rc = mdd_iattr_get(env, mdd_obj, ma);
- if (rc == 0) {
- if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
- rc = mdd_object_kill(env, mdd_obj, ma);
+ /* Object maybe not in orphan list originally, it is rare case for
+ * mdd_finish_unlink() failure. */
+ if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
- if (mds->mds_quota) {
- quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
- mdd_quota_wrapper(&ma->ma_attr, qids);
- }
+ if (mds->mds_quota) {
+ quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+ mdd_quota_wrapper(&ma->ma_attr, qids);
+ }
#endif
- if (rc == 0)
- reset = 0;
+ /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
+ if (ma->ma_valid & MA_FLAGS &&
+ ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
+ rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
+ } else {
+ rc = mdd_object_kill(env, mdd_obj, ma);
+ if (rc == 0)
+ reset = 0;
}
+
+ if (rc != 0)
+ CERROR("Error when prepare to delete Object "DFID" , "
+ "which will cause OST objects can not be "
+ "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
}
+ EXIT;
+out:
if (reset)
ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
quota_opc);
#endif
- RETURN(rc);
+ return rc;
}
/*
int rc;
ENTRY;
+ LASSERT(mdd_write_locked(env, obj) != 0);
+ LASSERT(!(obj->mod_flags & ORPHAN_OBJ));
+ LASSERT(obj->mod_count > 0);
+
mdd_orphan_write_lock(env, mdd);
rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
dotdot, th, BYPASS_CAPA, 1);
out:
+ if (rc == 0)
+ obj->mod_flags |= ORPHAN_OBJ;
+
mdd_orphan_write_unlock(env, mdd);
RETURN(rc);
struct thandle *th)
{
struct lu_attr *la = &mdd_env_info(env)->mti_la;
- int rc;
+ int rc = 0;
+ ENTRY;
/* No need to lock this object as its recovery phase, and
* no other thread can access it. But we need to lock it
* as its precondition for osd api we using. */
- mdd_write_lock(env, obj, MOR_TGT_CHILD);
mdo_ref_del(env, obj, th);
if (S_ISDIR(mdd_object_type(obj))) {
mdo_ref_del(env, obj, th);
mdd_orphan_ref_del(env, mdd, th);
- mdd_write_unlock(env, obj);
} else {
/* regular file , cleanup linked ost objects */
rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
- mdd_write_unlock(env, obj);
- if (rc)
- RETURN(rc);
-
- mdd_lov_destroy(env, mdd, obj, la);
+ if (rc == 0)
+ rc = mdd_lov_destroy(env, mdd, obj, la);
}
- return 0;
+ RETURN(rc);
}
static int orph_index_delete(const struct lu_env *env,
ENTRY;
+ LASSERT(mdd_write_locked(env, obj) != 0);
+ LASSERT(obj->mod_flags & ORPHAN_OBJ);
+ LASSERT(obj->mod_count == 0);
+
LASSERT(dor);
key = orph_key_fill(env, mdo2fid(obj), op);
mdo_ref_del(env, obj, th);
mdd_orphan_ref_del(env, mdd, th);
}
- } else
+ obj->mod_flags &= ~ORPHAN_OBJ;
+ } else {
CERROR("could not delete object: rc = %d\n",rc);
+ }
- obj->mod_flags &= ~ORPHAN_OBJ;
mdd_orphan_write_unlock(env, mdd);
RETURN(rc);
}
{
struct thandle *th = NULL;
struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
- int rc;
+ int rc = 0;
ENTRY;
mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP);
RETURN(-ENOMEM);
}
- mdd_orphan_write_lock(env, mdd);
- rc = mdd_orphan_delete_obj(env, mdd, key, th);
- if (!rc)
- orphan_object_kill(env, obj, mdd, th);
- else
- CERROR("could not delete object: rc = %d\n",rc);
-
- mdd_orphan_write_unlock(env, mdd);
+ mdd_write_lock(env, obj, MOR_TGT_CHILD);
+ if (likely(obj->mod_count == 0)) {
+ mdd_orphan_write_lock(env, mdd);
+ rc = mdd_orphan_delete_obj(env, mdd, key, th);
+ if (!rc)
+ orphan_object_kill(env, obj, mdd, th);
+ else
+ CERROR("could not delete object: rc = %d\n",rc);
+ mdd_orphan_write_unlock(env, mdd);
+ }
+ mdd_write_unlock(env, obj);
mdd_trans_stop(env, mdd, 0, th);
RETURN(rc);
CWARN("Found orphan! Delete it\n");
rc = orphan_object_destroy(env, mdo, key);
} else {
- CDEBUG(D_HA, "Found orphan, open count = %d\n", mdo->mod_count);
- mdo->mod_flags |= ORPHAN_OBJ;
+ mdd_write_lock(env, mdo, MOR_TGT_CHILD);
+ if (likely(mdo->mod_count > 0)) {
+ CDEBUG(D_HA, "Found orphan, open count = %d\n",
+ mdo->mod_count);
+ mdo->mod_flags |= ORPHAN_OBJ;
+ }
+ mdd_write_unlock(env, mdo);
}
mdd_object_put(env, mdo);
do {
key = (void *)iops->key(env, it);
- if (IS_ERR(key))
+ if (IS_ERR(key)) {
+ CERROR("key failed when clean pending.\n");
goto next;
+ }
key_sz = iops->key_size(env, it);
/* filter out "." and ".." entries from
if (orphan_key_to_fid(mti_key, &fid))
goto next;
- if (!fid_is_sane(&fid))
+ if (!fid_is_sane(&fid)) {
+ CERROR("fid is not sane when clean pending.\n");
goto next;
+ }
/* kill orphan object */
cookie = iops->store(env, it);
result = iops->next(env, it);
} while (result == 0);
result = 0;
- } else if (result == 0)
+ } else if (result == 0) {
+ CERROR("Input/Output for clean pending.\n");
/* Index contains no zero key? */
result = -EIO;
+ }
iops->put(env, it);
iops->fini(env, it);
- } else
+ } else {
+ CERROR("not enough memory for clean pending.\n");
result = -ENOMEM;
+ }
RETURN(result);
}
int cookie_size;
lmm_size = mdt->mdt_max_mdsize;
- cookie_size = mdt->mdt_max_cookiesize;
OBD_ALLOC(ma->ma_lmm, lmm_size);
if (ma->ma_lmm == NULL)
GOTO(out_lmm, rc = -ENOMEM);
+
+ cookie_size = mdt->mdt_max_cookiesize;
OBD_ALLOC(ma->ma_cookie, cookie_size);
if (ma->ma_cookie == NULL)
GOTO(out_cookie, rc = -ENOMEM);
/* Close any open files (which may also cause orphan unlinking). */
list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
list_del_init(&mfd->mfd_list);
- /* TODO: if we close the unlinked file,
- * we need to remove its objects from OST */
memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
ma->ma_lmm_size = lmm_size;
ma->ma_cookie_size = cookie_size;
- ma->ma_need = MA_LOV | MA_COOKIE;
- ma->ma_valid = 0;
+ ma->ma_need = 0;
+ /* It is not for setattr, just tell MDD to send
+ * DESTROY RPC to OSS if needed */
+ ma->ma_attr_flags = MDS_CLOSE_CLEANUP;
+ ma->ma_valid = MA_FLAGS;
mdt_mfd_close(info, mfd);
}
info->mti_mdt = NULL;
up_write(&obj->oo_sem);
}
+static int osd_object_write_locked(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+
+ LINVRNT(osd_invariant(obj));
+
+ return obj->oo_owner == env;
+}
+
static int capa_is_sane(const struct lu_env *env,
struct osd_device *dev,
struct lustre_capa *capa,
.do_write_lock = osd_object_write_lock,
.do_read_unlock = osd_object_read_unlock,
.do_write_unlock = osd_object_write_unlock,
+ .do_write_locked = osd_object_write_locked,
.do_attr_get = osd_attr_get,
.do_attr_set = osd_attr_set,
.do_ah_init = osd_ah_init,
.do_write_lock = osd_object_write_lock,
.do_read_unlock = osd_object_read_unlock,
.do_write_unlock = osd_object_write_unlock,
+ .do_write_locked = osd_object_write_locked,
.do_attr_get = osd_attr_get,
.do_attr_set = osd_attr_set,
.do_ah_init = osd_ah_init,