X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=8ae37944dd8d9fa59ca5423bb259d1907ace4972;hp=b0a7529965e6f2538248fd1250f33a449c638b39;hb=7936ec229c7f62cfbf5a0e4ff5933149967e7c8f;hpb=bd37398ae1b8dbbdd5bf8b700d061b5fc33fd8ed diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index b0a7529..8ae3794 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -53,6 +53,7 @@ * struct OBD_{ALLOC,FREE}*() */ #include +#include /* struct ptlrpc_request */ #include /* struct obd_export */ @@ -67,7 +68,6 @@ #include #include #include -#include #include mdl_mode_t mdt_mdl_lock_modes[] = { @@ -164,27 +164,28 @@ void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm) lh->mlh_type = MDT_REG_LOCK; } -void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm, - const char *name, int namelen) +void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lock_mode, + const struct lu_name *lname) { - lh->mlh_reg_mode = lm; - lh->mlh_rreg_mode = lm; - lh->mlh_type = MDT_PDO_LOCK; + lh->mlh_reg_mode = lock_mode; + lh->mlh_rreg_mode = lock_mode; + lh->mlh_type = MDT_PDO_LOCK; - if (name != NULL && (name[0] != '\0')) { - LASSERT(namelen > 0); - lh->mlh_pdo_hash = full_name_hash(name, namelen); + if (lu_name_is_valid(lname)) { + lh->mlh_pdo_hash = full_name_hash(lname->ln_name, + lname->ln_namelen); /* XXX Workaround for LU-2856 - * Zero is a valid return value of full_name_hash, but several - * users of mlh_pdo_hash assume a non-zero hash value. We - * therefore map zero onto an arbitrary, but consistent - * value (1) to avoid problems further down the road. */ - if (unlikely(!lh->mlh_pdo_hash)) + * + * Zero is a valid return value of full_name_hash, but + * several users of mlh_pdo_hash assume a non-zero + * hash value. We therefore map zero onto an + * arbitrary, but consistent value (1) to avoid + * problems further down the road. 
*/ + if (unlikely(lh->mlh_pdo_hash == 0)) lh->mlh_pdo_hash = 1; - } else { - LASSERT(namelen == 0); - lh->mlh_pdo_hash = 0ull; - } + } else { + lh->mlh_pdo_hash = 0; + } } static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o, @@ -262,7 +263,7 @@ static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o EXIT; } -int mdt_getstatus(struct tgt_session_info *tsi) +static int mdt_getstatus(struct tgt_session_info *tsi) { struct mdt_thread_info *info = tsi2mdt_info(tsi); struct mdt_device *mdt = info->mti_mdt; @@ -306,7 +307,7 @@ out: return rc; } -int mdt_statfs(struct tgt_session_info *tsi) +static int mdt_statfs(struct tgt_session_info *tsi) { struct ptlrpc_request *req = tgt_ses_req(tsi); struct mdt_thread_info *info = tsi2mdt_info(tsi); @@ -481,8 +482,34 @@ void mdt_client_compatibility(struct mdt_thread_info *info) EXIT; } -static int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, - char *name) +int mdt_attr_get_eabuf_size(struct mdt_thread_info *info, struct mdt_object *o) +{ + const struct lu_env *env = info->mti_env; + int rc, rc2; + + rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, + XATTR_NAME_LOV); + + if (rc == -ENODATA) + rc = 0; + + if (rc < 0) + goto out; + + /* Is it a directory? 
Let's check for the LMV as well */ + if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) { + rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, + XATTR_NAME_LMV); + if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc)) + rc = rc2; + } + +out: + return rc; +} + +int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, + const char *name) { const struct lu_env *env = info->mti_env; int rc; @@ -520,30 +547,66 @@ static int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, RETURN(rc); } -int mdt_attr_get_lov(struct mdt_thread_info *info, - struct mdt_object *o, struct md_attr *ma) +int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, + struct md_attr *ma, const char *name) { struct md_object *next = mdt_object_child(o); struct lu_buf *buf = &info->mti_buf; int rc; - buf->lb_buf = ma->ma_lmm; - buf->lb_len = ma->ma_lmm_size; - rc = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_LOV); + if (strcmp(name, XATTR_NAME_LOV) == 0) { + buf->lb_buf = ma->ma_lmm; + buf->lb_len = ma->ma_lmm_size; + LASSERT(!(ma->ma_valid & MA_LOV)); + } else if (strcmp(name, XATTR_NAME_LMV) == 0) { + buf->lb_buf = ma->ma_lmv; + buf->lb_len = ma->ma_lmv_size; + LASSERT(!(ma->ma_valid & MA_LMV)); + } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + buf->lb_buf = ma->ma_lmv; + buf->lb_len = ma->ma_lmv_size; + LASSERT(!(ma->ma_valid & MA_LMV_DEF)); + } else { + return -EINVAL; + } + + rc = mo_xattr_get(info->mti_env, next, buf, name); if (rc > 0) { - ma->ma_lmm_size = rc; - ma->ma_valid |= MA_LOV; + if (strcmp(name, XATTR_NAME_LOV) == 0) { + ma->ma_lmm_size = rc; + ma->ma_valid |= MA_LOV; + } else if (strcmp(name, XATTR_NAME_LMV) == 0) { + ma->ma_lmv_size = rc; + ma->ma_valid |= MA_LMV; + } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + ma->ma_lmv_size = rc; + ma->ma_valid |= MA_LMV_DEF; + } + rc = 0; } else if (rc == -ENODATA) { /* no LOV EA */ rc = 0; } else if (rc == -ERANGE) { - rc = mdt_big_xattr_get(info, o, 
XATTR_NAME_LOV); + /* Default LMV has fixed size, so it must be able to fit + * in the original buffer */ + if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) + return rc; + rc = mdt_big_xattr_get(info, o, name); if (rc > 0) { info->mti_big_lmm_used = 1; - ma->ma_valid |= MA_LOV; - ma->ma_lmm = info->mti_big_lmm; - ma->ma_lmm_size = rc; + if (!strcmp(name, XATTR_NAME_LOV)) { + ma->ma_valid |= MA_LOV; + ma->ma_lmm = info->mti_big_lmm; + ma->ma_lmm_size = rc; + } else if (!strcmp(name, XATTR_NAME_LMV)) { + ma->ma_valid |= MA_LMV; + ma->ma_lmv = info->mti_big_lmm; + ma->ma_lmv_size = rc; + } else { + return -EINVAL; + } + /* update mdt_max_mdsize so all clients * will be aware about that */ if (info->mti_mdt->mdt_max_mdsize < rc) @@ -555,8 +618,8 @@ int mdt_attr_get_lov(struct mdt_thread_info *info, return rc; } -int mdt_attr_get_pfid(struct mdt_thread_info *info, - struct mdt_object *o, struct lu_fid *pfid) +static int mdt_attr_get_pfid(struct mdt_thread_info *info, + struct mdt_object *o, struct lu_fid *pfid) { struct lu_buf *buf = &info->mti_buf; struct link_ea_header *leh; @@ -632,23 +695,21 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, } if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) { - rc = mdt_attr_get_lov(info, o, ma); + rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV); if (rc) GOTO(out, rc); } if (need & MA_LMV && S_ISDIR(mode)) { - buf->lb_buf = ma->ma_lmv; - buf->lb_len = ma->ma_lmv_size; - rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV); - if (rc2 > 0) { - ma->ma_lmv_size = rc2; - ma->ma_valid |= MA_LMV; - } else if (rc2 == -ENODATA) { - /* no LMV EA */ - ma->ma_lmv_size = 0; - } else - GOTO(out, rc = rc2); + rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); + if (rc != 0) + GOTO(out, rc); + } + + if (need & MA_LMV_DEF && S_ISDIR(mode)) { + rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV); + if (rc != 0) + GOTO(out, rc); } if (need & MA_SOM && S_ISREG(mode)) { @@ -724,33 +785,50 @@ static int mdt_getattr_internal(struct 
mdt_thread_info *info, if (mdt_object_remote(o)) { /* This object is located on remote node.*/ - /* Return -EIO for old client */ + /* Return -ENOTSUPP for old client */ if (!mdt_is_dne_client(req->rq_export)) - GOTO(out, rc = -EIO); + GOTO(out, rc = -ENOTSUPP); repbody->fid1 = *mdt_object_fid(o); repbody->valid = OBD_MD_FLID | OBD_MD_MDS; GOTO(out, rc = 0); } - buffer->lb_len = reqbody->eadatasize; - if (buffer->lb_len > 0) + if (reqbody->eadatasize > 0) { buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD); - else + if (buffer->lb_buf == NULL) + GOTO(out, rc = -EPROTO); + buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD, + RCL_SERVER); + } else { buffer->lb_buf = NULL; + buffer->lb_len = 0; + ma_need &= ~(MA_LOV | MA_LMV); + CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n", + mdt_obd_name(info->mti_mdt), + req->rq_export->exp_client_uuid.uuid); + } - /* If it is dir object and client require MEA, then we got MEA */ - if (S_ISDIR(lu_object_attr(&next->mo_lu)) && - reqbody->valid & OBD_MD_MEA) { - /* Assumption: MDT_MD size is enough for lmv size. */ - ma->ma_lmv = buffer->lb_buf; - ma->ma_lmv_size = buffer->lb_len; - ma->ma_need = MA_LMV | MA_INODE; - } else { - ma->ma_lmm = buffer->lb_buf; - ma->ma_lmm_size = buffer->lb_len; - ma->ma_need = MA_LOV | MA_INODE | MA_HSM; - } + /* If it is dir object and client require MEA, then we got MEA */ + if (S_ISDIR(lu_object_attr(&next->mo_lu)) && + (reqbody->valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) { + /* Assumption: MDT_MD size is enough for lmv size. 
*/ + ma->ma_lmv = buffer->lb_buf; + ma->ma_lmv_size = buffer->lb_len; + ma->ma_need = MA_INODE; + if (ma->ma_lmv_size > 0) { + if (reqbody->valid & OBD_MD_MEA) + ma->ma_need |= MA_LMV; + else if (reqbody->valid & OBD_MD_DEFAULT_MEA) + ma->ma_need |= MA_LMV_DEF; + } + } else { + ma->ma_lmm = buffer->lb_buf; + ma->ma_lmm_size = buffer->lb_len; + ma->ma_need = MA_INODE | MA_HSM; + if (ma->ma_lmm_size > 0) + ma->ma_need |= MA_LOV; + } if (S_ISDIR(lu_object_attr(&next->mo_lu)) && reqbody->valid & OBD_MD_FLDIREA && @@ -793,7 +871,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, root = mdt_object_find(env, mdt, &rootfid); if (IS_ERR(root)) RETURN(PTR_ERR(root)); - rc = mdt_attr_get_lov(info, root, ma); + rc = mdt_stripe_get(info, root, ma, XATTR_NAME_LOV); mdt_object_put(info->mti_env, root); if (unlikely(rc)) { CERROR("%s: getattr error for "DFID": rc = %d\n", @@ -811,18 +889,31 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, if (mdt_body_has_lov(la, reqbody)) { if (ma->ma_valid & MA_LOV) { LASSERT(ma->ma_lmm_size); - mdt_dump_lmm(D_INFO, ma->ma_lmm); repbody->eadatasize = ma->ma_lmm_size; if (S_ISDIR(la->la_mode)) repbody->valid |= OBD_MD_FLDIREA; else repbody->valid |= OBD_MD_FLEASIZE; + mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->valid); } - if (ma->ma_valid & MA_LMV) { - LASSERT(S_ISDIR(la->la_mode)); - repbody->eadatasize = ma->ma_lmv_size; - repbody->valid |= (OBD_MD_FLDIREA|OBD_MD_MEA); - } + if (ma->ma_valid & MA_LMV) { + /* Return -ENOTSUPP for old client */ + if (!mdt_is_striped_client(req->rq_export)) + RETURN(-ENOTSUPP); + + LASSERT(S_ISDIR(la->la_mode)); + mdt_dump_lmv(D_INFO, ma->ma_lmv); + repbody->eadatasize = ma->ma_lmv_size; + repbody->valid |= (OBD_MD_FLDIREA|OBD_MD_MEA); + } + if (ma->ma_valid & MA_LMV_DEF) { + /* Return -ENOTSUPP for old client */ + if (!mdt_is_striped_client(req->rq_export)) + RETURN(-ENOTSUPP); + LASSERT(S_ISDIR(la->la_mode)); + repbody->eadatasize = ma->ma_lmv_size; + repbody->valid |= 
(OBD_MD_FLDIREA|OBD_MD_DEFAULT_MEA); + } } else if (S_ISLNK(la->la_mode) && reqbody->valid & OBD_MD_LINKNAME) { buffer->lb_buf = ma->ma_lmm; @@ -971,14 +1062,13 @@ static int mdt_renew_capa(struct mdt_thread_info *info) RETURN(rc); } -int mdt_getattr(struct tgt_session_info *tsi) +static int mdt_getattr(struct tgt_session_info *tsi) { struct mdt_thread_info *info = tsi2mdt_info(tsi); struct mdt_object *obj = info->mti_object; struct req_capsule *pill = info->mti_pill; struct mdt_body *reqbody; struct mdt_body *repbody; - mode_t mode; int rc, rc2; ENTRY; @@ -996,13 +1086,36 @@ int mdt_getattr(struct tgt_session_info *tsi) LASSERT(obj != NULL); LASSERT(lu_object_assert_exists(&obj->mot_obj)); - mode = lu_object_attr(&obj->mot_obj); + /* Unlike intent case where we need to pre-fill out buffers early on + * in intent policy for ldlm reasons, here we can have a much better + * guess at EA size by just reading it from disk. + * Exceptions are readdir and (missing) directory striping */ + /* Readlink */ + if (reqbody->valid & OBD_MD_LINKNAME) { + /* No easy way to know how long is the symlink, but it cannot + * be more than PATH_MAX, so we allocate +1 */ + rc = PATH_MAX + 1; + + /* A special case for fs ROOT: getattr there might fetch + * default EA for entire fs, not just for this dir! + */ + } else if (lu_fid_eq(mdt_object_fid(obj), + &info->mti_mdt->mdt_md_root_fid) && + (reqbody->valid & OBD_MD_FLDIREA) && + (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) == + MDS_GETATTR)) { + /* Should the default striping be bigger, mdt_fix_reply + * will reallocate */ + rc = DEF_REP_MD_SIZE; + } else { + /* Read the actual EA size from disk */ + rc = mdt_attr_get_eabuf_size(info, obj); + } + + if (rc < 0) + GOTO(out_shrink, rc); - /* old clients may not report needed easize, use max value then */ - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, - reqbody->eadatasize == 0 ? 
- info->mti_mdt->mdt_max_mdsize : - reqbody->eadatasize); + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc); rc = req_capsule_server_pack(pill); if (unlikely(rc != 0)) @@ -1041,7 +1154,7 @@ out: return rc; } -int mdt_is_subdir(struct tgt_session_info *tsi) +static int mdt_is_subdir(struct tgt_session_info *tsi) { struct mdt_thread_info *info = tsi2mdt_info(tsi); struct mdt_object *o = info->mti_object; @@ -1070,7 +1183,7 @@ int mdt_is_subdir(struct tgt_session_info *tsi) RETURN(rc); } -int mdt_swap_layouts(struct tgt_session_info *tsi) +static int mdt_swap_layouts(struct tgt_session_info *tsi) { struct mdt_thread_info *info; struct ptlrpc_request *req = tgt_ses_req(tsi); @@ -1092,6 +1205,10 @@ int mdt_swap_layouts(struct tgt_session_info *tsi) RETURN(-EOPNOTSUPP); info = tsi2mdt_info(tsi); + + if (info->mti_dlm_req != NULL) + ldlm_request_cancel(req, info->mti_dlm_req, 0); + if (req_capsule_get_size(info->mti_pill, &RMF_CAPA1, RCL_CLIENT)) mdt_set_capainfo(info, 0, &info->mti_body->fid1, req_capsule_client_get(info->mti_pill, @@ -1221,94 +1338,38 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, struct md_object *next = mdt_object_child(parent); struct lu_fid *child_fid = &info->mti_tmp_fid1; struct lu_name *lname = NULL; - const char *name = NULL; - int namelen = 0; struct mdt_lock_handle *lhp = NULL; struct ldlm_lock *lock; - struct ldlm_res_id *res_id; - int is_resent; - int ma_need = 0; - int rc; - - ENTRY; - - is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh); - LASSERT(ergo(is_resent, - lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)); - - LASSERT(parent != NULL); - name = req_capsule_client_get(info->mti_pill, &RMF_NAME); - if (name == NULL) - RETURN(err_serious(-EFAULT)); + bool is_resent; + bool try_layout; + int ma_need = 0; + int rc; + ENTRY; - namelen = req_capsule_get_size(info->mti_pill, &RMF_NAME, - RCL_CLIENT) - 1; - if (!info->mti_cross_ref) { - /* - * XXX: Check for "namelen == 0" is for getattr by fid - * 
(OBD_CONNECT_ATTRFID), otherwise do not allow empty name, - * that is the name must contain at least one character and - * the terminating '\0' - */ - if (namelen == 0) { - reqbody = req_capsule_client_get(info->mti_pill, - &RMF_MDT_BODY); - if (unlikely(reqbody == NULL)) - RETURN(err_serious(-EFAULT)); - - if (unlikely(!fid_is_sane(&reqbody->fid2))) - RETURN(err_serious(-EINVAL)); - - name = NULL; - CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", " - "ldlm_rep = %p\n", - PFID(mdt_object_fid(parent)), - PFID(&reqbody->fid2), ldlm_rep); - } else { - lname = mdt_name(info->mti_env, (char *)name, namelen); - CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, " - "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), - name, ldlm_rep); - } - } - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD); + is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh); + LASSERT(ergo(is_resent, + lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)); - if (unlikely(!mdt_object_exists(parent)) && lname) { - LU_OBJECT_DEBUG(D_INODE, info->mti_env, - &parent->mot_obj, - "Parent doesn't exist!\n"); - RETURN(-ESTALE); - } else if (!info->mti_cross_ref) { - LASSERTF(!mdt_object_remote(parent), - "Parent "DFID" is on remote server\n", - PFID(mdt_object_fid(parent))); - } - if (lname) { - rc = mdt_raw_lookup(info, parent, lname, ldlm_rep); - if (rc != 0) { - if (rc > 0) - rc = 0; - RETURN(rc); - } - } + LASSERT(parent != NULL); - if (info->mti_cross_ref) { - /* Only getattr on the child. Parent is on another node. */ - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); - child = parent; - CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", " - "ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep); - - if (is_resent) { - /* Do not take lock for resent case. 
*/ - lock = ldlm_handle2lock(&lhc->mlh_reg_lh); - LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n", - lhc->mlh_reg_lh.cookie); - LASSERT(fid_res_name_eq(mdt_object_fid(child), - &lock->l_resource->lr_name)); - LDLM_LOCK_PUT(lock); - rc = 0; - } else { + if (info->mti_cross_ref) { + /* Only getattr on the child. Parent is on another node. */ + mdt_set_disposition(info, ldlm_rep, + DISP_LOOKUP_EXECD | DISP_LOOKUP_POS); + child = parent; + CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", " + "ldlm_rep = %p\n", + PFID(mdt_object_fid(child)), ldlm_rep); + + if (is_resent) { + /* Do not take lock for resent case. */ + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n", + lhc->mlh_reg_lh.cookie); + LASSERT(fid_res_name_eq(mdt_object_fid(child), + &lock->l_resource->lr_name)); + LDLM_LOCK_PUT(lock); + } else { mdt_lock_handle_init(lhc); mdt_lock_reg_init(lhc, LCK_PR); @@ -1322,23 +1383,78 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, rc = mdt_object_lock(info, child, lhc, child_bits, MDT_LOCAL_LOCK); + if (rc < 0) + RETURN(rc); } - if (rc == 0) { - /* Finally, we can get attr for child. */ - mdt_set_capainfo(info, 0, mdt_object_fid(child), - BYPASS_CAPA); - rc = mdt_getattr_internal(info, child, 0); - if (unlikely(rc != 0)) - mdt_object_unlock(info, child, lhc, 1); - } + + /* Finally, we can get attr for child. 
*/ + if (!mdt_object_exists(child)) { + LU_OBJECT_DEBUG(D_INFO, info->mti_env, + &child->mot_obj, + "remote object doesn't exist.\n"); + mdt_object_unlock(info, child, lhc, 1); + RETURN(-ENOENT); + } + + mdt_set_capainfo(info, 0, mdt_object_fid(child), BYPASS_CAPA); + rc = mdt_getattr_internal(info, child, 0); + if (unlikely(rc != 0)) + mdt_object_unlock(info, child, lhc, 1); + RETURN(rc); } - if (lname) { - /* step 1: lock parent only if parent is a directory */ + lname = &info->mti_name; + mdt_name_unpack(info->mti_pill, &RMF_NAME, lname, MNF_FIX_ANON); + + if (lu_name_is_valid(lname)) { + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", " + "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), + PNAME(lname), ldlm_rep); + } else { + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + if (unlikely(reqbody == NULL)) + RETURN(err_serious(-EPROTO)); + + *child_fid = reqbody->fid2; + + if (unlikely(!fid_is_sane(child_fid))) + RETURN(err_serious(-EINVAL)); + + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", " + "ldlm_rep = %p\n", + PFID(mdt_object_fid(parent)), + PFID(&reqbody->fid2), ldlm_rep); + } + + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD); + + if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) { + LU_OBJECT_DEBUG(D_INODE, info->mti_env, + &parent->mot_obj, + "Parent doesn't exist!\n"); + RETURN(-ESTALE); + } + + if (mdt_object_remote(parent)) { + CERROR("%s: parent "DFID" is on remote target\n", + mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(parent))); + RETURN(-EIO); + } + + if (lu_name_is_valid(lname)) { + rc = mdt_raw_lookup(info, parent, lname, ldlm_rep); + if (rc != 0) { + if (rc > 0) + rc = 0; + RETURN(rc); + } + + /* step 1: lock parent only if parent is a directory */ if (S_ISDIR(lu_object_attr(&parent->mot_obj))) { - lhp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lhp, LCK_PR, name, namelen); + lhp = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(lhp, LCK_PR, lname); rc = 
mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE, MDT_LOCAL_LOCK); @@ -1350,18 +1466,14 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, fid_zero(child_fid); rc = mdo_lookup(info->mti_env, next, lname, child_fid, &info->mti_spec); + if (rc == -ENOENT) + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); - if (rc != 0) { - if (rc == -ENOENT) - mdt_set_disposition(info, ldlm_rep, - DISP_LOOKUP_NEG); - GOTO(out_parent, rc); - } else - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); - } else { - *child_fid = reqbody->fid2; - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); - } + if (rc != 0) + GOTO(out_parent, rc); + } + + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); /* *step 3: find the child object by fid & lock it. @@ -1388,7 +1500,6 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n", lhc->mlh_reg_lh.cookie); - res_id = &lock->l_resource->lr_name; if (!fid_res_name_eq(mdt_object_fid(child), &lock->l_resource->lr_name)) { LASSERTF(fid_res_name_eq(mdt_object_fid(parent), @@ -1405,14 +1516,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, GOTO(relock, 0); } LDLM_LOCK_PUT(lock); - rc = 0; } else { - bool try_layout = false; - relock: OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2); mdt_lock_handle_init(lhc); mdt_lock_reg_init(lhc, LCK_PR); + try_layout = false; if (!mdt_object_exists(child)) { LU_OBJECT_DEBUG(D_INODE, info->mti_env, @@ -1469,6 +1578,10 @@ relock: ma_need |= MA_LOV; } } else { + /* Do not enqueue the UPDATE lock from MDT(cross-MDT), + * client will enqueue the lock to the remote MDT */ + if (mdt_object_remote(child)) + child_bits &= ~MDS_INODELOCK_UPDATE; rc = mdt_object_lock(info, child, lhc, child_bits, MDT_CROSS_LOCK); } @@ -1490,7 +1603,6 @@ relock: mdt_object_unlock(info, child, lhc, 1); } else if (lock) { /* Debugging code. 
*/ - res_id = &lock->l_resource->lr_name; LDLM_DEBUG(lock, "Returning lock to client"); LASSERTF(fid_res_name_eq(mdt_object_fid(child), &lock->l_resource->lr_name), @@ -1513,7 +1625,7 @@ out_parent: } /* normal handler: should release the child lock */ -int mdt_getattr_name(struct tgt_session_info *tsi) +static int mdt_getattr_name(struct tgt_session_info *tsi) { struct mdt_thread_info *info = tsi2mdt_info(tsi); struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD]; @@ -1554,7 +1666,7 @@ out_shrink: static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg); -int mdt_set_info(struct tgt_session_info *tsi) +static int mdt_set_info(struct tgt_session_info *tsi) { struct ptlrpc_request *req = tgt_ses_req(tsi); char *key; @@ -1612,7 +1724,7 @@ int mdt_set_info(struct tgt_session_info *tsi) RETURN(rc); } -int mdt_readpage(struct tgt_session_info *tsi) +static int mdt_readpage(struct tgt_session_info *tsi) { struct mdt_thread_info *info = mdt_th_info(tsi->tsi_env); struct mdt_object *object = mdt_obj(tsi->tsi_corpus); @@ -1701,7 +1813,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info, /* for replay (no_create) lmm is not needed, client has it already */ if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, - info->mti_rr.rr_eadatalen); + DEF_REP_MD_SIZE); /* llog cookies are always 0, the field is kept for compatibility */ if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) @@ -1736,7 +1848,8 @@ static int mdt_reint_internal(struct mdt_thread_info *info, GOTO(out_ucred, rc = err_serious(rc)); if (mdt_check_resent(info, mdt_reconstruct, lhc)) { - rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); + DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt."); + rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); GOTO(out_ucred, rc); } rc = mdt_reint_rec(info, lhc); @@ -1777,7 +1890,7 @@ static long mdt_reint_opcode(struct ptlrpc_request 
*req, return opc; } -int mdt_reint(struct tgt_session_info *tsi) +static int mdt_reint(struct tgt_session_info *tsi) { long opc; int rc; @@ -1789,7 +1902,8 @@ int mdt_reint(struct tgt_session_info *tsi) [REINT_RENAME] = &RQF_MDS_REINT_RENAME, [REINT_OPEN] = &RQF_MDS_REINT_OPEN, [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR, - [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK + [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK, + [REINT_MIGRATE] = &RQF_MDS_REINT_RENAME }; ENTRY; @@ -1840,7 +1954,7 @@ static int mdt_object_sync(struct mdt_thread_info *info) RETURN(rc); } -int mdt_sync(struct tgt_session_info *tsi) +static int mdt_sync(struct tgt_session_info *tsi) { struct ptlrpc_request *req = tgt_ses_req(tsi); struct req_capsule *pill = tsi->tsi_pill; @@ -1886,7 +2000,7 @@ int mdt_sync(struct tgt_session_info *tsi) * Handle quota control requests to consult current usage/limit, but also * to configure quota enforcement */ -int mdt_quotactl(struct tgt_session_info *tsi) +static int mdt_quotactl(struct tgt_session_info *tsi) { struct obd_export *exp = tsi->tsi_exp; struct req_capsule *pill = tsi->tsi_pill; @@ -2038,7 +2152,7 @@ static int mdt_llog_ctxt_unclone(const struct lu_env *env, /* * sec context handlers */ -int mdt_sec_ctx_handle(struct tgt_session_info *tsi) +static int mdt_sec_ctx_handle(struct tgt_session_info *tsi) { int rc; @@ -2060,7 +2174,7 @@ int mdt_sec_ctx_handle(struct tgt_session_info *tsi) /* * quota request handlers */ -int mdt_quota_dqacq(struct tgt_session_info *tsi) +static int mdt_quota_dqacq(struct tgt_session_info *tsi) { struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp); struct lu_device *qmt = mdt->mdt_qmt_dev; @@ -2212,8 +2326,9 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(rc); } -int mdt_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, int flag) +/* Used for cross-MDT lock */ +int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) { 
struct lustre_handle lockh; int rc; @@ -2237,23 +2352,29 @@ int mdt_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, } int mdt_remote_object_lock(struct mdt_thread_info *mti, - struct mdt_object *o, struct lustre_handle *lh, - ldlm_mode_t mode, __u64 ibits) + struct mdt_object *o, const struct lu_fid *fid, + struct lustre_handle *lh, ldlm_mode_t mode, + __u64 ibits) { struct ldlm_enqueue_info *einfo = &mti->mti_einfo; ldlm_policy_data_t *policy = &mti->mti_policy; + struct ldlm_res_id *res_id = &mti->mti_res_id; int rc = 0; ENTRY; LASSERT(mdt_object_remote(o)); - LASSERT(ibits & MDS_INODELOCK_UPDATE); + LASSERT(ibits == MDS_INODELOCK_UPDATE); + + fid_build_reg_res_name(fid, res_id); memset(einfo, 0, sizeof(*einfo)); einfo->ei_type = LDLM_IBITS; einfo->ei_mode = mode; - einfo->ei_cb_bl = mdt_md_blocking_ast; + einfo->ei_cb_bl = mdt_remote_blocking_ast; einfo->ei_cb_cp = ldlm_completion_ast; + einfo->ei_enq_slave = 0; + einfo->ei_res_id = res_id; memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = ibits; @@ -2263,9 +2384,10 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, RETURN(rc); } -static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 ibits, - bool nonblock, int locality) +static int mdt_object_local_lock(struct mdt_thread_info *info, + struct mdt_object *o, + struct mdt_lock_handle *lh, __u64 ibits, + bool nonblock, int locality) { struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; ldlm_policy_data_t *policy = &info->mti_policy; @@ -2279,23 +2401,9 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, LASSERT(lh->mlh_reg_mode != LCK_MINMODE); LASSERT(lh->mlh_type != MDT_NUL_LOCK); - if (mdt_object_remote(o)) { - if (locality == MDT_CROSS_LOCK) { - ibits &= ~(MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM | - MDS_INODELOCK_LAYOUT); - ibits |= MDS_INODELOCK_LOOKUP; - } else { - LASSERTF(!(ibits & - (MDS_INODELOCK_UPDATE | 
MDS_INODELOCK_PERM | - MDS_INODELOCK_LAYOUT)), - "%s: wrong bit "LPX64" for remote obj "DFID"\n", - mdt_obd_name(info->mti_mdt), ibits, - PFID(mdt_object_fid(o))); - LASSERT(ibits & MDS_INODELOCK_LOOKUP); - } - /* No PDO lock on remote object */ - LASSERT(lh->mlh_type != MDT_PDO_LOCK); - } + /* Only enqueue LOOKUP lock for remote object */ + if (mdt_object_remote(o)) + LASSERT(ibits == MDS_INODELOCK_LOOKUP); if (lh->mlh_type == MDT_PDO_LOCK) { /* check for exists after object is locked */ @@ -2330,9 +2438,10 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, * want it slowed down due to possible cancels. */ policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; - rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, - policy, res_id, dlmflags, - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, + policy, res_id, dlmflags, + info->mti_exp == NULL ? NULL : + &info->mti_exp->exp_handle.h_cookie); if (unlikely(rc)) RETURN(rc); } @@ -2351,9 +2460,10 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. */ - rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | dlmflags, - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, + res_id, LDLM_FL_LOCAL_ONLY | dlmflags, + info->mti_exp == NULL ? 
NULL : + &info->mti_exp->exp_handle.h_cookie); if (rc) mdt_object_unlock(info, o, lh, 1); else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) && @@ -2365,10 +2475,66 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, RETURN(rc); } +static int +mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, + struct mdt_lock_handle *lh, __u64 ibits, + bool nonblock, int locality) +{ + int rc; + ENTRY; + + if (!mdt_object_remote(o)) + return mdt_object_local_lock(info, o, lh, ibits, nonblock, + locality); + + if (locality == MDT_LOCAL_LOCK) { + CERROR("%s: try to get local lock for remote object" + DFID".\n", mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(o))); + RETURN(-EPROTO); + } + + /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */ + ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT | + MDS_INODELOCK_XATTR); + if (ibits & MDS_INODELOCK_UPDATE) { + /* Sigh, PDO needs to enqueue 2 locks right now, but + * enqueue RPC can only request 1 lock, to avoid extra + * RPC, so it will instead enqueue EX lock for remote + * object anyway XXX*/ + if (lh->mlh_type == MDT_PDO_LOCK && + lh->mlh_pdo_hash != 0) { + CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to" + "EX lock.\n", mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(o))); + lh->mlh_pdo_hash = 0; + lh->mlh_rreg_mode = LCK_EX; + lh->mlh_type = MDT_REG_LOCK; + } + rc = mdt_remote_object_lock(info, o, mdt_object_fid(o), + &lh->mlh_rreg_lh, + lh->mlh_rreg_mode, + MDS_INODELOCK_UPDATE); + if (rc != ELDLM_OK) + RETURN(rc); + } + + /* Only enqueue LOOKUP lock for remote object */ + if (ibits & MDS_INODELOCK_LOOKUP) { + rc = mdt_object_local_lock(info, o, lh, + MDS_INODELOCK_LOOKUP, + nonblock, locality); + if (rc != ELDLM_OK) + RETURN(rc); + } + + RETURN(0); +} + int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, __u64 ibits, int locality) { - return mdt_object_lock0(info, o, lh, ibits, false, 
locality); + return mdt_object_lock_internal(info, o, lh, ibits, false, locality); } int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, @@ -2377,7 +2543,7 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle tmp = *lh; int rc; - rc = mdt_object_lock0(info, o, &tmp, ibits, true, locality); + rc = mdt_object_lock_internal(info, o, &tmp, ibits, true, locality); if (rc == 0) *lh = tmp; @@ -2582,7 +2748,7 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags) /* Pack reply. */ if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, - info->mti_body->eadatasize); + DEF_REP_MD_SIZE); if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0); @@ -2655,9 +2821,11 @@ void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_opdata = 0; info->mti_big_lmm_used = 0; - /* To not check for split by default. 
*/ info->mti_spec.no_create = 0; info->mti_spec.sp_rm_entry = 0; + + info->mti_spec.u.sp_ea.eadata = NULL; + info->mti_spec.u.sp_ea.eadatalen = 0; } void mdt_thread_info_fini(struct mdt_thread_info *info) @@ -2710,7 +2878,7 @@ struct mdt_thread_info *tsi2mdt_info(struct tgt_session_info *tsi) return mti; } -int mdt_tgt_connect(struct tgt_session_info *tsi) +static int mdt_tgt_connect(struct tgt_session_info *tsi) { struct ptlrpc_request *req = tgt_ses_req(tsi); int rc; @@ -2837,11 +3005,10 @@ static struct mdt_it_flavor { } }; -int mdt_intent_lock_replace(struct mdt_thread_info *info, - struct ldlm_lock **lockp, - struct ldlm_lock *new_lock, - struct mdt_lock_handle *lh, - __u64 flags) +static int +mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp, + struct ldlm_lock *new_lock, struct mdt_lock_handle *lh, + __u64 flags) { struct ptlrpc_request *req = mdt_info_req(info); struct ldlm_lock *lock = *lockp; @@ -2923,9 +3090,10 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, } static void mdt_intent_fixup_resent(struct mdt_thread_info *info, - struct ldlm_lock *new_lock, - struct ldlm_lock **old_lock, - struct mdt_lock_handle *lh) + struct ldlm_lock *new_lock, + struct ldlm_lock **old_lock, + struct mdt_lock_handle *lh, + enum mdt_it_code opcode) { struct ptlrpc_request *req = mdt_info_req(info); struct obd_export *exp = req->rq_export; @@ -2938,29 +3106,37 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info, dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ); remote_hdl = dlmreq->lock_handle[0]; - - /* In the function below, .hs_keycmp resolves to - * ldlm_export_lock_keycmp() */ - /* coverity[overrun-buffer-val] */ - lock = cfs_hash_lookup(exp->exp_lock_hash, &remote_hdl); - if (lock) { - if (lock != new_lock) { - lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie; - lh->mlh_reg_mode = lock->l_granted_mode; - - LDLM_DEBUG(lock, "Restoring lock cookie"); - DEBUG_REQ(D_DLMTRACE, req, - "restoring lock 
cookie "LPX64, - lh->mlh_reg_lh.cookie); - if (old_lock) - *old_lock = LDLM_LOCK_GET(lock); - cfs_hash_put(exp->exp_lock_hash, &lock->l_exp_hash); - return; - } - - cfs_hash_put(exp->exp_lock_hash, &lock->l_exp_hash); - } - + /* If the client does not require open lock, it does not need to + * search lock in exp_lock_hash, since the server thread will + * make sure the lock will be released, and the resend request + * can always re-enqueue the lock */ + if ((opcode != MDT_IT_OPEN) || (opcode == MDT_IT_OPEN && + info->mti_spec.sp_cr_flags & MDS_OPEN_LOCK)) { + /* In the function below, .hs_keycmp resolves to + * ldlm_export_lock_keycmp() */ + /* coverity[overrun-buffer-val] */ + lock = cfs_hash_lookup(exp->exp_lock_hash, &remote_hdl); + if (lock) { + lock_res_and_lock(lock); + if (lock != new_lock) { + lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie; + lh->mlh_reg_mode = lock->l_granted_mode; + + LDLM_DEBUG(lock, "Restoring lock cookie"); + DEBUG_REQ(D_DLMTRACE, req, + "restoring lock cookie "LPX64, + lh->mlh_reg_lh.cookie); + if (old_lock) + *old_lock = LDLM_LOCK_GET(lock); + cfs_hash_put(exp->exp_lock_hash, + &lock->l_exp_hash); + unlock_res_and_lock(lock); + return; + } + cfs_hash_put(exp->exp_lock_hash, &lock->l_exp_hash); + unlock_res_and_lock(lock); + } + } /* * If the xid matches, then we know this is a resent request, and allow * it. (It's probably an OPEN, for which we don't send a lock. @@ -2993,7 +3169,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode, * (for the resend case) or a new lock. Below we will use it to * replace the original lock. 
*/ - mdt_intent_fixup_resent(info, *lockp, NULL, lhc); + mdt_intent_fixup_resent(info, *lockp, NULL, lhc, opcode); if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) { mdt_lock_reg_init(lhc, (*lockp)->l_req_mode); rc = mdt_object_lock(info, info->mti_object, lhc, @@ -3026,7 +3202,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, struct ldlm_lock *new_lock = NULL; __u64 child_bits; struct ldlm_reply *ldlm_rep; - struct ptlrpc_request *req; struct mdt_body *reqbody; struct mdt_body *repbody; int rc, rc2; @@ -3059,12 +3234,11 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, if (rc) GOTO(out_shrink, rc); - req = info->mti_pill->rc_req; ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); - /* Get lock from request for possible resent case. */ - mdt_intent_fixup_resent(info, *lockp, &new_lock, lhc); + /* Get lock from request for possible resent case. */ + mdt_intent_fixup_resent(info, *lockp, &new_lock, lhc, opcode); rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep); ldlm_rep->lock_policy_res2 = clear_serious(rc); @@ -3097,8 +3271,7 @@ static int mdt_intent_layout(enum mdt_it_code opcode, struct layout_intent *layout; struct lu_fid *fid; struct mdt_object *obj = NULL; - struct md_object *child = NULL; - int rc; + int rc = 0; ENTRY; if (opcode != MDT_IT_LAYOUT) { @@ -3115,11 +3288,10 @@ static int mdt_intent_layout(enum mdt_it_code opcode, RETURN(PTR_ERR(obj)); if (mdt_object_exists(obj) && !mdt_object_remote(obj)) { - child = mdt_object_child(obj); - /* get the length of lsm */ - rc = mo_xattr_get(info->mti_env, child, &LU_BUF_NULL, - XATTR_NAME_LOV); + rc = mdt_attr_get_eabuf_size(info, obj); + if (rc < 0) + RETURN(rc); if (rc > info->mti_mdt->mdt_max_mdsize) info->mti_mdt->mdt_max_mdsize = rc; @@ -3128,8 +3300,7 @@ static int mdt_intent_layout(enum mdt_it_code opcode, mdt_object_put(info->mti_env, obj); (*lockp)->l_lvb_type = LVB_T_LAYOUT; - req_capsule_set_size(info->mti_pill, 
&RMF_DLM_LVB, RCL_SERVER, - ldlm_lvbo_size(*lockp)); + req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER, rc); rc = req_capsule_server_pack(info->mti_pill); if (rc != 0) RETURN(-EINVAL); @@ -3172,8 +3343,8 @@ static int mdt_intent_reint(enum mdt_it_code opcode, RETURN(err_serious(-EPROTO)); } - /* Get lock from request for possible resent case. */ - mdt_intent_fixup_resent(info, *lockp, NULL, lhc); + /* Get lock from request for possible resent case. */ + mdt_intent_fixup_resent(info, *lockp, NULL, lhc, opcode); rc = mdt_reint_internal(info, lhc, opc); @@ -3393,97 +3564,199 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, RETURN(rc); } -static int mdt_seq_fini(const struct lu_env *env, - struct mdt_device *m) +static void mdt_deregister_seq_exp(struct mdt_device *mdt) { - return seq_site_fini(env, mdt_seq_site(m)); + struct seq_server_site *ss = mdt_seq_site(mdt); + + if (ss->ss_node_id == 0) + return; + + if (ss->ss_client_seq != NULL) { + lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp); + ss->ss_client_seq->lcs_exp = NULL; + } + + if (ss->ss_server_fld != NULL) { + lustre_deregister_lwp_item(&ss->ss_server_fld->lsf_control_exp); + ss->ss_server_fld->lsf_control_exp = NULL; + } } -static int mdt_seq_init(const struct lu_env *env, - const char *uuid, - struct mdt_device *m) +static void mdt_seq_fini_cli(struct mdt_device *mdt) { - struct seq_server_site *ss; - char *prefix; - int rc; + struct seq_server_site *ss = mdt_seq_site(mdt); + + if (ss == NULL) + return; + + if (ss->ss_server_seq != NULL) + seq_server_set_cli(NULL, ss->ss_server_seq, NULL); +} + +static int mdt_seq_fini(const struct lu_env *env, struct mdt_device *mdt) +{ + mdt_seq_fini_cli(mdt); + mdt_deregister_seq_exp(mdt); + + return seq_site_fini(env, mdt_seq_site(mdt)); +} + +/** + * It will retrieve its FLDB entries from MDT0, and it only happens + * when upgrading existent FS to 2.6 or when local FLDB is corrupted, + * and it needs to refresh FLDB from the MDT0. 
+ **/ +static int mdt_register_lwp_callback(void *data) +{ + struct lu_env env; + struct mdt_device *mdt = data; + struct lu_server_fld *fld = mdt_seq_site(mdt)->ss_server_fld; + int rc; ENTRY; - ss = mdt_seq_site(m); + LASSERT(mdt_seq_site(mdt)->ss_node_id != 0); - /* - * This is sequence-controller node. Init seq-controller server on local - * MDT. - */ - if (ss->ss_node_id == 0) { - LASSERT(ss->ss_control_seq == NULL); + if (!likely(fld->lsf_new)) + RETURN(0); - OBD_ALLOC_PTR(ss->ss_control_seq); - if (ss->ss_control_seq == NULL) - RETURN(-ENOMEM); + rc = lu_env_init(&env, LCT_MD_THREAD); + if (rc) { + CERROR("%s: cannot init env: rc = %d\n", mdt_obd_name(mdt), rc); + RETURN(rc); + } - rc = seq_server_init(ss->ss_control_seq, - m->mdt_bottom, uuid, - LUSTRE_SEQ_CONTROLLER, - ss, - env); + rc = fld_update_from_controller(&env, fld); + if (rc != 0) { + CERROR("%s: cannot update controller: rc = %d\n", + mdt_obd_name(mdt), rc); + GOTO(out, rc); + } +out: + lu_env_fini(&env); + RETURN(rc); +} - if (rc) - GOTO(out_seq_fini, rc); +static int mdt_register_seq_exp(struct mdt_device *mdt) +{ + struct seq_server_site *ss = mdt_seq_site(mdt); + char *lwp_name = NULL; + int rc; - OBD_ALLOC_PTR(ss->ss_client_seq); - if (ss->ss_client_seq == NULL) - GOTO(out_seq_fini, rc = -ENOMEM); + if (ss->ss_node_id == 0) + return 0; - OBD_ALLOC(prefix, MAX_OBD_NAME + 5); - if (prefix == NULL) { - OBD_FREE_PTR(ss->ss_client_seq); - GOTO(out_seq_fini, rc = -ENOMEM); - } + OBD_ALLOC(lwp_name, MAX_OBD_NAME); + if (lwp_name == NULL) + GOTO(out_free, rc = -ENOMEM); - snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", - uuid); + rc = tgt_name2lwp_name(mdt_obd_name(mdt), lwp_name, MAX_OBD_NAME, 0); + if (rc != 0) + GOTO(out_free, rc); - /* - * Init seq-controller client after seq-controller server is - * ready. Pass ss->ss_control_seq to it for direct talking. 
- */ - rc = seq_client_init(ss->ss_client_seq, NULL, - LUSTRE_SEQ_METADATA, prefix, - ss->ss_control_seq); - OBD_FREE(prefix, MAX_OBD_NAME + 5); + rc = lustre_register_lwp_item(lwp_name, &ss->ss_client_seq->lcs_exp, + NULL, NULL); + if (rc != 0) + GOTO(out_free, rc); + + rc = lustre_register_lwp_item(lwp_name, + &ss->ss_server_fld->lsf_control_exp, + mdt_register_lwp_callback, mdt); + if (rc != 0) { + lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp); + ss->ss_client_seq->lcs_exp = NULL; + GOTO(out_free, rc); + } +out_free: + if (lwp_name != NULL) + OBD_FREE(lwp_name, MAX_OBD_NAME); + + return rc; +} + +/* + * Init client sequence manager which is used by local MDS to talk to sequence + * controller on remote node. + */ +static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt) +{ + struct seq_server_site *ss = mdt_seq_site(mdt); + int rc; + char *prefix; + ENTRY; + + /* check if this is adding the first MDC and controller is not yet + * initialized. */ + OBD_ALLOC_PTR(ss->ss_client_seq); + if (ss->ss_client_seq == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(prefix, MAX_OBD_NAME + 5); + if (prefix == NULL) { + OBD_FREE_PTR(ss->ss_client_seq); + ss->ss_client_seq = NULL; + RETURN(-ENOMEM); + } + + /* Note: seq_client_fini will be called in seq_site_fini */ + snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt)); + rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA, + prefix, ss->ss_node_id == 0 ? 
ss->ss_control_seq : + NULL); + OBD_FREE(prefix, MAX_OBD_NAME + 5); + if (rc != 0) { + OBD_FREE_PTR(ss->ss_client_seq); + ss->ss_client_seq = NULL; + RETURN(rc); + } + + rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq); + + RETURN(rc); +} + +static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt) +{ + struct seq_server_site *ss; + int rc; + ENTRY; + + ss = mdt_seq_site(mdt); + /* init sequence controller server(MDT0) */ + if (ss->ss_node_id == 0) { + OBD_ALLOC_PTR(ss->ss_control_seq); + if (ss->ss_control_seq == NULL) + RETURN(-ENOMEM); + rc = seq_server_init(env, ss->ss_control_seq, mdt->mdt_bottom, + mdt_obd_name(mdt), LUSTRE_SEQ_CONTROLLER, + ss); if (rc) GOTO(out_seq_fini, rc); } - /* Init seq-server on local MDT */ - LASSERT(ss->ss_server_seq == NULL); - + /* Init normal sequence server */ OBD_ALLOC_PTR(ss->ss_server_seq); if (ss->ss_server_seq == NULL) GOTO(out_seq_fini, rc = -ENOMEM); - rc = seq_server_init(ss->ss_server_seq, - m->mdt_bottom, uuid, - LUSTRE_SEQ_SERVER, - ss, - env); + rc = seq_server_init(env, ss->ss_server_seq, mdt->mdt_bottom, + mdt_obd_name(mdt), LUSTRE_SEQ_SERVER, ss); if (rc) - GOTO(out_seq_fini, rc = -ENOMEM); + GOTO(out_seq_fini, rc); - /* Assign seq-controller client to local seq-server. 
*/ - if (ss->ss_node_id == 0) { - LASSERT(ss->ss_client_seq != NULL); + /* init seq client for seq server to talk to seq controller(MDT0) */ + rc = mdt_seq_init_cli(env, mdt); + if (rc != 0) + GOTO(out_seq_fini, rc); - rc = seq_server_set_cli(ss->ss_server_seq, - ss->ss_client_seq, - env); - } + if (ss->ss_node_id != 0) + /* register controler export through lwp */ + rc = mdt_register_seq_exp(mdt); EXIT; out_seq_fini: if (rc) - mdt_seq_fini(env, m); + mdt_seq_fini(env, mdt); return rc; } @@ -3521,7 +3794,7 @@ static int mdt_fld_init(const struct lu_env *env, RETURN(rc = -ENOMEM); rc = fld_server_init(env, ss->ss_server_fld, m->mdt_bottom, uuid, - ss->ss_node_id, LU_SEQ_RANGE_MDT); + LU_SEQ_RANGE_MDT); if (rc) { OBD_FREE_PTR(ss->ss_server_fld); ss->ss_server_fld = NULL; @@ -3534,7 +3807,6 @@ static int mdt_fld_init(const struct lu_env *env, static void mdt_stack_pre_fini(const struct lu_env *env, struct mdt_device *m, struct lu_device *top) { - struct obd_device *obd; struct lustre_cfg_bufs *bufs; struct lustre_cfg *lcfg; struct mdt_thread_info *info; @@ -3549,7 +3821,6 @@ static void mdt_stack_pre_fini(const struct lu_env *env, LASSERT(m->mdt_child_exp); LASSERT(m->mdt_child_exp->exp_obd); - obd = m->mdt_child_exp->exp_obd; /* process cleanup, pass mdt obd name to get obd umount flags */ /* XXX: this is needed because all layers are referenced by @@ -3761,7 +4032,6 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt, site->ls_top_dev = &mdt->mdt_lu_dev; mdt->mdt_child = lu2md_dev(mdt->mdt_child_exp->exp_obd->obd_lu_dev); - /* now connect to bottom OSD */ snprintf(name, MAX_OBD_NAME, "%s-osd", dev); rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_bottom_exp); @@ -3770,7 +4040,6 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt, mdt->mdt_bottom = lu2dt_dev(mdt->mdt_bottom_exp->exp_obd->obd_lu_dev); - rc = lu_env_refill((struct lu_env *)env); if (rc != 0) CERROR("Failure to refill session: '%d'\n", rc); @@ 
-3993,8 +4262,9 @@ TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_STATE_SET, TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action), TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_REQUEST, mdt_hsm_request), -TGT_MDT_HDL(HABEO_CORPUS|HABEO_REFERO | MUTABOR, MDS_SWAP_LAYOUTS, - mdt_swap_layouts) +TGT_MDT_HDL(HABEO_CLAVIS | HABEO_CORPUS | HABEO_REFERO | MUTABOR, + MDS_SWAP_LAYOUTS, + mdt_swap_layouts), }; static struct tgt_handler mdt_sec_ctx_ops[] = { @@ -4029,8 +4299,8 @@ static struct tgt_opc_slice mdt_common_slice[] = { .tos_hs = mdt_sec_ctx_ops }, { - .tos_opc_start = UPDATE_OBJ, - .tos_opc_end = UPDATE_LAST_OPC, + .tos_opc_start = OUT_UPDATE_FIRST_OPC, + .tos_opc_end = OUT_UPDATE_LAST_OPC, .tos_hs = tgt_out_handlers }, { @@ -4053,6 +4323,11 @@ static struct tgt_opc_slice mdt_common_slice[] = { .tos_opc_end = LLOG_LAST_OPC, .tos_hs = tgt_llog_handlers }, + { + .tos_opc_start = LFSCK_FIRST_OPC, + .tos_opc_end = LFSCK_LAST_OPC, + .tos_hs = tgt_lfsck_handlers + }, { .tos_hs = NULL @@ -4061,15 +4336,18 @@ static struct tgt_opc_slice mdt_common_slice[] = { static void mdt_fini(const struct lu_env *env, struct mdt_device *m) { - struct md_device *next = m->mdt_child; - struct lu_device *d = &m->mdt_lu_dev; - struct obd_device *obd = mdt2obd_dev(m); - ENTRY; - - target_recovery_fini(obd); + struct md_device *next = m->mdt_child; + struct lu_device *d = &m->mdt_lu_dev; + struct obd_device *obd = mdt2obd_dev(m); + struct lfsck_stop stop; + ENTRY; - ping_evictor_stop(); + stop.ls_status = LS_PAUSED; + stop.ls_flags = 0; + next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop); + target_recovery_fini(obd); + ping_evictor_stop(); mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child)); if (m->mdt_opts.mo_coordinator) @@ -4097,14 +4375,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) mdt_quota_fini(env, m); - cfs_free_nidlist(&m->mdt_nosquash_nids); - if (m->mdt_nosquash_str) { - OBD_FREE(m->mdt_nosquash_str, 
m->mdt_nosquash_strlen); - m->mdt_nosquash_str = NULL; - m->mdt_nosquash_strlen = 0; - } - - next->md_ops->mdo_iocontrol(env, next, OBD_IOC_PAUSE_LFSCK, 0, NULL); + cfs_free_nidlist(&m->mdt_squash.rsi_nosquash_nids); mdt_seq_fini(env, m); mdt_fld_fini(env, m); @@ -4118,14 +4389,14 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) */ mdt_stack_fini(env, m, md2lu_dev(m->mdt_child)); - LASSERT(cfs_atomic_read(&d->ld_ref) == 0); + LASSERT(atomic_read(&d->ld_ref) == 0); - server_put_mount(mdt_obd_name(m)); + server_put_mount(mdt_obd_name(m), true); EXIT; } -int mdt_postrecov(const struct lu_env *, struct mdt_device *); +static int mdt_postrecov(const struct lu_env *, struct mdt_device *); static int mdt_init0(const struct lu_env *env, struct mdt_device *m, struct lu_device_type *ldt, struct lustre_cfg *cfg) @@ -4190,12 +4461,10 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_capa_timeout = CAPA_TIMEOUT; m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1; m->mdt_ck_timeout = CAPA_KEY_TIMEOUT; - m->mdt_squash_uid = 0; - m->mdt_squash_gid = 0; - CFS_INIT_LIST_HEAD(&m->mdt_nosquash_nids); - m->mdt_nosquash_str = NULL; - m->mdt_nosquash_strlen = 0; - init_rwsem(&m->mdt_squash_sem); + m->mdt_squash.rsi_uid = 0; + m->mdt_squash.rsi_gid = 0; + INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); + init_rwsem(&m->mdt_squash.rsi_sem); spin_lock_init(&m->mdt_osfs_lock); m->mdt_osfs_age = cfs_time_shift_64(-1000); m->mdt_enable_remote_dir = 0; @@ -4242,7 +4511,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (rc) GOTO(err_fini_stack, rc); - rc = mdt_seq_init(env, mdt_obd_name(m), m); + rc = mdt_seq_init(env, m); if (rc) GOTO(err_fini_fld, rc); @@ -4376,7 +4645,7 @@ err_fini_stack: mdt_stack_fini(env, m, md2lu_dev(m->mdt_child)); err_lmi: if (lmi) - server_put_mount(dev); + server_put_mount(dev, true); return(rc); } @@ -4404,7 +4673,6 @@ static int mdt_process_config(const struct lu_env *env, switch (cfg->lcfg_command) 
{ case LCFG_PARAM: { - struct lprocfs_static_vars lvars; struct obd_device *obd = d->ld_obd; /* For interoperability */ @@ -4439,14 +4707,13 @@ static int mdt_process_config(const struct lu_env *env, } } - lprocfs_mdt_init_vars(&lvars); - rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, - cfg, obd); + rc = class_process_proc_seq_param(PARAM_MDT, obd->obd_vars, + cfg, obd); if (rc > 0 || rc == -ENOSYS) { /* is it an HSM var ? */ - rc = class_process_proc_param(PARAM_HSM, - hsm_cdt_get_proc_vars(), - cfg, obd); + rc = class_process_proc_seq_param(PARAM_HSM, + hsm_cdt_get_proc_vars(), + cfg, obd); if (rc > 0 || rc == -ENOSYS) /* we don't understand; pass it on */ rc = next->ld_ops->ldo_process_config(env, next, @@ -4467,24 +4734,24 @@ static int mdt_process_config(const struct lu_env *env, } static struct lu_object *mdt_object_alloc(const struct lu_env *env, - const struct lu_object_header *hdr, - struct lu_device *d) + const struct lu_object_header *hdr, + struct lu_device *d) { - struct mdt_object *mo; + struct mdt_object *mo; - ENTRY; + ENTRY; - OBD_SLAB_ALLOC_PTR_GFP(mo, mdt_object_kmem, __GFP_IO); - if (mo != NULL) { - struct lu_object *o; - struct lu_object_header *h; + OBD_SLAB_ALLOC_PTR_GFP(mo, mdt_object_kmem, GFP_NOFS); + if (mo != NULL) { + struct lu_object *o; + struct lu_object_header *h; o = &mo->mot_obj; - h = &mo->mot_header; - lu_object_header_init(h); - lu_object_init(o, h, d); - lu_object_add_top(h, o); - o->lo_ops = &mdt_obj_ops; + h = &mo->mot_header; + lu_object_header_init(h); + lu_object_init(o, h, d); + lu_object_add_top(h, o); + o->lo_ops = &mdt_obj_ops; mutex_init(&mo->mot_ioepoch_mutex); mutex_init(&mo->mot_lov_mutex); init_rwsem(&mo->mot_open_sem); @@ -4552,7 +4819,6 @@ static int mdt_prepare(const struct lu_env *env, struct mdt_device *mdt = mdt_dev(cdev); struct lu_device *next = &mdt->mdt_child->md_lu_dev; struct obd_device *obd = cdev->ld_obd; - struct lfsck_start_param lsp; int rc; ENTRY; @@ -4571,16 +4837,10 @@ static int 
mdt_prepare(const struct lu_env *env, if (rc) RETURN(rc); - lsp.lsp_start = NULL; - lsp.lsp_namespace = mdt->mdt_namespace; - rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child, - OBD_IOC_START_LFSCK, - 0, &lsp); - if (rc != 0) { - CWARN("%s: auto trigger paused LFSCK failed: rc = %d\n", - mdt_obd_name(mdt), rc); - rc = 0; - } + rc = lfsck_register_namespace(env, mdt->mdt_bottom, mdt->mdt_namespace); + /* The LFSCK instance is registered just now, so it must be there when + * register the namespace to such instance. */ + LASSERTF(rc == 0, "register namespace failed: rc = %d\n", rc); if (mdt->mdt_seq_site.ss_node_id == 0) { rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child, @@ -4766,7 +5026,9 @@ static int mdt_obd_connect(const struct lu_env *env, */ if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) && data != NULL && !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) { - rc = obd_health_check(env, mdt->mdt_child_exp->exp_obd); + rc = obd_get_info(env, mdt->mdt_child_exp, + sizeof(KEY_OSP_CONNECTED), + KEY_OSP_CONNECTED, NULL, NULL, NULL); if (rc) RETURN(-EAGAIN); set_bit(MDT_FL_SYNCED, &mdt->mdt_state); @@ -4788,6 +5050,9 @@ static int mdt_obd_connect(const struct lu_env *env, rc = tgt_client_new(env, lexp); if (rc == 0) mdt_export_stats_init(obd, lexp, localdata); + + /* For phase I, sync for cross-ref operation. 
*/ + lexp->exp_keep_sync = 1; } if (rc != 0) { @@ -5021,8 +5286,8 @@ struct path_lookup_info { int pli_fidcount; /**< number of \a pli_fids */ }; -static int mdt_links_read(struct mdt_thread_info *info, - struct mdt_object *mdt_obj, struct linkea_data *ldata) +int mdt_links_read(struct mdt_thread_info *info, struct mdt_object *mdt_obj, + struct linkea_data *ldata) { int rc; @@ -5080,19 +5345,46 @@ static int mdt_path_current(struct mdt_thread_info *info, --ptr; pli->pli_fidcount = 0; pli->pli_fids[0] = *(struct lu_fid *)mdt_object_fid(pli->pli_mdt_obj); - + *tmpfid = pli->pli_fids[0]; /* root FID only exists on MDT0, and fid2path should also ends at MDT0, * so checking root_fid can only happen on MDT0. */ while (!lu_fid_eq(&mdt->mdt_md_root_fid, &pli->pli_fids[pli->pli_fidcount])) { - mdt_obj = mdt_object_find(info->mti_env, mdt, - &pli->pli_fids[pli->pli_fidcount]); + struct lu_buf lmv_buf; + + mdt_obj = mdt_object_find(info->mti_env, mdt, tmpfid); if (IS_ERR(mdt_obj)) GOTO(out, rc = PTR_ERR(mdt_obj)); + if (mdt_object_remote(mdt_obj)) { mdt_object_put(info->mti_env, mdt_obj); GOTO(remote_out, rc = -EREMOTE); } + + lmv_buf.lb_buf = info->mti_xattr_buf; + lmv_buf.lb_len = sizeof(info->mti_xattr_buf); + + /* Check if it is slave stripes */ + rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj), + &lmv_buf, XATTR_NAME_LMV); + if (rc > 0) { + union lmv_mds_md *lmm = lmv_buf.lb_buf; + + /* For slave stripes, get its master */ + if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) { + struct lmv_mds_md_v1 *lmm1 = &lmm->lmv_md_v1; + + fid_le_to_cpu(tmpfid, &lmm1->lmv_master_fid); + if (!fid_is_sane(tmpfid)) { + mdt_object_put(info->mti_env, mdt_obj); + GOTO(out, rc = -EINVAL); + } + mdt_object_put(info->mti_env, mdt_obj); + pli->pli_fids[pli->pli_fidcount] = *tmpfid; + continue; + } + } + if (!mdt_object_exists(mdt_obj)) { mdt_object_put(info->mti_env, mdt_obj); GOTO(out, rc = -ENOENT); @@ -5425,14 +5717,24 @@ static int mdt_iocontrol(unsigned int cmd, struct 
obd_export *exp, int len, } lsp.lsp_start = (struct lfsck_start *)(data->ioc_inlbuf1); - lsp.lsp_namespace = mdt->mdt_namespace; + lsp.lsp_index_valid = 0; rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, &lsp); break; } case OBD_IOC_STOP_LFSCK: { - struct md_device *next = mdt->mdt_child; + struct md_device *next = mdt->mdt_child; + struct obd_ioctl_data *data = karg; + struct lfsck_stop stop; + + stop.ls_status = LS_STOPPED; + /* Old lfsck utils may pass NULL @stop. */ + if (data->ioc_inlbuf1 == NULL) + stop.ls_flags = 0; + else + stop.ls_flags = + ((struct lfsck_stop *)(data->ioc_inlbuf1))->ls_flags; - rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, NULL); + rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, &stop); break; } case OBD_IOC_GET_OBJ_VERSION: { @@ -5446,9 +5748,15 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rc = mdt_ioc_version_get(mti, karg); break; } - case OBD_IOC_CATLOGLIST: - rc = llog_catalog_list(&env, mdt->mdt_bottom, 0, karg); + case OBD_IOC_CATLOGLIST: { + struct mdt_thread_info *mti; + + mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key); + lu_local_obj_fid(&mti->mti_tmp_fid1, LLOG_CATALOGS_OID); + rc = llog_catalog_list(&env, mdt->mdt_bottom, 0, karg, + &mti->mti_tmp_fid1); break; + } default: rc = -EOPNOTSUPP; CERROR("%s: Not supported cmd = %d, rc = %d\n", @@ -5459,17 +5767,27 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, RETURN(rc); } -int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt) +static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt) { - struct lu_device *ld = md2lu_dev(mdt->mdt_child); - int rc; - ENTRY; + struct lu_device *ld = md2lu_dev(mdt->mdt_child); + struct lfsck_start_param lsp; + int rc; + ENTRY; - rc = ld->ld_ops->ldo_recovery_complete(env, ld); - RETURN(rc); + lsp.lsp_start = NULL; + lsp.lsp_index_valid = 0; + rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child, + OBD_IOC_START_LFSCK, + 
0, &lsp); + if (rc != 0 && rc != -EALREADY) + CWARN("%s: auto trigger paused LFSCK failed: rc = %d\n", + mdt_obd_name(mdt), rc); + + rc = ld->ld_ops->ldo_recovery_complete(env, ld); + RETURN(rc); } -int mdt_obd_postrecov(struct obd_device *obd) +static int mdt_obd_postrecov(struct obd_device *obd) { struct lu_env env; int rc; @@ -5619,7 +5937,6 @@ static struct lu_device_type mdt_device_type = { static int __init mdt_mod_init(void) { - struct lprocfs_static_vars lvars; int rc; CLASSERT(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") == @@ -5634,10 +5951,11 @@ static int __init mdt_mod_init(void) if (rc) GOTO(lu_fini, rc); - lprocfs_mdt_init_vars(&lvars); - rc = class_register_type(&mdt_obd_device_ops, NULL, - lvars.module_vars, LUSTRE_MDT_NAME, - &mdt_device_type); + rc = class_register_type(&mdt_obd_device_ops, NULL, true, NULL, +#ifndef HAVE_ONLY_PROCFS_SEQ + NULL, +#endif + LUSTRE_MDT_NAME, &mdt_device_type); if (rc) GOTO(mds_fini, rc); lu_fini: