struct lu_name *lname = NULL;
const char *name = NULL;
int namelen = 0;
- struct mdt_lock_handle *lhp;
+ struct mdt_lock_handle *lhp = NULL;
struct ldlm_lock *lock;
struct ldlm_res_id *res_id;
int is_resent;
if (namelen == 0) {
reqbody = req_capsule_client_get(info->mti_pill,
&RMF_MDT_BODY);
- LASSERT(fid_is_sane(&reqbody->fid2));
- name = NULL;
+ if (unlikely(reqbody == NULL))
+ RETURN(err_serious(-EFAULT));
+
+ if (unlikely(!fid_is_sane(&reqbody->fid2)))
+ RETURN(err_serious(-EINVAL));
+ name = NULL;
CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
"ldlm_rep = %p\n",
PFID(mdt_object_fid(parent)), PFID(&reqbody->fid2),
RETURN(rc);
}
- /* step 1: lock parent */
- lhp = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
- rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
- MDT_LOCAL_LOCK);
-
- if (unlikely(rc != 0))
- RETURN(rc);
-
if (lname) {
+ /* step 1: lock parent */
+ lhp = &info->mti_lh[MDT_LH_PARENT];
+ mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
+ rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
+ MDT_LOCAL_LOCK);
+ if (unlikely(rc != 0))
+ RETURN(rc);
+
/* step 2: lookup child's fid by name */
rc = mdo_lookup(info->mti_env, next, lname, child_fid,
&info->mti_spec);
LDLM_LOCK_PUT(lock);
rc = 0;
} else {
- struct md_attr *ma;
relock:
- ma = &info->mti_attr;
-
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PR);
LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
&child->mot_obj.mo_lu,
"Object doesn't exist!\n");
- GOTO(out_child, rc = -ESTALE);
+ GOTO(out_child, rc = -ENOENT);
}
- ma->ma_valid = 0;
- ma->ma_need = MA_INODE;
- rc = mo_attr_get(info->mti_env, next, ma);
- if (unlikely(rc != 0))
- GOTO(out_child, rc);
+ if (!(child_bits & MDS_INODELOCK_UPDATE)) {
+ struct md_attr *ma = &info->mti_attr;
- /* If the file has not been changed for some time, we return
- * not only a LOOKUP lock, but also an UPDATE lock and this
- * might save us RPC on later STAT. For directories, it also
- * let negative dentry starts working for this dir. */
- if (ma->ma_valid & MA_INODE &&
- ma->ma_attr.la_valid & LA_CTIME &&
- info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
- ma->ma_attr.la_ctime < cfs_time_current_sec())
- child_bits |= MDS_INODELOCK_UPDATE;
+ ma->ma_valid = 0;
+ ma->ma_need = MA_INODE;
+ rc = mo_attr_get(info->mti_env,
+ mdt_object_child(child), ma);
+ if (unlikely(rc != 0))
+ GOTO(out_child, rc);
+
+ /* If the file has not been changed for some time, we
+ * return not only a LOOKUP lock, but also an UPDATE
+ * lock and this might save us RPC on later STAT. For
+ * directories, it also let negative dentry starts
+ * working for this dir. */
+ if (ma->ma_valid & MA_INODE &&
+ ma->ma_attr.la_valid & LA_CTIME &&
+ info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
+ ma->ma_attr.la_ctime < cfs_time_current_sec())
+ child_bits |= MDS_INODELOCK_UPDATE;
+ }
rc = mdt_object_lock(info, child, lhc, child_bits,
MDT_CROSS_LOCK);
out_child:
mdt_object_put(info->mti_env, child);
out_parent:
- mdt_object_unlock(info, parent, lhp, 1);
+ if (lhp)
+ mdt_object_unlock(info, parent, lhp, 1);
return rc;
}
RETURN(-ESTALE);
} else {
/* Non-dir object shouldn't have PDO lock */
- LASSERT(S_ISDIR(lu_object_attr(&o->mot_obj.mo_lu)));
+ if (!S_ISDIR(lu_object_attr(&o->mot_obj.mo_lu)))
+ RETURN(-ENOTDIR);
}
}
LUSTRE_MDT_NAME"-%p", m);
m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name,
LDLM_NAMESPACE_SERVER,
- LDLM_NAMESPACE_GREEDY);
+ LDLM_NAMESPACE_GREEDY,
+ LDLM_NS_TYPE_MDT);
if (m->mdt_namespace == NULL)
GOTO(err_fini_seq, rc = -ENOMEM);
memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
rc = mdt_client_new(env, mdt);
if (rc == 0)
- mdt_export_stats_init(obd, lexp, 0, localdata);
+ mdt_export_stats_init(obd, lexp, localdata);
}
out:
rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
if (rc == 0)
- mdt_export_stats_init(obd, exp, 1, localdata);
+ mdt_export_stats_init(obd, exp, localdata);
RETURN(rc);
}
int cookie_size;
lmm_size = mdt->mdt_max_mdsize;
- OBD_ALLOC(ma->ma_lmm, lmm_size);
+ OBD_ALLOC_LARGE(ma->ma_lmm, lmm_size);
if (ma->ma_lmm == NULL)
GOTO(out_lmm, rc = -ENOMEM);
cookie_size = mdt->mdt_max_cookiesize;
- OBD_ALLOC(ma->ma_cookie, cookie_size);
+ OBD_ALLOC_LARGE(ma->ma_cookie, cookie_size);
if (ma->ma_cookie == NULL)
GOTO(out_cookie, rc = -ENOMEM);
ma->ma_need = 0;
/* It is not for setattr, just tell MDD to send
* DESTROY RPC to OSS if needed */
- ma->ma_attr_flags = MDS_CLOSE_CLEANUP;
ma->ma_valid = MA_FLAGS;
+ ma->ma_attr_flags = MDS_CLOSE_CLEANUP;
+ /* Don't unlink orphan on failover umount, LU-184 */
+ if (exp->exp_flags & OBD_OPT_FAILOVER)
+ ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
mdt_mfd_close(info, mfd);
}
- OBD_FREE(ma->ma_cookie, cookie_size);
+ OBD_FREE_LARGE(ma->ma_cookie, cookie_size);
ma->ma_cookie = NULL;
out_cookie:
- OBD_FREE(ma->ma_lmm, lmm_size);
+ OBD_FREE_LARGE(ma->ma_lmm, lmm_size);
ma->ma_lmm = NULL;
}
out_lmm:
info->mti_mdt = NULL;
/* cleanup client slot early */
/* Do not erase record for recoverable client. */
- if (!obd->obd_fail || exp->exp_failed)
+ if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
mdt_client_del(&env, mdt);
lu_env_fini(&env);
if (cfs_test_bit(MDT_FL_CFGLOG, &m->mdt_state) &&
cfs_test_bit(MDT_FL_SYNCED, &m->mdt_state)) {
struct obd_device *obd = m->mdt_md_dev.md_lu_dev.ld_obd;
-
+
/* Open for clients */
if (obd->obd_no_conn) {
cfs_spin_lock(&obd->obd_dev_lock);
} else {
mdt_max_threads = MDT_MAX_THREADS;
mdt_min_threads = MDT_MIN_THREADS;
- if (mdt_min_threads < MDT_NUM_THREADS)
- mdt_min_threads = MDT_NUM_THREADS;
}
lprocfs_mdt_init_vars(&lvars);