}
}
+static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo,
+ struct list_head *slave_locks,
+ int decref)
+{
+ if (mdt_object_remote(obj)) {
+ mdt_unlock_list(info, slave_locks, decref);
+ mdt_object_unlock(info, obj, lh, decref);
+ } else {
+ mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
+ }
+}
+
/*
* lock parents of links, and also check whether total locks don't exceed
* RS_MAX_LOCKS.
* \retval 1 on success, but total lock count may exceed RS_MAX_LOCKS
* \retval -ev negative errno upon error
*/
-static int mdt_lock_links(struct mdt_thread_info *info,
- struct mdt_object *pobj,
- const struct md_attr *ma,
- struct mdt_object *obj,
- struct list_head *link_locks)
+static int mdt_link_parents_lock(struct mdt_thread_info *info,
+ struct mdt_object *pobj,
+ const struct md_attr *ma,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lhp,
+ struct ldlm_enqueue_info *peinfo,
+ struct list_head *parent_slave_locks,
+ struct list_head *link_locks)
{
struct mdt_device *mdt = info->mti_mdt;
struct lu_buf *buf = &info->mti_big_buf;
struct lu_name *lname = &info->mti_name;
struct linkea_data ldata = { NULL };
bool blocked = false;
- int retries = 5;
int local_lnkp_cnt = 0;
int rc;
RETURN(rc);
}
-repeat:
for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
linkea_next_entry(&ldata)) {
struct mdt_object *lnkp;
rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
MDS_INODELOCK_UPDATE, true);
if (!(ibits & MDS_INODELOCK_UPDATE)) {
- blocked = true;
- CDEBUG(D_INFO, "busy lock on "DFID" "DNAME" retry %d\n",
- PFID(&fid), PNAME(lname), retries);
+ CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
+ PFID(&fid), PNAME(lname));
mdt_unlock_list(info, link_locks, 1);
+ /* also unlock parent locks to avoid deadlock */
+ if (!blocked)
+ mdt_migrate_object_unlock(info, pobj, lhp,
+ peinfo,
+ parent_slave_locks,
+ 1);
+
+ blocked = true;
mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
}
- if (blocked) {
- rc = -EBUSY;
- if (--retries > 0) {
- mdt_unlock_list(info, link_locks, rc);
- blocked = false;
- local_lnkp_cnt = 0;
- goto repeat;
- }
- }
+ if (blocked)
+ GOTO(out, rc = -EBUSY);
EXIT;
out:
return rc;
}
-static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
- struct mdt_object *obj,
- struct mdt_lock_handle *lh,
- struct ldlm_enqueue_info *einfo,
- struct list_head *slave_locks,
- int decref)
-{
- if (mdt_object_remote(obj)) {
- mdt_unlock_list(info, slave_locks, decref);
- mdt_object_unlock(info, obj, lh, decref);
- } else {
- mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
- }
-}
-
/* lock parent and its stripes */
static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
struct mdt_object *obj,
LIST_HEAD(parent_slave_locks);
LIST_HEAD(child_slave_locks);
LIST_HEAD(link_locks);
+ int lock_retries = 5;
bool open_sem_locked = false;
bool do_sync = false;
int rc;
if (rc)
GOTO(put_parent, rc);
+lock_parent:
/* lock parent object */
lhp = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_reg_init(lhp, LCK_PW);
GOTO(unlock_parent, rc);
/* lock parents of source links, and revoke LOOKUP lock of links */
- rc = mdt_lock_links(info, pobj, ma, sobj, &link_locks);
+ rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
+ &parent_slave_locks, &link_locks);
+ if (rc == -EBUSY && lock_retries-- > 0) {
+ mdt_object_put(env, sobj);
+ mdt_object_put(env, spobj);
+ goto lock_parent;
+ }
+
if (rc < 0)
GOTO(put_source, rc);
if (open_sem_locked)
up_write(&sobj->mot_open_sem);
unlock_links:
- mdt_unlock_list(info, &link_locks, rc);
+ mdt_unlock_list(info, &link_locks, do_sync ?: rc);
put_source:
mdt_object_put(env, sobj);
mdt_object_put(env, spobj);