X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=59b6d343dbaf28dfb04b6a5036146b8b48fd8672;hp=097cbb081e0609a568a1807c6bc5fdd588487b6a;hb=c961228f1c30254c454ed1432ba83af3aa7c39b4;hpb=57373a2946f119c3d85dcb9fea13fcc3d3369b72 diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 097cbb0..59b6d34 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -91,7 +91,6 @@ ldlm_mode_t mdt_dlm_lock_modes[] = { [MDL_GROUP] = LCK_GROUP }; - static struct mdt_device *mdt_dev(struct lu_device *d); static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags); static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt, @@ -142,6 +141,7 @@ void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm) { lh->mlh_pdo_hash = 0; lh->mlh_reg_mode = lm; + lh->mlh_rreg_mode = lm; lh->mlh_type = MDT_REG_LOCK; } @@ -149,6 +149,7 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm, const char *name, int namelen) { lh->mlh_reg_mode = lm; + lh->mlh_rreg_mode = lm; lh->mlh_type = MDT_PDO_LOCK; if (name != NULL && (name[0] != '\0')) { @@ -259,7 +260,7 @@ int mdt_getstatus(struct mdt_thread_info *info) repbody->valid |= OBD_MD_FLID; if (mdt->mdt_opts.mo_mds_capa && - info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) { + exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) { struct mdt_object *root; struct lustre_capa *capa; @@ -398,12 +399,6 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, if (fid) { b->fid1 = *fid; b->valid |= OBD_MD_FLID; - - /* FIXME: these should be fixed when new igif ready.*/ - b->ino = fid_oid(fid); /* 1.6 compatibility */ - b->generation = fid_ver(fid); /* 1.6 compatibility */ - b->valid |= OBD_MD_FLGENER; /* 1.6 compatibility */ - CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, size="LPU64"\n", PFID(fid), b->nlink, b->mode, b->size); } @@ -432,9 +427,9 @@ void mdt_client_compatibility(struct mdt_thread_info *info) struct lu_attr *la = &ma->ma_attr; ENTRY; - if (exp->exp_connect_flags & OBD_CONNECT_LAYOUTLOCK) - /* the client can deal with 16-bit lmm_stripe_count */ - RETURN_EXIT; + if (exp_connect_flags(exp) & OBD_CONNECT_LAYOUTLOCK) + /* the client can deal with 16-bit lmm_stripe_count */ + RETURN_EXIT; body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); @@ -689,13 +684,12 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, ma->ma_valid = 0; - rc = mdt_object_exists(o); - if (rc < 0) { - /* This object is located on remote node.*/ - repbody->fid1 = *mdt_object_fid(o); - repbody->valid = OBD_MD_FLID | OBD_MD_MDS; - GOTO(out, rc = 0); - } + if (mdt_object_remote(o)) { + /* This object is located on remote node.*/ + repbody->fid1 = *mdt_object_fid(o); + repbody->valid = OBD_MD_FLID | OBD_MD_MDS; + GOTO(out, rc = 0); + } buffer->lb_len = reqbody->eadatasize; if (buffer->lb_len > 0) @@ -713,7 +707,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, } else { ma->ma_lmm = buffer->lb_buf; ma->ma_lmm_size = buffer->lb_len; - ma->ma_need = MA_LOV | MA_INODE; + ma->ma_need = MA_LOV | MA_INODE | MA_HSM; } if (S_ISDIR(lu_object_attr(&next->mo_lu)) && @@ -838,8 +832,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, } } #ifdef CONFIG_FS_POSIX_ACL - else if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && - (reqbody->valid & OBD_MD_FLACL)) { + else if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) && + (reqbody->valid & OBD_MD_FLACL)) { buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL); buffer->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER); @@ -865,9 +859,9 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, } #endif - if (reqbody->valid & OBD_MD_FLMDSCAPA && - info->mti_mdt->mdt_opts.mo_mds_capa && - info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) { + if (reqbody->valid & OBD_MD_FLMDSCAPA && + info->mti_mdt->mdt_opts.mo_mds_capa && + exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) { struct lustre_capa *capa; capa = req_capsule_server_get(pill, &RMF_CAPA1); @@ -898,9 +892,9 @@ static int mdt_renew_capa(struct mdt_thread_info *info) * return directly, client will find body->valid OBD_MD_FLOSSCAPA * flag not set. */ - if (!obj || !info->mti_mdt->mdt_opts.mo_oss_capa || - !(info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA)) - RETURN(0); + if (!obj || !info->mti_mdt->mdt_opts.mo_oss_capa || + !(exp_connect_flags(info->mti_exp) & OBD_CONNECT_OSS_CAPA)) + RETURN(0); body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); LASSERT(body != NULL); @@ -998,18 +992,18 @@ int mdt_is_subdir(struct mdt_thread_info *info) repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); - /* - * We save last checked parent fid to @repbody->fid1 for remote - * directory case. - */ - LASSERT(fid_is_sane(&body->fid2)); - LASSERT(mdt_object_exists(o) > 0); - rc = mdo_is_subdir(info->mti_env, mdt_object_child(o), - &body->fid2, &repbody->fid1); - if (rc == 0 || rc == -EREMOTE) - repbody->valid |= OBD_MD_FLID; + /* + * We save last checked parent fid to @repbody->fid1 for remote + * directory case. + */ + LASSERT(fid_is_sane(&body->fid2)); + LASSERT(mdt_object_exists(o) && !mdt_object_remote(o)); + rc = mdo_is_subdir(info->mti_env, mdt_object_child(o), + &body->fid2, &repbody->fid1); + if (rc == 0 || rc == -EREMOTE) + repbody->valid |= OBD_MD_FLID; - RETURN(rc); + RETURN(rc); } static int mdt_raw_lookup(struct mdt_thread_info *info, @@ -1122,14 +1116,14 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, } mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD); - rc = mdt_object_exists(parent); - if (unlikely(rc == 0)) { + if (unlikely(!mdt_object_exists(parent))) { LU_OBJECT_DEBUG(D_INODE, info->mti_env, &parent->mot_obj.mo_lu, "Parent doesn't exist!\n"); RETURN(-ESTALE); } else if (!info->mti_cross_ref) { - LASSERTF(rc > 0, "Parent "DFID" is on remote server\n", + LASSERTF(!mdt_object_remote(parent), + "Parent "DFID" is on remote server\n", PFID(mdt_object_fid(parent))); } if (lname) { @@ -1166,9 +1160,9 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, * needed here but update is. */ child_bits &= ~MDS_INODELOCK_LOOKUP; - child_bits |= MDS_INODELOCK_UPDATE; + child_bits |= MDS_INODELOCK_PERM | MDS_INODELOCK_UPDATE; - rc = mdt_object_lock(info, child, lhc, child_bits, + rc = mdt_object_lock(info, child, lhc, child_bits, MDT_LOCAL_LOCK); } if (rc == 0) { @@ -1253,14 +1247,15 @@ relock: mdt_lock_handle_init(lhc); mdt_lock_reg_init(lhc, LCK_PR); - if (mdt_object_exists(child) == 0) { - LU_OBJECT_DEBUG(D_INODE, info->mti_env, - &child->mot_obj.mo_lu, - "Object doesn't exist!\n"); - GOTO(out_child, rc = -ENOENT); - } + if (!mdt_object_exists(child)) { + LU_OBJECT_DEBUG(D_INODE, info->mti_env, + &child->mot_obj.mo_lu, + "Object doesn't exist!\n"); + GOTO(out_child, rc = -ENOENT); + } - if (!(child_bits & MDS_INODELOCK_UPDATE)) { + if (!(child_bits & MDS_INODELOCK_UPDATE) && + mdt_object_exists(child) && !mdt_object_remote(child)) { struct md_attr *ma = &info->mti_attr; ma->ma_valid = 0; @@ -1337,7 +1332,8 @@ relock: (unsigned long)res_id->name[1], (unsigned long)res_id->name[2], PFID(mdt_object_fid(child))); - mdt_pack_size2body(info, child); + if (mdt_object_exists(child) && !mdt_object_remote(child)) + mdt_pack_size2body(info, child); } if (lock) LDLM_LOCK_PUT(lock); @@ -1428,9 +1424,11 @@ int mdt_set_info(struct mdt_thread_info *info) spin_lock(&req->rq_export->exp_lock); if (*(__u32 *)val) - req->rq_export->exp_connect_flags |= OBD_CONNECT_RDONLY; + *exp_connect_flags_ptr(req->rq_export) |= + OBD_CONNECT_RDONLY; else - req->rq_export->exp_connect_flags &=~OBD_CONNECT_RDONLY; + *exp_connect_flags_ptr(req->rq_export) &= + ~OBD_CONNECT_RDONLY; spin_unlock(&req->rq_export->exp_lock); } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) { @@ -1484,7 +1482,7 @@ int mdt_connect(struct mdt_thread_info *info) reply = req_capsule_server_get(info->mti_pill, &RMF_CONNECT_DATA); exp = req->rq_export; spin_lock(&exp->exp_lock); - exp->exp_connect_flags = reply->ocd_connect_flags; + *exp_connect_flags_ptr(exp) = reply->ocd_connect_flags; spin_unlock(&exp->exp_lock); rc = mdt_init_idmap(info); @@ -1523,9 +1521,9 @@ static int mdt_sendpage(struct mdt_thread_info *info, if (desc == NULL) RETURN(-ENOMEM); - if (!(exp->exp_connect_flags & OBD_CONNECT_BRW_SIZE)) + if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE)) /* old client requires reply size in it's PAGE_SIZE, - * which is rdpg->rp_count */ + * which is rdpg->rp_count */ nob = rdpg->rp_count; for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0; @@ -1571,10 +1569,10 @@ int mdt_readpage(struct mdt_thread_info *info) } rdpg->rp_attrs = reqbody->mode; - if (info->mti_exp->exp_connect_flags & OBD_CONNECT_64BITHASH) - rdpg->rp_attrs |= LUDA_64BITHASH; - rdpg->rp_count = min_t(unsigned int, reqbody->nlink, - PTLRPC_MAX_BRW_SIZE); + if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_64BITHASH) + rdpg->rp_attrs |= LUDA_64BITHASH; + rdpg->rp_count = min_t(unsigned int, reqbody->nlink, + exp_brw_size(info->mti_exp)); rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); @@ -1701,18 +1699,19 @@ static long mdt_reint_opcode(struct mdt_thread_info *info, int mdt_reint(struct mdt_thread_info *info) { - long opc; - int rc; - - static const struct req_format *reint_fmts[REINT_MAX] = { - [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR, - [REINT_CREATE] = &RQF_MDS_REINT_CREATE, - [REINT_LINK] = &RQF_MDS_REINT_LINK, - [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK, - [REINT_RENAME] = &RQF_MDS_REINT_RENAME, - [REINT_OPEN] = &RQF_MDS_REINT_OPEN, - [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR - }; + long opc; + int rc; + + static const struct req_format *reint_fmts[REINT_MAX] = { + [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR, + [REINT_CREATE] = &RQF_MDS_REINT_CREATE, + [REINT_LINK] = &RQF_MDS_REINT_LINK, + [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK, + [REINT_RENAME] = &RQF_MDS_REINT_RENAME, + [REINT_OPEN] = &RQF_MDS_REINT_OPEN, + [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR, + [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK + }; ENTRY; @@ -2007,7 +2006,7 @@ int mdt_obd_idx_read(struct mdt_thread_info *info) if (req_ii->ii_count <= 0) GOTO(out, rc = -EFAULT); rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT, - PTLRPC_MAX_BRW_SIZE); + exp_brw_size(info->mti_exp)); rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT; /* allocate pages to store the containers */ @@ -2382,6 +2381,57 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(rc); } +int mdt_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) +{ + struct lustre_handle lockh; + int rc; + + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) { + CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc); + RETURN(rc); + } + break; + case LDLM_CB_CANCELING: + LDLM_DEBUG(lock, "Revoke remote lock\n"); + break; + default: + LBUG(); + } + RETURN(0); +} + +int mdt_remote_object_lock(struct mdt_thread_info *mti, + struct mdt_object *o, struct lustre_handle *lh, + ldlm_mode_t mode, __u64 ibits) +{ + struct ldlm_enqueue_info *einfo = &mti->mti_einfo; + ldlm_policy_data_t *policy = &mti->mti_policy; + int rc = 0; + ENTRY; + + LASSERT(mdt_object_remote(o)); + + LASSERT((ibits & MDS_INODELOCK_UPDATE)); + + memset(einfo, 0, sizeof(*einfo)); + einfo->ei_type = LDLM_IBITS; + einfo->ei_mode = mode; + einfo->ei_cb_bl = mdt_md_blocking_ast; + einfo->ei_cb_cp = ldlm_completion_ast; + + memset(policy, 0, sizeof(*policy)); + policy->l_inodebits.bits = ibits; + + rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo, + policy); + RETURN(rc); +} + static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, __u64 ibits, bool nonblock, int locality) @@ -2398,20 +2448,23 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, LASSERT(lh->mlh_reg_mode != LCK_MINMODE); LASSERT(lh->mlh_type != MDT_NUL_LOCK); - if (mdt_object_exists(o) < 0) { + if (mdt_object_remote(o)) { if (locality == MDT_CROSS_LOCK) { - /* cross-ref object fix */ - ibits &= ~MDS_INODELOCK_UPDATE; + ibits &= ~(MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM); ibits |= MDS_INODELOCK_LOOKUP; } else { - LASSERT(!(ibits & MDS_INODELOCK_UPDATE)); + LASSERTF(!(ibits & + (MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM)), + "%s: wrong bit "LPX64" for remote obj "DFID"\n", + mdt_obd_name(info->mti_mdt), ibits, + PFID(mdt_object_fid(o))); LASSERT(ibits & MDS_INODELOCK_LOOKUP); } /* No PDO lock on remote object */ LASSERT(lh->mlh_type != MDT_PDO_LOCK); } - if (lh->mlh_type == MDT_PDO_LOCK) { + if (lh->mlh_type == MDT_PDO_LOCK) { /* check for exists after object is locked */ if (mdt_object_exists(o) == 0) { /* Non-existent object shouldn't have PDO lock */ @@ -2572,6 +2625,9 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref); + if (lustre_handle_is_used(&lh->mlh_rreg_lh)) + ldlm_lock_decref(&lh->mlh_rreg_lh, lh->mlh_rreg_mode); + EXIT; } @@ -2605,8 +2661,7 @@ void mdt_object_unlock_put(struct mdt_thread_info * info, mdt_object_put(info->mti_env, o); } -static struct mdt_handler *mdt_handler_find(__u32 opc, - struct mdt_opc_slice *supported) +struct mdt_handler *mdt_handler_find(__u32 opc, struct mdt_opc_slice *supported) { struct mdt_opc_slice *s; struct mdt_handler *h; @@ -2783,10 +2838,10 @@ static int mdt_req_handle(struct mdt_thread_info *info, rc = mdt_unpack_req_pack_rep(info, flags); } - if (rc == 0 && flags & MUTABOR && - req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) - /* should it be rq_status? */ - rc = -EROFS; + if (rc == 0 && flags & MUTABOR && + exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY) + /* should it be rq_status? */ + rc = -EROFS; if (rc == 0 && flags & HABEO_CLAVIS) { struct ldlm_request *dlm_req; @@ -2887,6 +2942,8 @@ void mdt_lock_handle_init(struct mdt_lock_handle *lh) lh->mlh_reg_mode = LCK_MINMODE; lh->mlh_pdo_lh.cookie = 0ull; lh->mlh_pdo_mode = LCK_MINMODE; + lh->mlh_rreg_lh.cookie = 0ull; + lh->mlh_rreg_mode = LCK_MINMODE; } void mdt_lock_handle_fini(struct mdt_lock_handle *lh) @@ -3100,8 +3157,16 @@ static int mdt_msg_check_version(struct lustre_msg *msg) case MDS_SETXATTR: case MDS_SET_INFO: case MDS_GET_INFO: + case MDS_HSM_PROGRESS: + case MDS_HSM_REQUEST: + case MDS_HSM_CT_REGISTER: + case MDS_HSM_CT_UNREGISTER: + case MDS_HSM_STATE_GET: + case MDS_HSM_STATE_SET: + case MDS_HSM_ACTION: case MDS_QUOTACHECK: case MDS_QUOTACTL: + case UPDATE_OBJ: case QUOTA_DQACQ: case QUOTA_DQREL: case SEQ_QUERY: @@ -3504,10 +3569,11 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, switch (opcode) { case MDT_IT_LOOKUP: - child_bits = MDS_INODELOCK_LOOKUP; + child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; break; case MDT_IT_GETATTR: - child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE; + child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | + MDS_INODELOCK_PERM; break; default: CERROR("Unsupported intent (%d)\n", opcode); @@ -3751,9 +3817,9 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info, rc = mdt_unpack_req_pack_rep(info, flv->it_flags); if (rc == 0) { struct ptlrpc_request *req = mdt_info_req(info); - if (flv->it_flags & MUTABOR && - req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) - RETURN(-EROFS); + if (flv->it_flags & MUTABOR && + exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY) + RETURN(-EROFS); } if (rc == 0 && flv->it_act != NULL) { /* execute policy */ @@ -3910,100 +3976,6 @@ out_seq_fini: } /* - * Init client sequence manager which is used by local MDS to talk to sequence - * controller on remote node. - */ -static int mdt_seq_init_cli(const struct lu_env *env, - struct mdt_device *m, - struct lustre_cfg *cfg) -{ - struct seq_server_site *ss = mdt_seq_site(m); - struct obd_device *mdc; - int rc; - int index; - struct mdt_thread_info *info; - char *p, *index_string = lustre_cfg_string(cfg, 2); - ENTRY; - - info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - - LASSERT(index_string); - index = simple_strtol(index_string, &p, 10); - if (*p) { - CERROR("%s: Invalid index in lustre_cgf, offset 2\n", - mdt2obd_dev(m)->obd_name); - RETURN(-EINVAL); - } - - /* check if this is adding the first MDC and controller is not yet - * initialized. */ - if (index != 0 || ss->ss_client_seq) - RETURN(0); - - mdc = class_name2obd(lustre_cfg_string(cfg, 1)); - if (!mdc) { - CERROR("%s: can't find %s device\n", - mdt2obd_dev(m)->obd_name, lustre_cfg_string(cfg, 1)); - rc = -ENOENT; - } else if (!mdc->obd_set_up) { - CERROR("%s: target %s not set up\n", - mdt2obd_dev(m)->obd_name, mdc->obd_name); - rc = -EINVAL; - } else { - LASSERT(ss->ss_control_exp); - OBD_ALLOC_PTR(ss->ss_client_seq); - if (ss->ss_client_seq != NULL) { - char *prefix; - - OBD_ALLOC(prefix, MAX_OBD_NAME + 5); - if (!prefix) - RETURN(-ENOMEM); - - snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", - mdc->obd_name); - - rc = seq_client_init(ss->ss_client_seq, - ss->ss_control_exp, - LUSTRE_SEQ_METADATA, - prefix, NULL); - OBD_FREE(prefix, MAX_OBD_NAME + 5); - } else - rc = -ENOMEM; - - if (rc) - RETURN(rc); - - LASSERT(ss->ss_server_seq != NULL); - rc = seq_server_set_cli(ss->ss_server_seq, ss->ss_client_seq, - env); - } - - RETURN(rc); -} - -static void mdt_seq_fini_cli(struct mdt_device *m) -{ - struct seq_server_site *ss; - - ENTRY; - - ss = mdt_seq_site(m); - - if (ss == NULL) - RETURN_EXIT; - - if (ss->ss_server_seq) - seq_server_set_cli(ss->ss_server_seq, NULL, NULL); - - if (ss->ss_control_exp) { - class_export_put(ss->ss_control_exp); - ss->ss_control_exp = NULL; - } - - EXIT; -} - -/* * FLD wrappers */ static int mdt_fld_fini(const struct lu_env *env, @@ -4036,7 +4008,7 @@ static int mdt_fld_init(const struct lu_env *env, RETURN(rc = -ENOMEM); rc = fld_server_init(env, ss->ss_server_fld, m->mdt_bottom, uuid, - ss->ss_node_id); + ss->ss_node_id, LU_SEQ_RANGE_MDT); if (rc) { OBD_FREE_PTR(ss->ss_server_fld); ss->ss_server_fld = NULL; @@ -4046,6 +4018,43 @@ static int mdt_fld_init(const struct lu_env *env, RETURN(0); } +static void mdt_stack_pre_fini(const struct lu_env *env, + struct mdt_device *m, struct lu_device *top) +{ + struct obd_device *obd = mdt2obd_dev(m); + struct lustre_cfg_bufs *bufs; + struct lustre_cfg *lcfg; + struct mdt_thread_info *info; + ENTRY; + + LASSERT(top); + + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + LASSERT(info != NULL); + + bufs = &info->mti_u.bufs; + + LASSERT(m->mdt_child_exp); + LASSERT(m->mdt_child_exp->exp_obd); + obd = m->mdt_child_exp->exp_obd; + + /* process cleanup, pass mdt obd name to get obd umount flags */ + /* XXX: this is needed because all layers are referenced by + * objects (some of them are pinned by osd, for example * + * the proper solution should be a model where object used + * by osd only doesn't have mdt/mdd slices -bzzz */ + lustre_cfg_bufs_reset(bufs, obd->obd_name); + lustre_cfg_bufs_set_string(bufs, 1, NULL); + lcfg = lustre_cfg_new(LCFG_PRE_CLEANUP, bufs); + if (!lcfg) { + CERROR("%s:Cannot alloc lcfg!\n", mdt_obd_name(m)); + return; + } + top->ld_ops->ldo_process_config(env, top, lcfg); + lustre_cfg_free(lcfg); + EXIT; +} + static void mdt_stack_fini(const struct lu_env *env, struct mdt_device *m, struct lu_device *top) { @@ -4430,6 +4439,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) ping_evictor_stop(); + + mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child)); mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT); obd_exports_barrier(obd); obd_zombie_barrier(); @@ -4456,8 +4467,9 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) m->mdt_nosquash_strlen = 0; } + next->md_ops->mdo_iocontrol(env, next, OBD_IOC_PAUSE_LFSCK, + 0, NULL); mdt_seq_fini(env, m); - mdt_seq_fini_cli(m); mdt_fld_fini(env, m); sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset); @@ -4575,6 +4587,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, init_rwsem(&m->mdt_squash_sem); spin_lock_init(&m->mdt_osfs_lock); m->mdt_osfs_age = cfs_time_shift_64(-1000); + m->mdt_enable_remote_dir = 0; m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops; m->mdt_md_dev.md_lu_dev.ld_obd = obd; @@ -4616,6 +4629,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (rc) GOTO(err_fini_stack, rc); + rc = mdt_fld_init(env, obd->obd_name, m); + if (rc) + GOTO(err_lut, rc); + + rc = mdt_seq_init(env, obd->obd_name, m); + if (rc) + GOTO(err_fini_fld, rc); + snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name, LUSTRE_MDT_NAME"-%p", m); m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name, @@ -4719,9 +4740,11 @@ err_free_ns: ldlm_namespace_free(m->mdt_namespace, NULL, 0); obd->obd_namespace = m->mdt_namespace = NULL; err_fini_seq: - mdt_seq_fini(env, m); - mdt_fld_fini(env, m); - tgt_fini(env, &m->mdt_lut); + mdt_seq_fini(env, m); +err_fini_fld: + mdt_fld_fini(env, m); +err_lut: + tgt_fini(env, &m->mdt_lut); err_fini_stack: mdt_stack_fini(env, m, md2lu_dev(m->mdt_child)); err_lmi: @@ -4801,17 +4824,6 @@ static int mdt_process_config(const struct lu_env *env, break; } - case LCFG_ADD_MDC: - /* - * Add mdc hook to get first MDT uuid and connect it to - * ls->controller to use for seq manager. - */ - rc = next->ld_ops->ldo_process_config(env, next, cfg); - if (rc) - CERROR("Can't add mdc, rc %d\n", rc); - else - rc = mdt_seq_init_cli(env, mdt_dev(d), cfg); - break; default: /* others are passed further */ rc = next->ld_ops->ldo_process_config(env, next, cfg); @@ -4916,13 +4928,11 @@ static int mdt_prepare(const struct lu_env *env, if (rc) RETURN(rc); - rc = mdt_fld_init(env, obd->obd_name, mdt); - if (rc) - RETURN(rc); - - rc = mdt_seq_init(env, obd->obd_name, mdt); - if (rc) - RETURN(rc); + rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child, + OBD_IOC_START_LFSCK, + 0, NULL); + if (rc != 0) + CWARN("Fail to auto trigger paused LFSCK.\n"); rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child, &mdt->mdt_md_root_fid); @@ -4981,12 +4991,12 @@ static int mdt_obd_set_info_async(const struct lu_env *env, * Compute the compatibility flags for a connection request based on * features mutually supported by client and server. * - * The obd_export::exp_connect_flags field in \a exp must not be updated - * here, otherwise a partially initialized value may be exposed. After - * the connection request is successfully processed, the top-level MDT - * connect request handler atomically updates the export connect flags - * from the obd_connect_data::ocd_connect_flags field of the reply. - * \see mdt_connect(). + * The obd_export::exp_connect_data.ocd_connect_flags field in \a exp + * must not be updated here, otherwise a partially initialized value may + * be exposed. After the connection request is successfully processed, + * the top-level MDT connect request handler atomically updates the export + * connect flags from the obd_connect_data::ocd_connect_flags field of the + * reply. \see mdt_connect(). * * \param exp the obd_export associated with this client/target pair * \param mdt the target device for the connection @@ -5023,7 +5033,7 @@ static int mdt_connect_internal(struct obd_export *exp, if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) { data->ocd_brw_size = min(data->ocd_brw_size, - (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT)); + (__u32)MD_MAX_BRW_SIZE); if (data->ocd_brw_size == 0) { CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64 " ocd_version: %x ocd_grant: %d " @@ -5039,17 +5049,19 @@ static int mdt_connect_internal(struct obd_export *exp, } } - /* NB: Disregard the rule against updating exp_connect_flags in this - * case, since tgt_client_new() needs to know if this is a lightweight - * connection, and it is safe to expose this flag before connection - * processing completes. */ + /* NB: Disregard the rule against updating + * exp_connect_data.ocd_connect_flags in this case, since + * tgt_client_new() needs to know if this is a lightweight + * connection, and it is safe to expose this flag before + * connection processing completes. */ if (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) { spin_lock(&exp->exp_lock); - exp->exp_connect_flags |= OBD_CONNECT_LIGHTWEIGHT; + *exp_connect_flags_ptr(exp) |= OBD_CONNECT_LIGHTWEIGHT; spin_unlock(&exp->exp_lock); } data->ocd_version = LUSTRE_VERSION_CODE; + exp->exp_connect_data = *data; exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known; if ((data->ocd_connect_flags & OBD_CONNECT_FID) == 0) { @@ -5059,12 +5071,24 @@ static int mdt_connect_internal(struct obd_export *exp, } if (mdt->mdt_som_conf && - !(data->ocd_connect_flags & (OBD_CONNECT_MDS_MDS|OBD_CONNECT_SOM))){ + !(data->ocd_connect_flags & (OBD_CONNECT_LIGHTWEIGHT | + OBD_CONNECT_MDS_MDS | + OBD_CONNECT_SOM))) { CWARN("%s: MDS has SOM enabled, but client does not support " "it\n", mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name); return -EBADE; } + if (data->ocd_connect_flags & OBD_CONNECT_PINGLESS) { + if (suppress_pings) { + spin_lock(&exp->exp_obd->obd_dev_lock); + list_del_init(&exp->exp_obd_chain_timed); + spin_unlock(&exp->exp_obd->obd_dev_lock); + } else { + data->ocd_connect_flags &= ~OBD_CONNECT_PINGLESS; + } + } + return 0; } @@ -5140,7 +5164,8 @@ static int mdt_obd_connect(const struct lu_env *env, * XXX: probably not very appropriate method is used now * at some point we should find a better one */ - if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state)) { + if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) && + !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) { rc = obd_health_check(env, mdt->mdt_child_exp->exp_obd); if (rc) RETURN(-EAGAIN); @@ -5389,7 +5414,7 @@ static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt, if (!fid_is_sane(&fp->gf_fid)) RETURN(-EINVAL); - if (!fid_is_norm(&fp->gf_fid) && !fid_is_igif(&fp->gf_fid)) { + if (!fid_is_client_mdt_visible(&fp->gf_fid)) { CWARN("%s: "DFID" is invalid, sequence should be " ">= "LPX64"\n", obd->obd_name, PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL); @@ -5510,24 +5535,23 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg) if (IS_ERR(obj)) RETURN(PTR_ERR(obj)); - rc = mdt_object_exists(obj); - if (rc < 0) { - rc = -EREMOTE; - /** - * before calling version get the correct MDS should be - * fid, this is error to find remote object here - */ - CERROR("nonlocal object "DFID"\n", PFID(fid)); - } else if (rc == 0) { - *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION; - rc = -ENOENT; - } else { - version = dt_version_get(mti->mti_env, mdt_obj2dt(obj)); - *(__u64 *)data->ioc_inlbuf2 = version; - rc = 0; - } - mdt_object_unlock_put(mti, obj, lh, 1); - RETURN(rc); + if (mdt_object_remote(obj)) { + rc = -EREMOTE; + /** + * before calling version get the correct MDS should be + * fid, this is error to find remote object here + */ + CERROR("nonlocal object "DFID"\n", PFID(fid)); + } else if (!mdt_object_exists(obj)) { + *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION; + rc = -ENOENT; + } else { + version = dt_version_get(mti->mti_env, mdt_obj2dt(obj)); + *(__u64 *)data->ioc_inlbuf2 = version; + rc = 0; + } + mdt_object_unlock_put(mti, obj, lh, 1); + RETURN(rc); } /* ioctls on obd dev */ @@ -5622,63 +5646,6 @@ int mdt_obd_postrecov(struct obd_device *obd) return rc; } -/** - * Send a copytool req to a client - * Note this sends a request RPC from a server (MDT) to a client (MDC), - * backwards of normal comms. - */ -int mdt_hsm_copytool_send(struct obd_export *exp) -{ - struct kuc_hdr *lh; - struct hsm_action_list *hal; - struct hsm_action_item *hai; - int rc, len; - ENTRY; - - CWARN("%s: writing to mdc at %s\n", exp->exp_obd->obd_name, - libcfs_nid2str(exp->exp_connection->c_peer.nid)); - - len = sizeof(*lh) + sizeof(*hal) + MTI_NAME_MAXLEN + - /* for mockup below */ 2 * cfs_size_round(sizeof(*hai)); - OBD_ALLOC(lh, len); - if (lh == NULL) - RETURN(-ENOMEM); - - lh->kuc_magic = KUC_MAGIC; - lh->kuc_transport = KUC_TRANSPORT_HSM; - lh->kuc_msgtype = HMT_ACTION_LIST; - lh->kuc_msglen = len; - - hal = (struct hsm_action_list *)(lh + 1); - hal->hal_version = HAL_VERSION; - hal->hal_archive_num = 1; - obd_uuid2fsname(hal->hal_fsname, exp->exp_obd->obd_name, - MTI_NAME_MAXLEN); - - /* mock up an action list */ - hal->hal_count = 2; - hai = hai_zero(hal); - hai->hai_action = HSMA_ARCHIVE; - hai->hai_fid.f_oid = 0xA00A; - hai->hai_len = sizeof(*hai); - hai = hai_next(hai); - hai->hai_action = HSMA_RESTORE; - hai->hai_fid.f_oid = 0xB00B; - hai->hai_len = sizeof(*hai); - - /* Uses the ldlm reverse import; this rpc will be seen by - the ldlm_callback_handler */ - rc = do_set_info_async(exp->exp_imp_reverse, - LDLM_SET_INFO, LUSTRE_OBD_VERSION, - sizeof(KEY_HSM_COPYTOOL_SEND), - KEY_HSM_COPYTOOL_SEND, - len, lh, NULL); - - OBD_FREE(lh, len); - - RETURN(rc); -} - static struct obd_ops mdt_obd_device_ops = { .o_owner = THIS_MODULE, .o_set_info_async = mdt_obd_set_info_async,